1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21#include "qemu/osdep.h"
22#include "io/channel.h"
23#include "qapi/error.h"
24#include "qemu/main-loop.h"
25#include "qemu/module.h"
26#include "qemu/iov.h"
27
28bool qio_channel_has_feature(QIOChannel *ioc,
29 QIOChannelFeature feature)
30{
31 return ioc->features & (1 << feature);
32}
33
34
35void qio_channel_set_feature(QIOChannel *ioc,
36 QIOChannelFeature feature)
37{
38 ioc->features |= (1 << feature);
39}
40
41
42void qio_channel_set_name(QIOChannel *ioc,
43 const char *name)
44{
45 g_free(ioc->name);
46 ioc->name = g_strdup(name);
47}
48
49
50ssize_t qio_channel_readv_full(QIOChannel *ioc,
51 const struct iovec *iov,
52 size_t niov,
53 int **fds,
54 size_t *nfds,
55 Error **errp)
56{
57 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
58
59 if ((fds || nfds) &&
60 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
61 error_setg_errno(errp, EINVAL,
62 "Channel does not support file descriptor passing");
63 return -1;
64 }
65
66 return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
67}
68
69
70ssize_t qio_channel_writev_full(QIOChannel *ioc,
71 const struct iovec *iov,
72 size_t niov,
73 int *fds,
74 size_t nfds,
75 Error **errp)
76{
77 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
78
79 if ((fds || nfds) &&
80 !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
81 error_setg_errno(errp, EINVAL,
82 "Channel does not support file descriptor passing");
83 return -1;
84 }
85
86 return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
87}
88
89
90int qio_channel_readv_all_eof(QIOChannel *ioc,
91 const struct iovec *iov,
92 size_t niov,
93 Error **errp)
94{
95 int ret = -1;
96 struct iovec *local_iov = g_new(struct iovec, niov);
97 struct iovec *local_iov_head = local_iov;
98 unsigned int nlocal_iov = niov;
99 bool partial = false;
100
101 nlocal_iov = iov_copy(local_iov, nlocal_iov,
102 iov, niov,
103 0, iov_size(iov, niov));
104
105 while (nlocal_iov > 0) {
106 ssize_t len;
107 len = qio_channel_readv(ioc, local_iov, nlocal_iov, errp);
108 if (len == QIO_CHANNEL_ERR_BLOCK) {
109 if (qemu_in_coroutine()) {
110 qio_channel_yield(ioc, G_IO_IN);
111 } else {
112 qio_channel_wait(ioc, G_IO_IN);
113 }
114 continue;
115 } else if (len < 0) {
116 goto cleanup;
117 } else if (len == 0) {
118 if (partial) {
119 error_setg(errp,
120 "Unexpected end-of-file before all bytes were read");
121 } else {
122 ret = 0;
123 }
124 goto cleanup;
125 }
126
127 partial = true;
128 iov_discard_front(&local_iov, &nlocal_iov, len);
129 }
130
131 ret = 1;
132
133 cleanup:
134 g_free(local_iov_head);
135 return ret;
136}
137
138int qio_channel_readv_all(QIOChannel *ioc,
139 const struct iovec *iov,
140 size_t niov,
141 Error **errp)
142{
143 int ret = qio_channel_readv_all_eof(ioc, iov, niov, errp);
144
145 if (ret == 0) {
146 ret = -1;
147 error_setg(errp,
148 "Unexpected end-of-file before all bytes were read");
149 } else if (ret == 1) {
150 ret = 0;
151 }
152 return ret;
153}
154
155int qio_channel_writev_all(QIOChannel *ioc,
156 const struct iovec *iov,
157 size_t niov,
158 Error **errp)
159{
160 int ret = -1;
161 struct iovec *local_iov = g_new(struct iovec, niov);
162 struct iovec *local_iov_head = local_iov;
163 unsigned int nlocal_iov = niov;
164
165 nlocal_iov = iov_copy(local_iov, nlocal_iov,
166 iov, niov,
167 0, iov_size(iov, niov));
168
169 while (nlocal_iov > 0) {
170 ssize_t len;
171 len = qio_channel_writev(ioc, local_iov, nlocal_iov, errp);
172 if (len == QIO_CHANNEL_ERR_BLOCK) {
173 if (qemu_in_coroutine()) {
174 qio_channel_yield(ioc, G_IO_OUT);
175 } else {
176 qio_channel_wait(ioc, G_IO_OUT);
177 }
178 continue;
179 }
180 if (len < 0) {
181 goto cleanup;
182 }
183
184 iov_discard_front(&local_iov, &nlocal_iov, len);
185 }
186
187 ret = 0;
188 cleanup:
189 g_free(local_iov_head);
190 return ret;
191}
192
193ssize_t qio_channel_readv(QIOChannel *ioc,
194 const struct iovec *iov,
195 size_t niov,
196 Error **errp)
197{
198 return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
199}
200
201
202ssize_t qio_channel_writev(QIOChannel *ioc,
203 const struct iovec *iov,
204 size_t niov,
205 Error **errp)
206{
207 return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
208}
209
210
211ssize_t qio_channel_read(QIOChannel *ioc,
212 char *buf,
213 size_t buflen,
214 Error **errp)
215{
216 struct iovec iov = { .iov_base = buf, .iov_len = buflen };
217 return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
218}
219
220
221ssize_t qio_channel_write(QIOChannel *ioc,
222 const char *buf,
223 size_t buflen,
224 Error **errp)
225{
226 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
227 return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
228}
229
230
231int qio_channel_read_all_eof(QIOChannel *ioc,
232 char *buf,
233 size_t buflen,
234 Error **errp)
235{
236 struct iovec iov = { .iov_base = buf, .iov_len = buflen };
237 return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
238}
239
240
241int qio_channel_read_all(QIOChannel *ioc,
242 char *buf,
243 size_t buflen,
244 Error **errp)
245{
246 struct iovec iov = { .iov_base = buf, .iov_len = buflen };
247 return qio_channel_readv_all(ioc, &iov, 1, errp);
248}
249
250
251int qio_channel_write_all(QIOChannel *ioc,
252 const char *buf,
253 size_t buflen,
254 Error **errp)
255{
256 struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
257 return qio_channel_writev_all(ioc, &iov, 1, errp);
258}
259
260
261int qio_channel_set_blocking(QIOChannel *ioc,
262 bool enabled,
263 Error **errp)
264{
265 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
266 return klass->io_set_blocking(ioc, enabled, errp);
267}
268
269
270int qio_channel_close(QIOChannel *ioc,
271 Error **errp)
272{
273 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
274 return klass->io_close(ioc, errp);
275}
276
277
278GSource *qio_channel_create_watch(QIOChannel *ioc,
279 GIOCondition condition)
280{
281 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
282 GSource *ret = klass->io_create_watch(ioc, condition);
283
284 if (ioc->name) {
285 g_source_set_name(ret, ioc->name);
286 }
287
288 return ret;
289}
290
291
292void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
293 AioContext *ctx,
294 IOHandler *io_read,
295 IOHandler *io_write,
296 void *opaque)
297{
298 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
299
300 klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
301}
302
303guint qio_channel_add_watch_full(QIOChannel *ioc,
304 GIOCondition condition,
305 QIOChannelFunc func,
306 gpointer user_data,
307 GDestroyNotify notify,
308 GMainContext *context)
309{
310 GSource *source;
311 guint id;
312
313 source = qio_channel_create_watch(ioc, condition);
314
315 g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
316
317 id = g_source_attach(source, context);
318 g_source_unref(source);
319
320 return id;
321}
322
323guint qio_channel_add_watch(QIOChannel *ioc,
324 GIOCondition condition,
325 QIOChannelFunc func,
326 gpointer user_data,
327 GDestroyNotify notify)
328{
329 return qio_channel_add_watch_full(ioc, condition, func,
330 user_data, notify, NULL);
331}
332
333GSource *qio_channel_add_watch_source(QIOChannel *ioc,
334 GIOCondition condition,
335 QIOChannelFunc func,
336 gpointer user_data,
337 GDestroyNotify notify,
338 GMainContext *context)
339{
340 GSource *source;
341 guint id;
342
343 id = qio_channel_add_watch_full(ioc, condition, func,
344 user_data, notify, context);
345 source = g_main_context_find_source_by_id(context, id);
346 g_source_ref(source);
347 return source;
348}
349
350
351int qio_channel_shutdown(QIOChannel *ioc,
352 QIOChannelShutdown how,
353 Error **errp)
354{
355 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
356
357 if (!klass->io_shutdown) {
358 error_setg(errp, "Data path shutdown not supported");
359 return -1;
360 }
361
362 return klass->io_shutdown(ioc, how, errp);
363}
364
365
366void qio_channel_set_delay(QIOChannel *ioc,
367 bool enabled)
368{
369 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
370
371 if (klass->io_set_delay) {
372 klass->io_set_delay(ioc, enabled);
373 }
374}
375
376
377void qio_channel_set_cork(QIOChannel *ioc,
378 bool enabled)
379{
380 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
381
382 if (klass->io_set_cork) {
383 klass->io_set_cork(ioc, enabled);
384 }
385}
386
387
388off_t qio_channel_io_seek(QIOChannel *ioc,
389 off_t offset,
390 int whence,
391 Error **errp)
392{
393 QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
394
395 if (!klass->io_seek) {
396 error_setg(errp, "Channel does not support random access");
397 return -1;
398 }
399
400 return klass->io_seek(ioc, offset, whence, errp);
401}
402
403
404static void qio_channel_restart_read(void *opaque)
405{
406 QIOChannel *ioc = opaque;
407 Coroutine *co = ioc->read_coroutine;
408
409
410 assert(qemu_get_current_aio_context() ==
411 qemu_coroutine_get_aio_context(co));
412 aio_co_wake(co);
413}
414
415static void qio_channel_restart_write(void *opaque)
416{
417 QIOChannel *ioc = opaque;
418 Coroutine *co = ioc->write_coroutine;
419
420
421 assert(qemu_get_current_aio_context() ==
422 qemu_coroutine_get_aio_context(co));
423 aio_co_wake(co);
424}
425
426static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
427{
428 IOHandler *rd_handler = NULL, *wr_handler = NULL;
429 AioContext *ctx;
430
431 if (ioc->read_coroutine) {
432 rd_handler = qio_channel_restart_read;
433 }
434 if (ioc->write_coroutine) {
435 wr_handler = qio_channel_restart_write;
436 }
437
438 ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
439 qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
440}
441
442void qio_channel_attach_aio_context(QIOChannel *ioc,
443 AioContext *ctx)
444{
445 assert(!ioc->read_coroutine);
446 assert(!ioc->write_coroutine);
447 ioc->ctx = ctx;
448}
449
450void qio_channel_detach_aio_context(QIOChannel *ioc)
451{
452 ioc->read_coroutine = NULL;
453 ioc->write_coroutine = NULL;
454 qio_channel_set_aio_fd_handlers(ioc);
455 ioc->ctx = NULL;
456}
457
458void coroutine_fn qio_channel_yield(QIOChannel *ioc,
459 GIOCondition condition)
460{
461 assert(qemu_in_coroutine());
462 if (condition == G_IO_IN) {
463 assert(!ioc->read_coroutine);
464 ioc->read_coroutine = qemu_coroutine_self();
465 } else if (condition == G_IO_OUT) {
466 assert(!ioc->write_coroutine);
467 ioc->write_coroutine = qemu_coroutine_self();
468 } else {
469 abort();
470 }
471 qio_channel_set_aio_fd_handlers(ioc);
472 qemu_coroutine_yield();
473
474
475
476 if (condition == G_IO_IN && ioc->read_coroutine) {
477 ioc->read_coroutine = NULL;
478 qio_channel_set_aio_fd_handlers(ioc);
479 } else if (condition == G_IO_OUT && ioc->write_coroutine) {
480 ioc->write_coroutine = NULL;
481 qio_channel_set_aio_fd_handlers(ioc);
482 }
483}
484
485
486static gboolean qio_channel_wait_complete(QIOChannel *ioc,
487 GIOCondition condition,
488 gpointer opaque)
489{
490 GMainLoop *loop = opaque;
491
492 g_main_loop_quit(loop);
493 return FALSE;
494}
495
496
497void qio_channel_wait(QIOChannel *ioc,
498 GIOCondition condition)
499{
500 GMainContext *ctxt = g_main_context_new();
501 GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
502 GSource *source;
503
504 source = qio_channel_create_watch(ioc, condition);
505
506 g_source_set_callback(source,
507 (GSourceFunc)qio_channel_wait_complete,
508 loop,
509 NULL);
510
511 g_source_attach(source, ctxt);
512
513 g_main_loop_run(loop);
514
515 g_source_unref(source);
516 g_main_loop_unref(loop);
517 g_main_context_unref(ctxt);
518}
519
520
521static void qio_channel_finalize(Object *obj)
522{
523 QIOChannel *ioc = QIO_CHANNEL(obj);
524
525 g_free(ioc->name);
526
527#ifdef _WIN32
528 if (ioc->event) {
529 CloseHandle(ioc->event);
530 }
531#endif
532}
533
534static const TypeInfo qio_channel_info = {
535 .parent = TYPE_OBJECT,
536 .name = TYPE_QIO_CHANNEL,
537 .instance_size = sizeof(QIOChannel),
538 .instance_finalize = qio_channel_finalize,
539 .abstract = true,
540 .class_size = sizeof(QIOChannelClass),
541};
542
543
544static void qio_channel_register_types(void)
545{
546 type_register_static(&qio_channel_info);
547}
548
549
550type_init(qio_channel_register_types);
551