qemu/io/channel.c
/*
 * QEMU I/O channels
 *
 * Copyright (c) 2015 Red Hat, Inc.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License as published by the Free Software Foundation; either
 * version 2.1 of the License, or (at your option) any later version.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Lesser General Public License for more details.
 *
 * You should have received a copy of the GNU Lesser General Public
 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
 *
 */

#include "qemu/osdep.h"
#include "io/channel.h"
#include "qapi/error.h"
#include "qemu/main-loop.h"
#include "qemu/module.h"
#include "qemu/iov.h"

bool qio_channel_has_feature(QIOChannel *ioc,
                             QIOChannelFeature feature)
{
    return ioc->features & (1 << feature);
}


void qio_channel_set_feature(QIOChannel *ioc,
                             QIOChannelFeature feature)
{
    ioc->features |= (1 << feature);
}


void qio_channel_set_name(QIOChannel *ioc,
                          const char *name)
{
    g_free(ioc->name);
    ioc->name = g_strdup(name);
}

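/*
 * Vectored read that can also receive ancillary file descriptors.
 * Requesting fds on a channel without QIO_CHANNEL_FEATURE_FD_PASS
 * fails with EINVAL; otherwise the call is delegated to the class
 * io_readv implementation. The write counterpart below applies the
 * same check before sending fds.
 */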
ssize_t qio_channel_readv_full(QIOChannel *ioc,
                               const struct iovec *iov,
                               size_t niov,
                               int **fds,
                               size_t *nfds,
                               Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if ((fds || nfds) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
        error_setg_errno(errp, EINVAL,
                         "Channel does not support file descriptor passing");
        return -1;
    }

    return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
}


ssize_t qio_channel_writev_full(QIOChannel *ioc,
                                const struct iovec *iov,
                                size_t niov,
                                int *fds,
                                size_t nfds,
                                Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if ((fds || nfds) &&
        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
        error_setg_errno(errp, EINVAL,
                         "Channel does not support file descriptor passing");
        return -1;
    }

    return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
}


int qio_channel_readv_all_eof(QIOChannel *ioc,
                              const struct iovec *iov,
                              size_t niov,
                              Error **errp)
{
    return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
}

int qio_channel_readv_all(QIOChannel *ioc,
                          const struct iovec *iov,
                          size_t niov,
                          Error **errp)
{
    return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
}

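/*
 * Keep reading until the supplied vector is completely filled.
 * Returns 1 when all data was read, 0 when EOF was hit before any
 * data or fds arrived, and -1 on error or on EOF part way through
 * the buffer. File descriptors are only collected on the first read
 * that does not block; any received before a later error are closed
 * and freed again.
 */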
int qio_channel_readv_full_all_eof(QIOChannel *ioc,
                                   const struct iovec *iov,
                                   size_t niov,
                                   int **fds, size_t *nfds,
                                   Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;
    int **local_fds = fds;
    size_t *local_nfds = nfds;
    bool partial = false;

    if (nfds) {
        *nfds = 0;
    }

    if (fds) {
        *fds = NULL;
    }

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while ((nlocal_iov > 0) || local_fds) {
        ssize_t len;
        len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
                                     local_nfds, errp);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_IN);
            } else {
                qio_channel_wait(ioc, G_IO_IN);
            }
            continue;
        }

        if (len == 0) {
            if (local_nfds && *local_nfds) {
                /*
                 * Got some FDs, but no data yet. This isn't an EOF
                 * scenario (yet), so carry on to try to read data
                 * on next loop iteration
                 */
                goto next_iter;
            } else if (!partial) {
                /* No fds and no data - EOF before any data read */
                ret = 0;
                goto cleanup;
            } else {
                len = -1;
                error_setg(errp,
                           "Unexpected end-of-file before all data were read");
                /* Fallthrough into len < 0 handling */
            }
        }

        if (len < 0) {
            /* Close any FDs we previously received */
            if (nfds && fds) {
                size_t i;
                for (i = 0; i < (*nfds); i++) {
                    close((*fds)[i]);
                }
                g_free(*fds);
                *fds = NULL;
                *nfds = 0;
            }
            goto cleanup;
        }

        if (nlocal_iov) {
            iov_discard_front(&local_iov, &nlocal_iov, len);
        }

next_iter:
        partial = true;
        local_fds = NULL;
        local_nfds = NULL;
    }

    ret = 1;

 cleanup:
    g_free(local_iov_head);
    return ret;
}

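/*
 * Like qio_channel_readv_full_all_eof(), but an EOF before all data
 * has been read is reported as an error. Returns 0 on success and
 * -1 on failure.
 */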
int qio_channel_readv_full_all(QIOChannel *ioc,
                               const struct iovec *iov,
                               size_t niov,
                               int **fds, size_t *nfds,
                               Error **errp)
{
    int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);

    if (ret == 0) {
        error_setg(errp, "Unexpected end-of-file before all data were read");
        return -1;
    }
    if (ret == 1) {
        return 0;
    }

    return ret;
}

int qio_channel_writev_all(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
                           Error **errp)
{
    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
}

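/*
 * Keep writing until the whole vector has been sent, yielding (in a
 * coroutine) or blocking in qio_channel_wait() whenever the channel
 * would block. Any file descriptors are handed to the backend with
 * the first write that does not block; later iterations only send
 * the remaining data.
 */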
int qio_channel_writev_full_all(QIOChannel *ioc,
                                const struct iovec *iov,
                                size_t niov,
                                int *fds, size_t nfds,
                                Error **errp)
{
    int ret = -1;
    struct iovec *local_iov = g_new(struct iovec, niov);
    struct iovec *local_iov_head = local_iov;
    unsigned int nlocal_iov = niov;

    nlocal_iov = iov_copy(local_iov, nlocal_iov,
                          iov, niov,
                          0, iov_size(iov, niov));

    while (nlocal_iov > 0) {
        ssize_t len;
        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
                                      errp);
        if (len == QIO_CHANNEL_ERR_BLOCK) {
            if (qemu_in_coroutine()) {
                qio_channel_yield(ioc, G_IO_OUT);
            } else {
                qio_channel_wait(ioc, G_IO_OUT);
            }
            continue;
        }
        if (len < 0) {
            goto cleanup;
        }

        iov_discard_front(&local_iov, &nlocal_iov, len);

        fds = NULL;
        nfds = 0;
    }

    ret = 0;
 cleanup:
    g_free(local_iov_head);
    return ret;
}

ssize_t qio_channel_readv(QIOChannel *ioc,
                          const struct iovec *iov,
                          size_t niov,
                          Error **errp)
{
    return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
}


ssize_t qio_channel_writev(QIOChannel *ioc,
                           const struct iovec *iov,
                           size_t niov,
                           Error **errp)
{
    return qio_channel_writev_full(ioc, iov, niov, NULL, 0, errp);
}


ssize_t qio_channel_read(QIOChannel *ioc,
                         char *buf,
                         size_t buflen,
                         Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
}


ssize_t qio_channel_write(QIOChannel *ioc,
                          const char *buf,
                          size_t buflen,
                          Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, errp);
}


int qio_channel_read_all_eof(QIOChannel *ioc,
                             char *buf,
                             size_t buflen,
                             Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
}


int qio_channel_read_all(QIOChannel *ioc,
                         char *buf,
                         size_t buflen,
                         Error **errp)
{
    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
    return qio_channel_readv_all(ioc, &iov, 1, errp);
}


int qio_channel_write_all(QIOChannel *ioc,
                          const char *buf,
                          size_t buflen,
                          Error **errp)
{
    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
    return qio_channel_writev_all(ioc, &iov, 1, errp);
}


int qio_channel_set_blocking(QIOChannel *ioc,
                              bool enabled,
                              Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_set_blocking(ioc, enabled, errp);
}


int qio_channel_close(QIOChannel *ioc,
                      Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    return klass->io_close(ioc, errp);
}


GSource *qio_channel_create_watch(QIOChannel *ioc,
                                  GIOCondition condition)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
    GSource *ret = klass->io_create_watch(ioc, condition);

    if (ioc->name) {
        g_source_set_name(ret, ioc->name);
    }

    return ret;
}


void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
                                    AioContext *ctx,
                                    IOHandler *io_read,
                                    IOHandler *io_write,
                                    void *opaque)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
}

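/*
 * Create a watch for @condition, bind @func/@user_data to it and
 * attach it to @context (the default main context when NULL).
 * Returns the GSource ID; ownership of the source passes to the
 * main context.
 */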
guint qio_channel_add_watch_full(QIOChannel *ioc,
                                 GIOCondition condition,
                                 QIOChannelFunc func,
                                 gpointer user_data,
                                 GDestroyNotify notify,
                                 GMainContext *context)
{
    GSource *source;
    guint id;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source, (GSourceFunc)func, user_data, notify);

    id = g_source_attach(source, context);
    g_source_unref(source);

    return id;
}

guint qio_channel_add_watch(QIOChannel *ioc,
                            GIOCondition condition,
                            QIOChannelFunc func,
                            gpointer user_data,
                            GDestroyNotify notify)
{
    return qio_channel_add_watch_full(ioc, condition, func,
                                      user_data, notify, NULL);
}

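/*
 * Variant of qio_channel_add_watch_full() that returns the attached
 * GSource itself rather than its ID. The source carries an extra
 * reference that the caller must drop with g_source_unref().
 */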
GSource *qio_channel_add_watch_source(QIOChannel *ioc,
                                      GIOCondition condition,
                                      QIOChannelFunc func,
                                      gpointer user_data,
                                      GDestroyNotify notify,
                                      GMainContext *context)
{
    GSource *source;
    guint id;

    id = qio_channel_add_watch_full(ioc, condition, func,
                                    user_data, notify, context);
    source = g_main_context_find_source_by_id(context, id);
    g_source_ref(source);
    return source;
}


int qio_channel_shutdown(QIOChannel *ioc,
                         QIOChannelShutdown how,
                         Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_shutdown) {
        error_setg(errp, "Data path shutdown not supported");
        return -1;
    }

    return klass->io_shutdown(ioc, how, errp);
}


void qio_channel_set_delay(QIOChannel *ioc,
                           bool enabled)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (klass->io_set_delay) {
        klass->io_set_delay(ioc, enabled);
    }
}


void qio_channel_set_cork(QIOChannel *ioc,
                          bool enabled)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (klass->io_set_cork) {
        klass->io_set_cork(ioc, enabled);
    }
}


off_t qio_channel_io_seek(QIOChannel *ioc,
                          off_t offset,
                          int whence,
                          Error **errp)
{
    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);

    if (!klass->io_seek) {
        error_setg(errp, "Channel does not support random access");
        return -1;
    }

    return klass->io_seek(ioc, offset, whence, errp);
}

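/*
 * AioContext fd handlers used by the coroutine I/O path: they simply
 * re-enter the coroutine that is parked in qio_channel_yield().
 */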
static void qio_channel_restart_read(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = ioc->read_coroutine;

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
}

static void qio_channel_restart_write(void *opaque)
{
    QIOChannel *ioc = opaque;
    Coroutine *co = ioc->write_coroutine;

    /* Assert that aio_co_wake() reenters the coroutine directly */
    assert(qemu_get_current_aio_context() ==
           qemu_coroutine_get_aio_context(co));
    aio_co_wake(co);
}

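/*
 * (Re)register the read/write fd handlers for the coroutines currently
 * waiting on the channel, using the attached AioContext or, when none
 * is attached, the main loop's iohandler context.
 */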
static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
{
    IOHandler *rd_handler = NULL, *wr_handler = NULL;
    AioContext *ctx;

    if (ioc->read_coroutine) {
        rd_handler = qio_channel_restart_read;
    }
    if (ioc->write_coroutine) {
        wr_handler = qio_channel_restart_write;
    }

    ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
    qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
}

void qio_channel_attach_aio_context(QIOChannel *ioc,
                                    AioContext *ctx)
{
    assert(!ioc->read_coroutine);
    assert(!ioc->write_coroutine);
    ioc->ctx = ctx;
}

void qio_channel_detach_aio_context(QIOChannel *ioc)
{
    ioc->read_coroutine = NULL;
    ioc->write_coroutine = NULL;
    qio_channel_set_aio_fd_handlers(ioc);
    ioc->ctx = NULL;
}

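/*
 * Park the calling coroutine until @condition is ready on the channel:
 * the coroutine is recorded as the channel's read or write waiter, the
 * fd handlers are armed and the coroutine yields. Once it has been
 * re-entered, the registration is dropped and the handlers updated
 * again.
 */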
void coroutine_fn qio_channel_yield(QIOChannel *ioc,
                                    GIOCondition condition)
{
    assert(qemu_in_coroutine());
    if (condition == G_IO_IN) {
        assert(!ioc->read_coroutine);
        ioc->read_coroutine = qemu_coroutine_self();
    } else if (condition == G_IO_OUT) {
        assert(!ioc->write_coroutine);
        ioc->write_coroutine = qemu_coroutine_self();
    } else {
        abort();
    }
    qio_channel_set_aio_fd_handlers(ioc);
    qemu_coroutine_yield();

    /* Allow interrupting the operation by reentering the coroutine other than
     * through the aio_fd_handlers. */
    if (condition == G_IO_IN && ioc->read_coroutine) {
        ioc->read_coroutine = NULL;
        qio_channel_set_aio_fd_handlers(ioc);
    } else if (condition == G_IO_OUT && ioc->write_coroutine) {
        ioc->write_coroutine = NULL;
        qio_channel_set_aio_fd_handlers(ioc);
    }
}


static gboolean qio_channel_wait_complete(QIOChannel *ioc,
                                          GIOCondition condition,
                                          gpointer opaque)
{
    GMainLoop *loop = opaque;

    g_main_loop_quit(loop);
    return FALSE;
}

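/*
 * Blocking (non-coroutine) wait: run a private GMainContext with a
 * watch on the channel until @condition fires, then tear the loop
 * down again.
 */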
void qio_channel_wait(QIOChannel *ioc,
                      GIOCondition condition)
{
    GMainContext *ctxt = g_main_context_new();
    GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
    GSource *source;

    source = qio_channel_create_watch(ioc, condition);

    g_source_set_callback(source,
                          (GSourceFunc)qio_channel_wait_complete,
                          loop,
                          NULL);

    g_source_attach(source, ctxt);

    g_main_loop_run(loop);

    g_source_unref(source);
    g_main_loop_unref(loop);
    g_main_context_unref(ctxt);
}


static void qio_channel_finalize(Object *obj)
{
    QIOChannel *ioc = QIO_CHANNEL(obj);

    g_free(ioc->name);

#ifdef _WIN32
    if (ioc->event) {
        CloseHandle(ioc->event);
    }
#endif
}

static const TypeInfo qio_channel_info = {
    .parent = TYPE_OBJECT,
    .name = TYPE_QIO_CHANNEL,
    .instance_size = sizeof(QIOChannel),
    .instance_finalize = qio_channel_finalize,
    .abstract = true,
    .class_size = sizeof(QIOChannelClass),
};


static void qio_channel_register_types(void)
{
    type_register_static(&qio_channel_info);
}


type_init(qio_channel_register_types);