qemu/io/channel.c
<<
>>
Prefs
   1/*
   2 * QEMU I/O channels
   3 *
   4 * Copyright (c) 2015 Red Hat, Inc.
   5 *
   6 * This library is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU Lesser General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2.1 of the License, or (at your option) any later version.
  10 *
  11 * This library is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * Lesser General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU Lesser General Public
  17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18 *
  19 */
  20
  21#include "qemu/osdep.h"
  22#include "io/channel.h"
  23#include "qapi/error.h"
  24#include "qemu/main-loop.h"
  25#include "qemu/module.h"
  26#include "qemu/iov.h"
  27
  28bool qio_channel_has_feature(QIOChannel *ioc,
  29                             QIOChannelFeature feature)
  30{
  31    return ioc->features & (1 << feature);
  32}
  33
  34
  35void qio_channel_set_feature(QIOChannel *ioc,
  36                             QIOChannelFeature feature)
  37{
  38    ioc->features |= (1 << feature);
  39}
  40
  41
  42void qio_channel_set_name(QIOChannel *ioc,
  43                          const char *name)
  44{
  45    g_free(ioc->name);
  46    ioc->name = g_strdup(name);
  47}
  48
  49
  50ssize_t qio_channel_readv_full(QIOChannel *ioc,
  51                               const struct iovec *iov,
  52                               size_t niov,
  53                               int **fds,
  54                               size_t *nfds,
  55                               int flags,
  56                               Error **errp)
  57{
  58    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
  59
  60    if ((fds || nfds) &&
  61        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
  62        error_setg_errno(errp, EINVAL,
  63                         "Channel does not support file descriptor passing");
  64        return -1;
  65    }
  66
  67    if ((flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) &&
  68        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
  69        error_setg_errno(errp, EINVAL,
  70                         "Channel does not support peek read");
  71        return -1;
  72    }
  73
  74    return klass->io_readv(ioc, iov, niov, fds, nfds, flags, errp);
  75}
  76
  77
  78ssize_t qio_channel_writev_full(QIOChannel *ioc,
  79                                const struct iovec *iov,
  80                                size_t niov,
  81                                int *fds,
  82                                size_t nfds,
  83                                int flags,
  84                                Error **errp)
  85{
  86    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
  87
  88    if (fds || nfds) {
  89        if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
  90            error_setg_errno(errp, EINVAL,
  91                             "Channel does not support file descriptor passing");
  92            return -1;
  93        }
  94        if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
  95            error_setg_errno(errp, EINVAL,
  96                             "Zero Copy does not support file descriptor passing");
  97            return -1;
  98        }
  99    }
 100
 101    if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
 102        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
 103        error_setg_errno(errp, EINVAL,
 104                         "Requested Zero Copy feature is not available");
 105        return -1;
 106    }
 107
 108    return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
 109}
 110
 111
 112int qio_channel_readv_all_eof(QIOChannel *ioc,
 113                              const struct iovec *iov,
 114                              size_t niov,
 115                              Error **errp)
 116{
 117    return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
 118}
 119
 120int qio_channel_readv_all(QIOChannel *ioc,
 121                          const struct iovec *iov,
 122                          size_t niov,
 123                          Error **errp)
 124{
 125    return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
 126}
 127
 128int qio_channel_readv_full_all_eof(QIOChannel *ioc,
 129                                   const struct iovec *iov,
 130                                   size_t niov,
 131                                   int **fds, size_t *nfds,
 132                                   Error **errp)
 133{
 134    int ret = -1;
 135    struct iovec *local_iov = g_new(struct iovec, niov);
 136    struct iovec *local_iov_head = local_iov;
 137    unsigned int nlocal_iov = niov;
 138    int **local_fds = fds;
 139    size_t *local_nfds = nfds;
 140    bool partial = false;
 141
 142    if (nfds) {
 143        *nfds = 0;
 144    }
 145
 146    if (fds) {
 147        *fds = NULL;
 148    }
 149
 150    nlocal_iov = iov_copy(local_iov, nlocal_iov,
 151                          iov, niov,
 152                          0, iov_size(iov, niov));
 153
 154    while ((nlocal_iov > 0) || local_fds) {
 155        ssize_t len;
 156        len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
 157                                     local_nfds, 0, errp);
 158        if (len == QIO_CHANNEL_ERR_BLOCK) {
 159            if (qemu_in_coroutine()) {
 160                qio_channel_yield(ioc, G_IO_IN);
 161            } else {
 162                qio_channel_wait(ioc, G_IO_IN);
 163            }
 164            continue;
 165        }
 166
 167        if (len == 0) {
 168            if (local_nfds && *local_nfds) {
 169                /*
 170                 * Got some FDs, but no data yet. This isn't an EOF
 171                 * scenario (yet), so carry on to try to read data
 172                 * on next loop iteration
 173                 */
 174                goto next_iter;
 175            } else if (!partial) {
 176                /* No fds and no data - EOF before any data read */
 177                ret = 0;
 178                goto cleanup;
 179            } else {
 180                len = -1;
 181                error_setg(errp,
 182                           "Unexpected end-of-file before all data were read");
 183                /* Fallthrough into len < 0 handling */
 184            }
 185        }
 186
 187        if (len < 0) {
 188            /* Close any FDs we previously received */
 189            if (nfds && fds) {
 190                size_t i;
 191                for (i = 0; i < (*nfds); i++) {
 192                    close((*fds)[i]);
 193                }
 194                g_free(*fds);
 195                *fds = NULL;
 196                *nfds = 0;
 197            }
 198            goto cleanup;
 199        }
 200
 201        if (nlocal_iov) {
 202            iov_discard_front(&local_iov, &nlocal_iov, len);
 203        }
 204
 205next_iter:
 206        partial = true;
 207        local_fds = NULL;
 208        local_nfds = NULL;
 209    }
 210
 211    ret = 1;
 212
 213 cleanup:
 214    g_free(local_iov_head);
 215    return ret;
 216}
 217
 218int qio_channel_readv_full_all(QIOChannel *ioc,
 219                               const struct iovec *iov,
 220                               size_t niov,
 221                               int **fds, size_t *nfds,
 222                               Error **errp)
 223{
 224    int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);
 225
 226    if (ret == 0) {
 227        error_setg(errp, "Unexpected end-of-file before all data were read");
 228        return -1;
 229    }
 230    if (ret == 1) {
 231        return 0;
 232    }
 233
 234    return ret;
 235}
 236
 237int qio_channel_writev_all(QIOChannel *ioc,
 238                           const struct iovec *iov,
 239                           size_t niov,
 240                           Error **errp)
 241{
 242    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
 243}
 244
 245int qio_channel_writev_full_all(QIOChannel *ioc,
 246                                const struct iovec *iov,
 247                                size_t niov,
 248                                int *fds, size_t nfds,
 249                                int flags, Error **errp)
 250{
 251    int ret = -1;
 252    struct iovec *local_iov = g_new(struct iovec, niov);
 253    struct iovec *local_iov_head = local_iov;
 254    unsigned int nlocal_iov = niov;
 255
 256    nlocal_iov = iov_copy(local_iov, nlocal_iov,
 257                          iov, niov,
 258                          0, iov_size(iov, niov));
 259
 260    while (nlocal_iov > 0) {
 261        ssize_t len;
 262
 263        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
 264                                            nfds, flags, errp);
 265
 266        if (len == QIO_CHANNEL_ERR_BLOCK) {
 267            if (qemu_in_coroutine()) {
 268                qio_channel_yield(ioc, G_IO_OUT);
 269            } else {
 270                qio_channel_wait(ioc, G_IO_OUT);
 271            }
 272            continue;
 273        }
 274        if (len < 0) {
 275            goto cleanup;
 276        }
 277
 278        iov_discard_front(&local_iov, &nlocal_iov, len);
 279
 280        fds = NULL;
 281        nfds = 0;
 282    }
 283
 284    ret = 0;
 285 cleanup:
 286    g_free(local_iov_head);
 287    return ret;
 288}
 289
 290ssize_t qio_channel_readv(QIOChannel *ioc,
 291                          const struct iovec *iov,
 292                          size_t niov,
 293                          Error **errp)
 294{
 295    return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, 0, errp);
 296}
 297
 298
 299ssize_t qio_channel_writev(QIOChannel *ioc,
 300                           const struct iovec *iov,
 301                           size_t niov,
 302                           Error **errp)
 303{
 304    return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
 305}
 306
 307
 308ssize_t qio_channel_read(QIOChannel *ioc,
 309                         char *buf,
 310                         size_t buflen,
 311                         Error **errp)
 312{
 313    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 314    return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, 0, errp);
 315}
 316
 317
 318ssize_t qio_channel_write(QIOChannel *ioc,
 319                          const char *buf,
 320                          size_t buflen,
 321                          Error **errp)
 322{
 323    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
 324    return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
 325}
 326
 327
 328int qio_channel_read_all_eof(QIOChannel *ioc,
 329                             char *buf,
 330                             size_t buflen,
 331                             Error **errp)
 332{
 333    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 334    return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
 335}
 336
 337
 338int qio_channel_read_all(QIOChannel *ioc,
 339                         char *buf,
 340                         size_t buflen,
 341                         Error **errp)
 342{
 343    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 344    return qio_channel_readv_all(ioc, &iov, 1, errp);
 345}
 346
 347
 348int qio_channel_write_all(QIOChannel *ioc,
 349                          const char *buf,
 350                          size_t buflen,
 351                          Error **errp)
 352{
 353    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
 354    return qio_channel_writev_all(ioc, &iov, 1, errp);
 355}
 356
 357
 358int qio_channel_set_blocking(QIOChannel *ioc,
 359                              bool enabled,
 360                              Error **errp)
 361{
 362    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 363    return klass->io_set_blocking(ioc, enabled, errp);
 364}
 365
 366
 367int qio_channel_close(QIOChannel *ioc,
 368                      Error **errp)
 369{
 370    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 371    return klass->io_close(ioc, errp);
 372}
 373
 374
 375GSource *qio_channel_create_watch(QIOChannel *ioc,
 376                                  GIOCondition condition)
 377{
 378    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 379    GSource *ret = klass->io_create_watch(ioc, condition);
 380
 381    if (ioc->name) {
 382        g_source_set_name(ret, ioc->name);
 383    }
 384
 385    return ret;
 386}
 387
 388
 389void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
 390                                    AioContext *ctx,
 391                                    IOHandler *io_read,
 392                                    IOHandler *io_write,
 393                                    void *opaque)
 394{
 395    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 396
 397    klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
 398}
 399
 400guint qio_channel_add_watch_full(QIOChannel *ioc,
 401                                 GIOCondition condition,
 402                                 QIOChannelFunc func,
 403                                 gpointer user_data,
 404                                 GDestroyNotify notify,
 405                                 GMainContext *context)
 406{
 407    GSource *source;
 408    guint id;
 409
 410    source = qio_channel_create_watch(ioc, condition);
 411
 412    g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
 413
 414    id = g_source_attach(source, context);
 415    g_source_unref(source);
 416
 417    return id;
 418}
 419
 420guint qio_channel_add_watch(QIOChannel *ioc,
 421                            GIOCondition condition,
 422                            QIOChannelFunc func,
 423                            gpointer user_data,
 424                            GDestroyNotify notify)
 425{
 426    return qio_channel_add_watch_full(ioc, condition, func,
 427                                      user_data, notify, NULL);
 428}
 429
 430GSource *qio_channel_add_watch_source(QIOChannel *ioc,
 431                                      GIOCondition condition,
 432                                      QIOChannelFunc func,
 433                                      gpointer user_data,
 434                                      GDestroyNotify notify,
 435                                      GMainContext *context)
 436{
 437    GSource *source;
 438    guint id;
 439
 440    id = qio_channel_add_watch_full(ioc, condition, func,
 441                                    user_data, notify, context);
 442    source = g_main_context_find_source_by_id(context, id);
 443    g_source_ref(source);
 444    return source;
 445}
 446
 447
 448int qio_channel_shutdown(QIOChannel *ioc,
 449                         QIOChannelShutdown how,
 450                         Error **errp)
 451{
 452    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 453
 454    if (!klass->io_shutdown) {
 455        error_setg(errp, "Data path shutdown not supported");
 456        return -1;
 457    }
 458
 459    return klass->io_shutdown(ioc, how, errp);
 460}
 461
 462
 463void qio_channel_set_delay(QIOChannel *ioc,
 464                           bool enabled)
 465{
 466    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 467
 468    if (klass->io_set_delay) {
 469        klass->io_set_delay(ioc, enabled);
 470    }
 471}
 472
 473
 474void qio_channel_set_cork(QIOChannel *ioc,
 475                          bool enabled)
 476{
 477    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 478
 479    if (klass->io_set_cork) {
 480        klass->io_set_cork(ioc, enabled);
 481    }
 482}
 483
 484
 485off_t qio_channel_io_seek(QIOChannel *ioc,
 486                          off_t offset,
 487                          int whence,
 488                          Error **errp)
 489{
 490    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 491
 492    if (!klass->io_seek) {
 493        error_setg(errp, "Channel does not support random access");
 494        return -1;
 495    }
 496
 497    return klass->io_seek(ioc, offset, whence, errp);
 498}
 499
 500int qio_channel_flush(QIOChannel *ioc,
 501                                Error **errp)
 502{
 503    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 504
 505    if (!klass->io_flush ||
 506        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
 507        return 0;
 508    }
 509
 510    return klass->io_flush(ioc, errp);
 511}
 512
 513
 514static void qio_channel_restart_read(void *opaque)
 515{
 516    QIOChannel *ioc = opaque;
 517    Coroutine *co = ioc->read_coroutine;
 518
 519    /* Assert that aio_co_wake() reenters the coroutine directly */
 520    assert(qemu_get_current_aio_context() ==
 521           qemu_coroutine_get_aio_context(co));
 522    aio_co_wake(co);
 523}
 524
 525static void qio_channel_restart_write(void *opaque)
 526{
 527    QIOChannel *ioc = opaque;
 528    Coroutine *co = ioc->write_coroutine;
 529
 530    /* Assert that aio_co_wake() reenters the coroutine directly */
 531    assert(qemu_get_current_aio_context() ==
 532           qemu_coroutine_get_aio_context(co));
 533    aio_co_wake(co);
 534}
 535
 536static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
 537{
 538    IOHandler *rd_handler = NULL, *wr_handler = NULL;
 539    AioContext *ctx;
 540
 541    if (ioc->read_coroutine) {
 542        rd_handler = qio_channel_restart_read;
 543    }
 544    if (ioc->write_coroutine) {
 545        wr_handler = qio_channel_restart_write;
 546    }
 547
 548    ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
 549    qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
 550}
 551
 552void qio_channel_attach_aio_context(QIOChannel *ioc,
 553                                    AioContext *ctx)
 554{
 555    assert(!ioc->read_coroutine);
 556    assert(!ioc->write_coroutine);
 557    ioc->ctx = ctx;
 558}
 559
 560void qio_channel_detach_aio_context(QIOChannel *ioc)
 561{
 562    ioc->read_coroutine = NULL;
 563    ioc->write_coroutine = NULL;
 564    qio_channel_set_aio_fd_handlers(ioc);
 565    ioc->ctx = NULL;
 566}
 567
 568void coroutine_fn qio_channel_yield(QIOChannel *ioc,
 569                                    GIOCondition condition)
 570{
 571    assert(qemu_in_coroutine());
 572    if (condition == G_IO_IN) {
 573        assert(!ioc->read_coroutine);
 574        ioc->read_coroutine = qemu_coroutine_self();
 575    } else if (condition == G_IO_OUT) {
 576        assert(!ioc->write_coroutine);
 577        ioc->write_coroutine = qemu_coroutine_self();
 578    } else {
 579        abort();
 580    }
 581    qio_channel_set_aio_fd_handlers(ioc);
 582    qemu_coroutine_yield();
 583
 584    /* Allow interrupting the operation by reentering the coroutine other than
 585     * through the aio_fd_handlers. */
 586    if (condition == G_IO_IN && ioc->read_coroutine) {
 587        ioc->read_coroutine = NULL;
 588        qio_channel_set_aio_fd_handlers(ioc);
 589    } else if (condition == G_IO_OUT && ioc->write_coroutine) {
 590        ioc->write_coroutine = NULL;
 591        qio_channel_set_aio_fd_handlers(ioc);
 592    }
 593}
 594
 595
 596static gboolean qio_channel_wait_complete(QIOChannel *ioc,
 597                                          GIOCondition condition,
 598                                          gpointer opaque)
 599{
 600    GMainLoop *loop = opaque;
 601
 602    g_main_loop_quit(loop);
 603    return FALSE;
 604}
 605
 606
 607void qio_channel_wait(QIOChannel *ioc,
 608                      GIOCondition condition)
 609{
 610    GMainContext *ctxt = g_main_context_new();
 611    GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
 612    GSource *source;
 613
 614    source = qio_channel_create_watch(ioc, condition);
 615
 616    g_source_set_callback(source,
 617                          (GSourceFunc)qio_channel_wait_complete,
 618                          loop,
 619                          NULL);
 620
 621    g_source_attach(source, ctxt);
 622
 623    g_main_loop_run(loop);
 624
 625    g_source_unref(source);
 626    g_main_loop_unref(loop);
 627    g_main_context_unref(ctxt);
 628}
 629
 630
 631static void qio_channel_finalize(Object *obj)
 632{
 633    QIOChannel *ioc = QIO_CHANNEL(obj);
 634
 635    g_free(ioc->name);
 636
 637#ifdef _WIN32
 638    if (ioc->event) {
 639        CloseHandle(ioc->event);
 640    }
 641#endif
 642}
 643
 644static const TypeInfo qio_channel_info = {
 645    .parent = TYPE_OBJECT,
 646    .name = TYPE_QIO_CHANNEL,
 647    .instance_size = sizeof(QIOChannel),
 648    .instance_finalize = qio_channel_finalize,
 649    .abstract = true,
 650    .class_size = sizeof(QIOChannelClass),
 651};
 652
 653
 654static void qio_channel_register_types(void)
 655{
 656    type_register_static(&qio_channel_info);
 657}
 658
 659
 660type_init(qio_channel_register_types);
 661