qemu/io/channel.c
<<
>>
Prefs
   1/*
   2 * QEMU I/O channels
   3 *
   4 * Copyright (c) 2015 Red Hat, Inc.
   5 *
   6 * This library is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU Lesser General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2.1 of the License, or (at your option) any later version.
  10 *
  11 * This library is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * Lesser General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU Lesser General Public
  17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18 *
  19 */
  20
  21#include "qemu/osdep.h"
  22#include "io/channel.h"
  23#include "qapi/error.h"
  24#include "qemu/main-loop.h"
  25#include "qemu/module.h"
  26#include "qemu/iov.h"
  27
  28bool qio_channel_has_feature(QIOChannel *ioc,
  29                             QIOChannelFeature feature)
  30{
  31    return ioc->features & (1 << feature);
  32}
  33
  34
  35void qio_channel_set_feature(QIOChannel *ioc,
  36                             QIOChannelFeature feature)
  37{
  38    ioc->features |= (1 << feature);
  39}
  40
  41
  42void qio_channel_set_name(QIOChannel *ioc,
  43                          const char *name)
  44{
  45    g_free(ioc->name);
  46    ioc->name = g_strdup(name);
  47}
  48
  49
  50ssize_t qio_channel_readv_full(QIOChannel *ioc,
  51                               const struct iovec *iov,
  52                               size_t niov,
  53                               int **fds,
  54                               size_t *nfds,
  55                               Error **errp)
  56{
  57    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
  58
  59    if ((fds || nfds) &&
  60        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
  61        error_setg_errno(errp, EINVAL,
  62                         "Channel does not support file descriptor passing");
  63        return -1;
  64    }
  65
  66    return klass->io_readv(ioc, iov, niov, fds, nfds, errp);
  67}
  68
  69
  70ssize_t qio_channel_writev_full(QIOChannel *ioc,
  71                                const struct iovec *iov,
  72                                size_t niov,
  73                                int *fds,
  74                                size_t nfds,
  75                                int flags,
  76                                Error **errp)
  77{
  78    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
  79
  80    if (fds || nfds) {
  81        if (!qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS)) {
  82            error_setg_errno(errp, EINVAL,
  83                             "Channel does not support file descriptor passing");
  84            return -1;
  85        }
  86        if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
  87            error_setg_errno(errp, EINVAL,
  88                             "Zero Copy does not support file descriptor passing");
  89            return -1;
  90        }
  91    }
  92
  93    if ((flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) &&
  94        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
  95        error_setg_errno(errp, EINVAL,
  96                         "Requested Zero Copy feature is not available");
  97        return -1;
  98    }
  99
 100    return klass->io_writev(ioc, iov, niov, fds, nfds, flags, errp);
 101}
 102
 103
 104int qio_channel_readv_all_eof(QIOChannel *ioc,
 105                              const struct iovec *iov,
 106                              size_t niov,
 107                              Error **errp)
 108{
 109    return qio_channel_readv_full_all_eof(ioc, iov, niov, NULL, NULL, errp);
 110}
 111
 112int qio_channel_readv_all(QIOChannel *ioc,
 113                          const struct iovec *iov,
 114                          size_t niov,
 115                          Error **errp)
 116{
 117    return qio_channel_readv_full_all(ioc, iov, niov, NULL, NULL, errp);
 118}
 119
 120int qio_channel_readv_full_all_eof(QIOChannel *ioc,
 121                                   const struct iovec *iov,
 122                                   size_t niov,
 123                                   int **fds, size_t *nfds,
 124                                   Error **errp)
 125{
 126    int ret = -1;
 127    struct iovec *local_iov = g_new(struct iovec, niov);
 128    struct iovec *local_iov_head = local_iov;
 129    unsigned int nlocal_iov = niov;
 130    int **local_fds = fds;
 131    size_t *local_nfds = nfds;
 132    bool partial = false;
 133
 134    if (nfds) {
 135        *nfds = 0;
 136    }
 137
 138    if (fds) {
 139        *fds = NULL;
 140    }
 141
 142    nlocal_iov = iov_copy(local_iov, nlocal_iov,
 143                          iov, niov,
 144                          0, iov_size(iov, niov));
 145
 146    while ((nlocal_iov > 0) || local_fds) {
 147        ssize_t len;
 148        len = qio_channel_readv_full(ioc, local_iov, nlocal_iov, local_fds,
 149                                     local_nfds, errp);
 150        if (len == QIO_CHANNEL_ERR_BLOCK) {
 151            if (qemu_in_coroutine()) {
 152                qio_channel_yield(ioc, G_IO_IN);
 153            } else {
 154                qio_channel_wait(ioc, G_IO_IN);
 155            }
 156            continue;
 157        }
 158
 159        if (len == 0) {
 160            if (local_nfds && *local_nfds) {
 161                /*
 162                 * Got some FDs, but no data yet. This isn't an EOF
 163                 * scenario (yet), so carry on to try to read data
 164                 * on next loop iteration
 165                 */
 166                goto next_iter;
 167            } else if (!partial) {
 168                /* No fds and no data - EOF before any data read */
 169                ret = 0;
 170                goto cleanup;
 171            } else {
 172                len = -1;
 173                error_setg(errp,
 174                           "Unexpected end-of-file before all data were read");
 175                /* Fallthrough into len < 0 handling */
 176            }
 177        }
 178
 179        if (len < 0) {
 180            /* Close any FDs we previously received */
 181            if (nfds && fds) {
 182                size_t i;
 183                for (i = 0; i < (*nfds); i++) {
 184                    close((*fds)[i]);
 185                }
 186                g_free(*fds);
 187                *fds = NULL;
 188                *nfds = 0;
 189            }
 190            goto cleanup;
 191        }
 192
 193        if (nlocal_iov) {
 194            iov_discard_front(&local_iov, &nlocal_iov, len);
 195        }
 196
 197next_iter:
 198        partial = true;
 199        local_fds = NULL;
 200        local_nfds = NULL;
 201    }
 202
 203    ret = 1;
 204
 205 cleanup:
 206    g_free(local_iov_head);
 207    return ret;
 208}
 209
 210int qio_channel_readv_full_all(QIOChannel *ioc,
 211                               const struct iovec *iov,
 212                               size_t niov,
 213                               int **fds, size_t *nfds,
 214                               Error **errp)
 215{
 216    int ret = qio_channel_readv_full_all_eof(ioc, iov, niov, fds, nfds, errp);
 217
 218    if (ret == 0) {
 219        error_setg(errp, "Unexpected end-of-file before all data were read");
 220        return -1;
 221    }
 222    if (ret == 1) {
 223        return 0;
 224    }
 225
 226    return ret;
 227}
 228
 229int qio_channel_writev_all(QIOChannel *ioc,
 230                           const struct iovec *iov,
 231                           size_t niov,
 232                           Error **errp)
 233{
 234    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, 0, errp);
 235}
 236
 237int qio_channel_writev_full_all(QIOChannel *ioc,
 238                                const struct iovec *iov,
 239                                size_t niov,
 240                                int *fds, size_t nfds,
 241                                int flags, Error **errp)
 242{
 243    int ret = -1;
 244    struct iovec *local_iov = g_new(struct iovec, niov);
 245    struct iovec *local_iov_head = local_iov;
 246    unsigned int nlocal_iov = niov;
 247
 248    nlocal_iov = iov_copy(local_iov, nlocal_iov,
 249                          iov, niov,
 250                          0, iov_size(iov, niov));
 251
 252    while (nlocal_iov > 0) {
 253        ssize_t len;
 254
 255        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds,
 256                                            nfds, flags, errp);
 257
 258        if (len == QIO_CHANNEL_ERR_BLOCK) {
 259            if (qemu_in_coroutine()) {
 260                qio_channel_yield(ioc, G_IO_OUT);
 261            } else {
 262                qio_channel_wait(ioc, G_IO_OUT);
 263            }
 264            continue;
 265        }
 266        if (len < 0) {
 267            goto cleanup;
 268        }
 269
 270        iov_discard_front(&local_iov, &nlocal_iov, len);
 271
 272        fds = NULL;
 273        nfds = 0;
 274    }
 275
 276    ret = 0;
 277 cleanup:
 278    g_free(local_iov_head);
 279    return ret;
 280}
 281
 282ssize_t qio_channel_readv(QIOChannel *ioc,
 283                          const struct iovec *iov,
 284                          size_t niov,
 285                          Error **errp)
 286{
 287    return qio_channel_readv_full(ioc, iov, niov, NULL, NULL, errp);
 288}
 289
 290
 291ssize_t qio_channel_writev(QIOChannel *ioc,
 292                           const struct iovec *iov,
 293                           size_t niov,
 294                           Error **errp)
 295{
 296    return qio_channel_writev_full(ioc, iov, niov, NULL, 0, 0, errp);
 297}
 298
 299
 300ssize_t qio_channel_read(QIOChannel *ioc,
 301                         char *buf,
 302                         size_t buflen,
 303                         Error **errp)
 304{
 305    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 306    return qio_channel_readv_full(ioc, &iov, 1, NULL, NULL, errp);
 307}
 308
 309
 310ssize_t qio_channel_write(QIOChannel *ioc,
 311                          const char *buf,
 312                          size_t buflen,
 313                          Error **errp)
 314{
 315    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
 316    return qio_channel_writev_full(ioc, &iov, 1, NULL, 0, 0, errp);
 317}
 318
 319
 320int qio_channel_read_all_eof(QIOChannel *ioc,
 321                             char *buf,
 322                             size_t buflen,
 323                             Error **errp)
 324{
 325    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 326    return qio_channel_readv_all_eof(ioc, &iov, 1, errp);
 327}
 328
 329
 330int qio_channel_read_all(QIOChannel *ioc,
 331                         char *buf,
 332                         size_t buflen,
 333                         Error **errp)
 334{
 335    struct iovec iov = { .iov_base = buf, .iov_len = buflen };
 336    return qio_channel_readv_all(ioc, &iov, 1, errp);
 337}
 338
 339
 340int qio_channel_write_all(QIOChannel *ioc,
 341                          const char *buf,
 342                          size_t buflen,
 343                          Error **errp)
 344{
 345    struct iovec iov = { .iov_base = (char *)buf, .iov_len = buflen };
 346    return qio_channel_writev_all(ioc, &iov, 1, errp);
 347}
 348
 349
 350int qio_channel_set_blocking(QIOChannel *ioc,
 351                              bool enabled,
 352                              Error **errp)
 353{
 354    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 355    return klass->io_set_blocking(ioc, enabled, errp);
 356}
 357
 358
 359int qio_channel_close(QIOChannel *ioc,
 360                      Error **errp)
 361{
 362    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 363    return klass->io_close(ioc, errp);
 364}
 365
 366
 367GSource *qio_channel_create_watch(QIOChannel *ioc,
 368                                  GIOCondition condition)
 369{
 370    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 371    GSource *ret = klass->io_create_watch(ioc, condition);
 372
 373    if (ioc->name) {
 374        g_source_set_name(ret, ioc->name);
 375    }
 376
 377    return ret;
 378}
 379
 380
 381void qio_channel_set_aio_fd_handler(QIOChannel *ioc,
 382                                    AioContext *ctx,
 383                                    IOHandler *io_read,
 384                                    IOHandler *io_write,
 385                                    void *opaque)
 386{
 387    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 388
 389    klass->io_set_aio_fd_handler(ioc, ctx, io_read, io_write, opaque);
 390}
 391
 392guint qio_channel_add_watch_full(QIOChannel *ioc,
 393                                 GIOCondition condition,
 394                                 QIOChannelFunc func,
 395                                 gpointer user_data,
 396                                 GDestroyNotify notify,
 397                                 GMainContext *context)
 398{
 399    GSource *source;
 400    guint id;
 401
 402    source = qio_channel_create_watch(ioc, condition);
 403
 404    g_source_set_callback(source, (GSourceFunc)func, user_data, notify);
 405
 406    id = g_source_attach(source, context);
 407    g_source_unref(source);
 408
 409    return id;
 410}
 411
 412guint qio_channel_add_watch(QIOChannel *ioc,
 413                            GIOCondition condition,
 414                            QIOChannelFunc func,
 415                            gpointer user_data,
 416                            GDestroyNotify notify)
 417{
 418    return qio_channel_add_watch_full(ioc, condition, func,
 419                                      user_data, notify, NULL);
 420}
 421
 422GSource *qio_channel_add_watch_source(QIOChannel *ioc,
 423                                      GIOCondition condition,
 424                                      QIOChannelFunc func,
 425                                      gpointer user_data,
 426                                      GDestroyNotify notify,
 427                                      GMainContext *context)
 428{
 429    GSource *source;
 430    guint id;
 431
 432    id = qio_channel_add_watch_full(ioc, condition, func,
 433                                    user_data, notify, context);
 434    source = g_main_context_find_source_by_id(context, id);
 435    g_source_ref(source);
 436    return source;
 437}
 438
 439
 440int qio_channel_shutdown(QIOChannel *ioc,
 441                         QIOChannelShutdown how,
 442                         Error **errp)
 443{
 444    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 445
 446    if (!klass->io_shutdown) {
 447        error_setg(errp, "Data path shutdown not supported");
 448        return -1;
 449    }
 450
 451    return klass->io_shutdown(ioc, how, errp);
 452}
 453
 454
 455void qio_channel_set_delay(QIOChannel *ioc,
 456                           bool enabled)
 457{
 458    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 459
 460    if (klass->io_set_delay) {
 461        klass->io_set_delay(ioc, enabled);
 462    }
 463}
 464
 465
 466void qio_channel_set_cork(QIOChannel *ioc,
 467                          bool enabled)
 468{
 469    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 470
 471    if (klass->io_set_cork) {
 472        klass->io_set_cork(ioc, enabled);
 473    }
 474}
 475
 476
 477off_t qio_channel_io_seek(QIOChannel *ioc,
 478                          off_t offset,
 479                          int whence,
 480                          Error **errp)
 481{
 482    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 483
 484    if (!klass->io_seek) {
 485        error_setg(errp, "Channel does not support random access");
 486        return -1;
 487    }
 488
 489    return klass->io_seek(ioc, offset, whence, errp);
 490}
 491
 492int qio_channel_flush(QIOChannel *ioc,
 493                                Error **errp)
 494{
 495    QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 496
 497    if (!klass->io_flush ||
 498        !qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY)) {
 499        return 0;
 500    }
 501
 502    return klass->io_flush(ioc, errp);
 503}
 504
 505
 506static void qio_channel_restart_read(void *opaque)
 507{
 508    QIOChannel *ioc = opaque;
 509    Coroutine *co = ioc->read_coroutine;
 510
 511    /* Assert that aio_co_wake() reenters the coroutine directly */
 512    assert(qemu_get_current_aio_context() ==
 513           qemu_coroutine_get_aio_context(co));
 514    aio_co_wake(co);
 515}
 516
 517static void qio_channel_restart_write(void *opaque)
 518{
 519    QIOChannel *ioc = opaque;
 520    Coroutine *co = ioc->write_coroutine;
 521
 522    /* Assert that aio_co_wake() reenters the coroutine directly */
 523    assert(qemu_get_current_aio_context() ==
 524           qemu_coroutine_get_aio_context(co));
 525    aio_co_wake(co);
 526}
 527
 528static void qio_channel_set_aio_fd_handlers(QIOChannel *ioc)
 529{
 530    IOHandler *rd_handler = NULL, *wr_handler = NULL;
 531    AioContext *ctx;
 532
 533    if (ioc->read_coroutine) {
 534        rd_handler = qio_channel_restart_read;
 535    }
 536    if (ioc->write_coroutine) {
 537        wr_handler = qio_channel_restart_write;
 538    }
 539
 540    ctx = ioc->ctx ? ioc->ctx : iohandler_get_aio_context();
 541    qio_channel_set_aio_fd_handler(ioc, ctx, rd_handler, wr_handler, ioc);
 542}
 543
 544void qio_channel_attach_aio_context(QIOChannel *ioc,
 545                                    AioContext *ctx)
 546{
 547    assert(!ioc->read_coroutine);
 548    assert(!ioc->write_coroutine);
 549    ioc->ctx = ctx;
 550}
 551
 552void qio_channel_detach_aio_context(QIOChannel *ioc)
 553{
 554    ioc->read_coroutine = NULL;
 555    ioc->write_coroutine = NULL;
 556    qio_channel_set_aio_fd_handlers(ioc);
 557    ioc->ctx = NULL;
 558}
 559
 560void coroutine_fn qio_channel_yield(QIOChannel *ioc,
 561                                    GIOCondition condition)
 562{
 563    assert(qemu_in_coroutine());
 564    if (condition == G_IO_IN) {
 565        assert(!ioc->read_coroutine);
 566        ioc->read_coroutine = qemu_coroutine_self();
 567    } else if (condition == G_IO_OUT) {
 568        assert(!ioc->write_coroutine);
 569        ioc->write_coroutine = qemu_coroutine_self();
 570    } else {
 571        abort();
 572    }
 573    qio_channel_set_aio_fd_handlers(ioc);
 574    qemu_coroutine_yield();
 575
 576    /* Allow interrupting the operation by reentering the coroutine other than
 577     * through the aio_fd_handlers. */
 578    if (condition == G_IO_IN && ioc->read_coroutine) {
 579        ioc->read_coroutine = NULL;
 580        qio_channel_set_aio_fd_handlers(ioc);
 581    } else if (condition == G_IO_OUT && ioc->write_coroutine) {
 582        ioc->write_coroutine = NULL;
 583        qio_channel_set_aio_fd_handlers(ioc);
 584    }
 585}
 586
 587
 588static gboolean qio_channel_wait_complete(QIOChannel *ioc,
 589                                          GIOCondition condition,
 590                                          gpointer opaque)
 591{
 592    GMainLoop *loop = opaque;
 593
 594    g_main_loop_quit(loop);
 595    return FALSE;
 596}
 597
 598
 599void qio_channel_wait(QIOChannel *ioc,
 600                      GIOCondition condition)
 601{
 602    GMainContext *ctxt = g_main_context_new();
 603    GMainLoop *loop = g_main_loop_new(ctxt, TRUE);
 604    GSource *source;
 605
 606    source = qio_channel_create_watch(ioc, condition);
 607
 608    g_source_set_callback(source,
 609                          (GSourceFunc)qio_channel_wait_complete,
 610                          loop,
 611                          NULL);
 612
 613    g_source_attach(source, ctxt);
 614
 615    g_main_loop_run(loop);
 616
 617    g_source_unref(source);
 618    g_main_loop_unref(loop);
 619    g_main_context_unref(ctxt);
 620}
 621
 622
 623static void qio_channel_finalize(Object *obj)
 624{
 625    QIOChannel *ioc = QIO_CHANNEL(obj);
 626
 627    g_free(ioc->name);
 628
 629#ifdef _WIN32
 630    if (ioc->event) {
 631        CloseHandle(ioc->event);
 632    }
 633#endif
 634}
 635
 636static const TypeInfo qio_channel_info = {
 637    .parent = TYPE_OBJECT,
 638    .name = TYPE_QIO_CHANNEL,
 639    .instance_size = sizeof(QIOChannel),
 640    .instance_finalize = qio_channel_finalize,
 641    .abstract = true,
 642    .class_size = sizeof(QIOChannelClass),
 643};
 644
 645
 646static void qio_channel_register_types(void)
 647{
 648    type_register_static(&qio_channel_info);
 649}
 650
 651
 652type_init(qio_channel_register_types);
 653