qemu/net/stream.c
<<
>>
Prefs
   1/*
   2 * QEMU System Emulator
   3 *
   4 * Copyright (c) 2003-2008 Fabrice Bellard
   5 * Copyright (c) 2022 Red Hat, Inc.
   6 *
   7 * Permission is hereby granted, free of charge, to any person obtaining a copy
   8 * of this software and associated documentation files (the "Software"), to deal
   9 * in the Software without restriction, including without limitation the rights
  10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 * copies of the Software, and to permit persons to whom the Software is
  12 * furnished to do so, subject to the following conditions:
  13 *
  14 * The above copyright notice and this permission notice shall be included in
  15 * all copies or substantial portions of the Software.
  16 *
  17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  23 * THE SOFTWARE.
  24 */
  25
  26#include "qemu/osdep.h"
  27
  28#include "net/net.h"
  29#include "clients.h"
  30#include "monitor/monitor.h"
  31#include "qapi/error.h"
  32#include "qemu/error-report.h"
  33#include "qemu/option.h"
  34#include "qemu/sockets.h"
  35#include "qemu/iov.h"
  36#include "qemu/main-loop.h"
  37#include "qemu/cutils.h"
  38#include "io/channel.h"
  39#include "io/channel-socket.h"
  40#include "io/net-listener.h"
  41#include "qapi/qapi-events-net.h"
  42
  43typedef struct NetStreamState {
  44    NetClientState nc;
  45    QIOChannel *listen_ioc;
  46    QIONetListener *listener;
  47    QIOChannel *ioc;
  48    guint ioc_read_tag;
  49    guint ioc_write_tag;
  50    SocketReadState rs;
  51    unsigned int send_index;      /* number of bytes sent*/
  52} NetStreamState;
  53
  54static void net_stream_listen(QIONetListener *listener,
  55                              QIOChannelSocket *cioc,
  56                              void *opaque);
  57
  58static gboolean net_stream_writable(QIOChannel *ioc,
  59                                    GIOCondition condition,
  60                                    gpointer data)
  61{
  62    NetStreamState *s = data;
  63
  64    s->ioc_write_tag = 0;
  65
  66    qemu_flush_queued_packets(&s->nc);
  67
  68    return G_SOURCE_REMOVE;
  69}
  70
  71static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
  72                                  size_t size)
  73{
  74    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
  75    uint32_t len = htonl(size);
  76    struct iovec iov[] = {
  77        {
  78            .iov_base = &len,
  79            .iov_len  = sizeof(len),
  80        }, {
  81            .iov_base = (void *)buf,
  82            .iov_len  = size,
  83        },
  84    };
  85    struct iovec local_iov[2];
  86    unsigned int nlocal_iov;
  87    size_t remaining;
  88    ssize_t ret;
  89
  90    remaining = iov_size(iov, 2) - s->send_index;
  91    nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
  92    ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
  93    if (ret == QIO_CHANNEL_ERR_BLOCK) {
  94        ret = 0; /* handled further down */
  95    }
  96    if (ret == -1) {
  97        s->send_index = 0;
  98        return -errno;
  99    }
 100    if (ret < (ssize_t)remaining) {
 101        s->send_index += ret;
 102        s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
 103                                                 net_stream_writable, s, NULL);
 104        return 0;
 105    }
 106    s->send_index = 0;
 107    return size;
 108}
 109
 110static gboolean net_stream_send(QIOChannel *ioc,
 111                                GIOCondition condition,
 112                                gpointer data);
 113
 114static void net_stream_send_completed(NetClientState *nc, ssize_t len)
 115{
 116    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
 117
 118    if (!s->ioc_read_tag) {
 119        s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
 120                                                net_stream_send, s, NULL);
 121    }
 122}
 123
 124static void net_stream_rs_finalize(SocketReadState *rs)
 125{
 126    NetStreamState *s = container_of(rs, NetStreamState, rs);
 127
 128    if (qemu_send_packet_async(&s->nc, rs->buf,
 129                               rs->packet_len,
 130                               net_stream_send_completed) == 0) {
 131        if (s->ioc_read_tag) {
 132            g_source_remove(s->ioc_read_tag);
 133            s->ioc_read_tag = 0;
 134        }
 135    }
 136}
 137
 138static gboolean net_stream_send(QIOChannel *ioc,
 139                                GIOCondition condition,
 140                                gpointer data)
 141{
 142    NetStreamState *s = data;
 143    int size;
 144    int ret;
 145    char buf1[NET_BUFSIZE];
 146    const char *buf;
 147
 148    size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
 149    if (size < 0) {
 150        if (errno != EWOULDBLOCK) {
 151            goto eoc;
 152        }
 153    } else if (size == 0) {
 154        /* end of connection */
 155    eoc:
 156        s->ioc_read_tag = 0;
 157        if (s->ioc_write_tag) {
 158            g_source_remove(s->ioc_write_tag);
 159            s->ioc_write_tag = 0;
 160        }
 161        if (s->listener) {
 162            qio_net_listener_set_client_func(s->listener, net_stream_listen,
 163                                             s, NULL);
 164        }
 165        object_unref(OBJECT(s->ioc));
 166        s->ioc = NULL;
 167
 168        net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
 169        s->nc.link_down = true;
 170        qemu_set_info_str(&s->nc, "%s", "");
 171
 172        qapi_event_send_netdev_stream_disconnected(s->nc.name);
 173
 174        return G_SOURCE_REMOVE;
 175    }
 176    buf = buf1;
 177
 178    ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
 179
 180    if (ret == -1) {
 181        goto eoc;
 182    }
 183
 184    return G_SOURCE_CONTINUE;
 185}
 186
 187static void net_stream_cleanup(NetClientState *nc)
 188{
 189    NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
 190    if (s->ioc) {
 191        if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
 192            if (s->ioc_read_tag) {
 193                g_source_remove(s->ioc_read_tag);
 194                s->ioc_read_tag = 0;
 195            }
 196            if (s->ioc_write_tag) {
 197                g_source_remove(s->ioc_write_tag);
 198                s->ioc_write_tag = 0;
 199            }
 200        }
 201        object_unref(OBJECT(s->ioc));
 202        s->ioc = NULL;
 203    }
 204    if (s->listen_ioc) {
 205        if (s->listener) {
 206            qio_net_listener_disconnect(s->listener);
 207            object_unref(OBJECT(s->listener));
 208            s->listener = NULL;
 209        }
 210        object_unref(OBJECT(s->listen_ioc));
 211        s->listen_ioc = NULL;
 212    }
 213}
 214
 215static NetClientInfo net_stream_info = {
 216    .type = NET_CLIENT_DRIVER_STREAM,
 217    .size = sizeof(NetStreamState),
 218    .receive = net_stream_receive,
 219    .cleanup = net_stream_cleanup,
 220};
 221
 222static void net_stream_listen(QIONetListener *listener,
 223                              QIOChannelSocket *cioc,
 224                              void *opaque)
 225{
 226    NetStreamState *s = opaque;
 227    SocketAddress *addr;
 228    char *uri;
 229
 230    object_ref(OBJECT(cioc));
 231
 232    qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
 233
 234    s->ioc = QIO_CHANNEL(cioc);
 235    qio_channel_set_name(s->ioc, "stream-server");
 236    s->nc.link_down = false;
 237
 238    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
 239                                            s, NULL);
 240
 241    if (cioc->localAddr.ss_family == AF_UNIX) {
 242        addr = qio_channel_socket_get_local_address(cioc, NULL);
 243    } else {
 244        addr = qio_channel_socket_get_remote_address(cioc, NULL);
 245    }
 246    g_assert(addr != NULL);
 247    uri = socket_uri(addr);
 248    qemu_set_info_str(&s->nc, "%s", uri);
 249    g_free(uri);
 250    qapi_event_send_netdev_stream_connected(s->nc.name, addr);
 251    qapi_free_SocketAddress(addr);
 252}
 253
 254static void net_stream_server_listening(QIOTask *task, gpointer opaque)
 255{
 256    NetStreamState *s = opaque;
 257    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
 258    SocketAddress *addr;
 259    int ret;
 260
 261    if (listen_sioc->fd < 0) {
 262        qemu_set_info_str(&s->nc, "connection error");
 263        return;
 264    }
 265
 266    addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
 267    g_assert(addr != NULL);
 268    ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
 269    if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
 270        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
 271                          addr->u.fd.str, -ret);
 272        return;
 273    }
 274    g_assert(ret == 0);
 275    qapi_free_SocketAddress(addr);
 276
 277    s->nc.link_down = true;
 278    s->listener = qio_net_listener_new();
 279
 280    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
 281    qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
 282    qio_net_listener_add(s->listener, listen_sioc);
 283}
 284
 285static int net_stream_server_init(NetClientState *peer,
 286                                  const char *model,
 287                                  const char *name,
 288                                  SocketAddress *addr,
 289                                  Error **errp)
 290{
 291    NetClientState *nc;
 292    NetStreamState *s;
 293    QIOChannelSocket *listen_sioc = qio_channel_socket_new();
 294
 295    nc = qemu_new_net_client(&net_stream_info, peer, model, name);
 296    s = DO_UPCAST(NetStreamState, nc, nc);
 297
 298    s->listen_ioc = QIO_CHANNEL(listen_sioc);
 299    qio_channel_socket_listen_async(listen_sioc, addr, 0,
 300                                    net_stream_server_listening, s,
 301                                    NULL, NULL);
 302
 303    return 0;
 304}
 305
 306static void net_stream_client_connected(QIOTask *task, gpointer opaque)
 307{
 308    NetStreamState *s = opaque;
 309    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
 310    SocketAddress *addr;
 311    gchar *uri;
 312    int ret;
 313
 314    if (sioc->fd < 0) {
 315        qemu_set_info_str(&s->nc, "connection error");
 316        goto error;
 317    }
 318
 319    addr = qio_channel_socket_get_remote_address(sioc, NULL);
 320    g_assert(addr != NULL);
 321    uri = socket_uri(addr);
 322    qemu_set_info_str(&s->nc, "%s", uri);
 323    g_free(uri);
 324
 325    ret = qemu_socket_try_set_nonblock(sioc->fd);
 326    if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
 327        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
 328                          addr->u.fd.str, -ret);
 329        qapi_free_SocketAddress(addr);
 330        goto error;
 331    }
 332    g_assert(ret == 0);
 333
 334    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
 335
 336    /* Disable Nagle algorithm on TCP sockets to reduce latency */
 337    qio_channel_set_delay(s->ioc, false);
 338
 339    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
 340                                            s, NULL);
 341    s->nc.link_down = false;
 342    qapi_event_send_netdev_stream_connected(s->nc.name, addr);
 343    qapi_free_SocketAddress(addr);
 344
 345    return;
 346error:
 347    object_unref(OBJECT(s->ioc));
 348    s->ioc = NULL;
 349}
 350
 351static int net_stream_client_init(NetClientState *peer,
 352                                  const char *model,
 353                                  const char *name,
 354                                  SocketAddress *addr,
 355                                  Error **errp)
 356{
 357    NetStreamState *s;
 358    NetClientState *nc;
 359    QIOChannelSocket *sioc = qio_channel_socket_new();
 360
 361    nc = qemu_new_net_client(&net_stream_info, peer, model, name);
 362    s = DO_UPCAST(NetStreamState, nc, nc);
 363
 364    s->ioc = QIO_CHANNEL(sioc);
 365    s->nc.link_down = true;
 366
 367    qio_channel_socket_connect_async(sioc, addr,
 368                                     net_stream_client_connected, s,
 369                                     NULL, NULL);
 370
 371    return 0;
 372}
 373
 374int net_init_stream(const Netdev *netdev, const char *name,
 375                    NetClientState *peer, Error **errp)
 376{
 377    const NetdevStreamOptions *sock;
 378
 379    assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
 380    sock = &netdev->u.stream;
 381
 382    if (!sock->has_server || !sock->server) {
 383        return net_stream_client_init(peer, "stream", name, sock->addr, errp);
 384    }
 385    return net_stream_server_init(peer, "stream", name, sock->addr, errp);
 386}
 387