qemu/io/channel.c
<<
>>
Prefs
   1/*
   2 * QEMU I/O channels
   3 *
   4 * Copyright (c) 2015 Red Hat, Inc.
   5 *
   6 * This library is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU Lesser General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2.1 of the License, or (at your option) any later version.
  10 *
  11 * This library is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * Lesser General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU Lesser General Public
  17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18 *
  19 */
  20
  21#include "qemu/osdep.h"
  22#include "block/aio-wait.h"
  23#include "io/channel.h"
  24#include "qapi/error.h"
  25#include "qemu/main-loop.h"
  26#include "qemu/module.h"
  27#include "qemu/iov.h"
  28
  29bool qio_channel_has_feature(QIOChannel *ioc,
  30                             QIOChannelFeature feature)
  31{
  32    return ioc->features & (1 << feature);
  33}
  34
  35
  36void qio_channel_set_feature(QIOChannel *ioc,
  37                             QIOChannelFeature feature)
  38{
  39    ioc->features |= (1 << feature);
  40}
  41
  42
  43void qio_channel_set_name(QIOChannel *ioc,
  44                          const char *name)
  45{
  46    g_free(ioc->name);
  47    ioc->name = g_strdup(name);
  48}
  49
  50
  51ssize_t qio_channel_readv_full(QIOChannel *ioc,
  52                               const struct iovec *iov,
  53                               size_t niov,
  54                               int **fds,
  55                               size_t *nfds,
  56                               int flags,
  57                               Error **errp)
  58{
  59    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
  60
  61    if ((fds || nfds) &&
  62        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
  63        error_setg_errno(errp, EINVAL,
  64                         "Channel does not support file descriptor passing");
  65        return -1;
  66    }
  67
  68    if ((flags & QIO_CHANNEL_READ_FLAG_MSG_PEEK) &&
  69        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
  70        error_setg_errno(errp, EINVAL,
  71                         "Channel does not support peek read");
  72        return -1;
  73    }
  74
  75    return klass->io_readv(ioc, iov, niov, fds, nfds, flags, errp);
  76}
  77
  78
  79ssize_t qio_channel_writev_full(QIOChannel *ioc,
  80                                const struct iovec *iov,
  81                                size_t niov,
  82                                int *fds,
  83                                size_t nfds,
  84                                int flags,
  85                                Error **errp)
  86{
  87    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
  88
  89    if (fds || nfds) {
  90        if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
  91            error_setg_errno(errp, EINVAL,
  92                             "Channel does not support file descriptor passing");
  93            return -1;
  94        }
  95        if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
  96            error_setg_errno(errp, EINVAL,
  97                             "Zero Copy does not support file descriptor passing");
  98            return -1;
  99        }
 100    }
 101
 102    if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
 103        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
 104        error_setg_errno(errp, EINVAL,
 105                         "Requested Zero Copy feature is not available");
 106        return -1;
 107    }
 108
 109    return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
 110}
 111
 112
 113int qio_channel_readv_all_eof(QIOChannel *ioc,
 114                              const struct iovec *iov,
 115                              size_t niov,
 116                              Error **errp)
 117{
 118    return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
 119}
 120
 121int qio_channel_readv_all(QIOChannel *ioc,
 122                          const struct iovec *iov,
 123                          size_t niov,
 124                          Error **errp)
 125{
 126    return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
 127}
 128
 129int qio_channel_readv_full_all_eof(QIOChannel *ioc,
 130                                   const struct iovec *iov,
 131                                   size_t niov,
 132                                   int **fds, size_t *nfds,
 133                                   Error **errp)
 134{
 135    int ret = -1;
 136    struct iovec *local_iov = g_new(struct iovec, niov);
 137    struct iovec *local_iov_head = local_iov;
 138    unsigned int nlocal_iov = niov;
 139    int **local_fds = fds;
 140    size_t *local_nfds = nfds;
 141    bool partial = false;
 142
 143    if (nfds) {
 144        *nfds = 0;
 145    }
 146
 147    if (fds) {
 148        *fds = NULL;
 149    }
 150
 151    nlocal_iov = iov_copy(local_iov, nlocal_iov,
 152                          iov, niov,
 153                          0, iov_size(iov, niov));
 154
 155    while ((nlocal_iov > 0) || local_fds) {
 156        ssize_t len;
 157        len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
 158                                     local_nfds, 0, errp);
 159        if (len == QIO_CHANNEL_ERR_BLOCK) {
 160            if (qemu_in_coroutine()) {
 161                qio_channel_yield(ioc, G_IO_IN);
 162            } else {
 163                qio_channel_wait(ioc, G_IO_IN);
 164            }
 165            continue;
 166        }
 167
 168        if (len == 0) {
 169            if (local_nfds && *local_nfds) {
 170                /*
 171                 * Got some FDs, but no data yet. This isn't an EOF
 172                 * scenario (yet), so carry on to try to read data
 173                 * on next loop iteration
 174                 */
 175                goto next_iter;
 176            } else if (!partial) {
 177                /* No fds and no data - EOF before any data read */
 178                ret = 0;
 179                goto cleanup;
 180            } else {
 181                len = -1;
 182                error_setg(errp,
 183                           "Unexpected end-of-file before all data were read");
 184                /* Fallthrough into len < 0 handling */
 185            }
 186        }
 187
 188        if (len < 0) {
 189            /* Close any FDs we previously received */
 190            if (nfds && fds) {
 191                size_t i;
 192                for (i = 0; i < (*nfds); i++) {
 193                    close((*fds)[i]);
 194                }
 195                g_free(*fds);
 196                *fds = NULL;
 197                *nfds = 0;
 198            }
 199            goto cleanup;
 200        }
 201
 202        if (nlocal_iov) {
 203            iov_discard_front(&local_iov, &nlocal_iov, len);
 204        }
 205
 206next_iter:
 207        partial = true;
 208        local_fds = NULL;
 209        local_nfds = NULL;
 210    }
 211
 212    ret = 1;
 213
 214 cleanup:
 215    g_free(local_iov_head);
 216    return ret;
 217}
 218
 219int qio_channel_readv_full_all(QIOChannel *ioc,
 220                               const struct iovec *iov,
 221                               size_t niov,
 222                               int **fds, size_t *nfds,
 223                               Error **errp)
 224{
 225    int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);
 226
 227    if (ret == 0) {
 228        error_setg(errp, "Unexpected end-of-file before all data were read");
 229        return -1;
 230    }
 231    if (ret == 1) {
 232        return 0;
 233    }
 234
 235    return ret;
 236}
 237
 238int qio_channel_writev_all(QIOChannel *ioc,
 239                           const struct iovec *iov,
 240                           size_t niov,
 241                           Error **errp)
 242{
 243    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
 244}
 245
 246int qio_channel_writev_full_all(QIOChannel *ioc,
 247                                const struct iovec *iov,
 248                                size_t niov,
 249                                int *fds, size_t nfds,
 250                                int flags, Error **errp)
 251{
 252    int ret = -1;
 253    struct iovec *local_iov = g_new(struct iovec, niov);
 254    struct iovec *local_iov_head = local_iov;
 255    unsigned int nlocal_iov = niov;
 256
 257    nlocal_iov = iov_copy(local_iov, nlocal_iov,
 258                          iov, niov,
 259                          0, iov_size(iov, niov));
 260
 261    while (nlocal_iov > 0) {
 262        ssize_t len;
 263
 264        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
 265                                            nfds, flags, errp);
 266
 267        if (len == QIO_CHANNEL_ERR_BLOCK) {
 268            if (qemu_in_coroutine()) {
 269                qio_channel_yield(ioc, G_IO_OUT);
 270            } else {
 271                qio_channel_wait(ioc, G_IO_OUT);
 272            }
 273            continue;
 274        }
 275        if (len < 0) {
 276            goto cleanup;
 277        }
 278
 279        iov_discard_front(&local_iov, &nlocal_iov, len);
 280
 281        fds = NULL;
 282        nfds = 0;
 283    }
 284
 285    ret = 0;
 286 cleanup:
 287    g_free(local_iov_head);
 288    return ret;
 289}
 290
 291ssize_t qio_channel_readv(QIOChannel *ioc,
 292                          const struct iovec *iov,
 293                          size_t niov,
 294                          Error **errp)
 295{
 296    return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, 0, errp);
 297}
 298
 299
 300ssize_t qio_channel_writev(QIOChannel *ioc,
 301                           const struct iovec *iov,
 302                           size_t niov,
 303                           Error **errp)
 304{
 305    return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
 306}
 307
 308
 309ssize_t qio_channel_read(QIOChannel *ioc,
 310                         char *buf,
 311                         size_t buflen,
 312                         Error **errp)
 313{
 314    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 315    return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, 0, errp);
 316}
 317
 318
 319ssize_t qio_channel_write(QIOChannel *ioc,
 320                          const char *buf,
 321                          size_t buflen,
 322                          Error **errp)
 323{
 324    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
 325    return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
 326}
 327
 328
 329int qio_channel_read_all_eof(QIOChannel *ioc,
 330                             char *buf,
 331                             size_t buflen,
 332                             Error **errp)
 333{
 334    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 335    return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
 336}
 337
 338
 339int qio_channel_read_all(QIOChannel *ioc,
 340                         char *buf,
 341                         size_t buflen,
 342                         Error **errp)
 343{
 344    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 345    return qio_channel_readv_all(ioc, &iov, 1, errp);
 346}
 347
 348
 349int qio_channel_write_all(QIOChannel *ioc,
 350                          const char *buf,
 351                          size_t buflen,
 352                          Error **errp)
 353{
 354    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
 355    return qio_channel_writev_all(ioc, &iov, 1, errp);
 356}
 357
 358
 359int qio_channel_set_blocking(QIOChannel *ioc,
 360                              bool enabled,
 361                              Error **errp)
 362{
 363    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 364    return klass->io_set_blocking(ioc, enabled, errp);
 365}
 366
 367
 368int qio_channel_close(QIOChannel *ioc,
 369                      Error **errp)
 370{
 371    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 372    return klass->io_close(ioc, errp);
 373}
 374
 375
 376GSource *qio_channel_create_watch(QIOChannel *ioc,
 377                                  GIOCondition condition)
 378{
 379    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 380    GSource *ret = klass->io_create_watch(ioc, condition);
 381
 382    if (ioc->name) {
 383        g_source_set_name(ret, ioc->name);
 384    }
 385
 386    return ret;
 387}
 388
 389
 390void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
 391                                    AioContext *ctx,
 392                                    IOHandler *io_read,
 393                                    IOHandler *io_write,
 394                                    void *opaque)
 395{
 396    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 397
 398    klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
 399}
 400
 401guint qio_channel_add_watch_full(QIOChannel *ioc,
 402                                 GIOCondition condition,
 403                                 QIOChannelFunc func,
 404                                 gpointer user_data,
 405                                 GDestroyNotify notify,
 406                                 GMainContext *context)
 407{
 408    GSource *source;
 409    guint id;
 410
 411    source = qio_channel_create_watch(ioc, condition);
 412
 413    g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
 414
 415    id = g_source_attach(source, context);
 416    g_source_unref(source);
 417
 418    return id;
 419}
 420
 421guint qio_channel_add_watch(QIOChannel *ioc,
 422                            GIOCondition condition,
 423                            QIOChannelFunc func,
 424                            gpointer user_data,
 425                            GDestroyNotify notify)
 426{
 427    return qio_channel_add_watch_full(ioc, condition, func,
 428                                      user_data, notify, NULL);
 429}
 430
 431GSource *qio_channel_add_watch_source(QIOChannel *ioc,
 432                                      GIOCondition condition,
 433                                      QIOChannelFunc func,
 434                                      gpointer user_data,
 435                                      GDestroyNotify notify,
 436                                      GMainContext *context)
 437{
 438    GSource *source;
 439    guint id;
 440
 441    id = qio_channel_add_watch_full(ioc, condition, func,
 442                                    user_data, notify, context);
 443    source = g_main_context_find_source_by_id(context, id);
 444    g_source_ref(source);
 445    return source;
 446}
 447
 448
 449int qio_channel_shutdown(QIOChannel *ioc,
 450                         QIOChannelShutdown how,
 451                         Error **errp)
 452{
 453    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 454
 455    if (!klass->io_shutdown) {
 456        error_setg(errp, "Data path shutdown not supported");
 457        return -1;
 458    }
 459
 460    return klass->io_shutdown(ioc, how, errp);
 461}
 462
 463
 464void qio_channel_set_delay(QIOChannel *ioc,
 465                           bool enabled)
 466{
 467    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 468
 469    if (klass->io_set_delay) {
 470        klass->io_set_delay(ioc, enabled);
 471    }
 472}
 473
 474
 475void qio_channel_set_cork(QIOChannel *ioc,
 476                          bool enabled)
 477{
 478    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 479
 480    if (klass->io_set_cork) {
 481        klass->io_set_cork(ioc, enabled);
 482    }
 483}
 484
 485
 486off_t qio_channel_io_seek(QIOChannel *ioc,
 487                          off_t offset,
 488                          int whence,
 489                          Error **errp)
 490{
 491    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 492
 493    if (!klass->io_seek) {
 494        error_setg(errp, "Channel does not support random access");
 495        return -1;
 496    }
 497
 498    return klass->io_seek(ioc, offset, whence, errp);
 499}
 500
 501int qio_channel_flush(QIOChannel *ioc,
 502                                Error **errp)
 503{
 504    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 505
 506    if (!klass->io_flush ||
 507        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
 508        return 0;
 509    }
 510
 511    return klass->io_flush(ioc, errp);
 512}
 513
 514
 515static void qio_channel_restart_read(void *opaque)
 516{
 517    QIOChannel *ioc = opaque;
 518    Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
 519
 520    if (!co) {
 521        return;
 522    }
 523
 524    /* Assert that aio_co_wake() reenters the coroutine directly */
 525    assert(qemu_get_current_aio_context() ==
 526           qemu_coroutine_get_aio_context(co));
 527    aio_co_wake(co);
 528}
 529
 530static void qio_channel_restart_write(void *opaque)
 531{
 532    QIOChannel *ioc = opaque;
 533    Coroutine *co = qatomic_xchg(&ioc->write_coroutine, NULL);
 534
 535    if (!co) {
 536        return;
 537    }
 538
 539    /* Assert that aio_co_wake() reenters the coroutine directly */
 540    assert(qemu_get_current_aio_context() ==
 541           qemu_coroutine_get_aio_context(co));
 542    aio_co_wake(co);
 543}
 544
 545static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
 546{
 547    IOHandler *rd_handler = NULL, *wr_handler = NULL;
 548    AioContext *ctx;
 549
 550    if (ioc->read_coroutine) {
 551        rd_handler = qio_channel_restart_read;
 552    }
 553    if (ioc->write_coroutine) {
 554        wr_handler = qio_channel_restart_write;
 555    }
 556
 557    ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
 558    qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
 559}
 560
 561void qio_channel_attach_aio_context(QIOChannel *ioc,
 562                                    AioContext *ctx)
 563{
 564    assert(!ioc->read_coroutine);
 565    assert(!ioc->write_coroutine);
 566    ioc->ctx = ctx;
 567}
 568
 569void qio_channel_detach_aio_context(QIOChannel *ioc)
 570{
 571    ioc->read_coroutine = NULL;
 572    ioc->write_coroutine = NULL;
 573    qio_channel_set_aio_fd_handlers(ioc);
 574    ioc->ctx = NULL;
 575}
 576
 577void coroutine_fn qio_channel_yield(QIOChannel *ioc,
 578                                    GIOCondition condition)
 579{
 580    AioContext *ioc_ctx = ioc->ctx ?: qemu_get_aio_context();
 581
 582    assert(qemu_in_coroutine());
 583    assert(in_aio_context_home_thread(ioc_ctx));
 584
 585    if (condition == G_IO_IN) {
 586        assert(!ioc->read_coroutine);
 587        ioc->read_coroutine = qemu_coroutine_self();
 588    } else if (condition == G_IO_OUT) {
 589        assert(!ioc->write_coroutine);
 590        ioc->write_coroutine = qemu_coroutine_self();
 591    } else {
 592        abort();
 593    }
 594    qio_channel_set_aio_fd_handlers(ioc);
 595    qemu_coroutine_yield();
 596    assert(in_aio_context_home_thread(ioc_ctx));
 597
 598    /* Allow interrupting the operation by reentering the coroutine other than
 599     * through the aio_fd_handlers. */
 600    if (condition == G_IO_IN) {
 601        assert(ioc->read_coroutine == NULL);
 602        qio_channel_set_aio_fd_handlers(ioc);
 603    } else if (condition == G_IO_OUT) {
 604        assert(ioc->write_coroutine == NULL);
 605        qio_channel_set_aio_fd_handlers(ioc);
 606    }
 607}
 608
 609void qio_channel_wake_read(QIOChannel *ioc)
 610{
 611    Coroutine *co = qatomic_xchg(&ioc->read_coroutine, NULL);
 612    if (co) {
 613        aio_co_wake(co);
 614    }
 615}
 616
 617static gboolean qio_channel_wait_complete(QIOChannel *ioc,
 618                                          GIOCondition condition,
 619                                          gpointer opaque)
 620{
 621    GMainLoop *loop = opaque;
 622
 623    g_main_loop_quit(loop);
 624    return FALSE;
 625}
 626
 627
 628void qio_channel_wait(QIOChannel *ioc,
 629                      GIOCondition condition)
 630{
 631    GMainContext *ctxt = g_main_context_new();
 632    GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
 633    GSource *source;
 634
 635    source = qio_channel_create_watch(ioc, condition);
 636
 637    g_source_set_callback(source,
 638                          (GSourceFunc)qio_channel_wait_complete,
 639                          loop,
 640                          NULL);
 641
 642    g_source_attach(source, ctxt);
 643
 644    g_main_loop_run(loop);
 645
 646    g_source_unref(source);
 647    g_main_loop_unref(loop);
 648    g_main_context_unref(ctxt);
 649}
 650
 651
 652static void qio_channel_finalize(Object *obj)
 653{
 654    QIOChannel *ioc = QIO_CHANNEL(obj);
 655
 656    g_free(ioc->name);
 657
 658#ifdef _WIN32
 659    if (ioc->event) {
 660        CloseHandle(ioc->event);
 661    }
 662#endif
 663}
 664
 665static const TypeInfo qio_channel_info = {
 666    .parent = TYPE_OBJECT,
 667    .name = TYPE_QIO_CHANNEL,
 668    .instance_size = sizeof(QIOChannel),
 669    .instance_finalize = qio_channel_finalize,
 670    .abstract = true,
 671    .class_size = sizeof(QIOChannelClass),
 672};
 673
 674
 675static void qio_channel_register_types(void)
 676{
 677    type_register_static(&qio_channel_info);
 678}
 679
 680
 681type_init(qio_channel_register_types);
 682