qemu/io/channel-socket.c
<<
>>
Prefs
   1/*
   2 * QEMU I/O channels sockets driver
   3 *
   4 * Copyright (c) 2015 Red Hat, Inc.
   5 *
   6 * This library is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU Lesser General Public
   8 * License as published by the Free Software Foundation; either
   9 * version 2.1 of the License, or (at your option) any later version.
  10 *
  11 * This library is distributed in the hope that it will be useful,
  12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14 * Lesser General Public License for more details.
  15 *
  16 * You should have received a copy of the GNU Lesser General Public
  17 * License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18 */
  19
  20#include "qemu/osdep.h"
  21#include "qapi/error.h"
  22#include "qapi/qapi-visit-sockets.h"
  23#include "qemu/module.h"
  24#include "io/channel-socket.h"
  25#include "io/channel-watch.h"
  26#include "trace.h"
  27#include "qapi/clone-visitor.h"
  28#ifdef CONFIG_LINUX
  29#include <linux/errqueue.h>
  30#include <sys/socket.h>
  31
  32#if (defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY))
  33#define QEMU_MSG_ZEROCOPY
  34#endif
  35#endif
  36
  37#define SOCKET_MAX_FDS 16
  38
  39SocketAddress *
  40qio_channel_socket_get_local_address(QIOChannelSocket *ioc,
  41                                     Error **errp)
  42{
  43    return socket_sockaddr_to_address(&ioc->localAddr,
  44                                      ioc->localAddrLen,
  45                                      errp);
  46}
  47
  48SocketAddress *
  49qio_channel_socket_get_remote_address(QIOChannelSocket *ioc,
  50                                      Error **errp)
  51{
  52    return socket_sockaddr_to_address(&ioc->remoteAddr,
  53                                      ioc->remoteAddrLen,
  54                                      errp);
  55}
  56
  57QIOChannelSocket *
  58qio_channel_socket_new(void)
  59{
  60    QIOChannelSocket *sioc;
  61    QIOChannel *ioc;
  62
  63    sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
  64    sioc->fd = -1;
  65    sioc->zero_copy_queued = 0;
  66    sioc->zero_copy_sent = 0;
  67
  68    ioc = QIO_CHANNEL(sioc);
  69    qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
  70
  71#ifdef WIN32
  72    ioc->event = CreateEvent(NULL, FALSE, FALSE, NULL);
  73#endif
  74
  75    trace_qio_channel_socket_new(sioc);
  76
  77    return sioc;
  78}
  79
  80
  81static int
  82qio_channel_socket_set_fd(QIOChannelSocket *sioc,
  83                          int fd,
  84                          Error **errp)
  85{
  86    if (sioc->fd != -1) {
  87        error_setg(errp, "Socket is already open");
  88        return -1;
  89    }
  90
  91    sioc->fd = fd;
  92    sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
  93    sioc->localAddrLen = sizeof(sioc->localAddr);
  94
  95
  96    if (getpeername(fd, (struct sockaddr *)&sioc->remoteAddr,
  97                    &sioc->remoteAddrLen) < 0) {
  98        if (errno == ENOTCONN) {
  99            memset(&sioc->remoteAddr, 0, sizeof(sioc->remoteAddr));
 100            sioc->remoteAddrLen = sizeof(sioc->remoteAddr);
 101        } else {
 102            error_setg_errno(errp, errno,
 103                             "Unable to query remote socket address");
 104            goto error;
 105        }
 106    }
 107
 108    if (getsockname(fd, (struct sockaddr *)&sioc->localAddr,
 109                    &sioc->localAddrLen) < 0) {
 110        error_setg_errno(errp, errno,
 111                         "Unable to query local socket address");
 112        goto error;
 113    }
 114
 115#ifndef WIN32
 116    if (sioc->localAddr.ss_family == AF_UNIX) {
 117        QIOChannel *ioc = QIO_CHANNEL(sioc);
 118        qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_FD_PASS);
 119    }
 120#endif /* WIN32 */
 121
 122    return 0;
 123
 124 error:
 125    sioc->fd = -1; /* Let the caller close FD on failure */
 126    return -1;
 127}
 128
 129QIOChannelSocket *
 130qio_channel_socket_new_fd(int fd,
 131                          Error **errp)
 132{
 133    QIOChannelSocket *ioc;
 134
 135    ioc = qio_channel_socket_new();
 136    if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
 137        object_unref(OBJECT(ioc));
 138        return NULL;
 139    }
 140
 141    trace_qio_channel_socket_new_fd(ioc, fd);
 142
 143    return ioc;
 144}
 145
 146
 147int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
 148                                    SocketAddress *addr,
 149                                    Error **errp)
 150{
 151    int fd;
 152
 153    trace_qio_channel_socket_connect_sync(ioc, addr);
 154    fd = socket_connect(addr, errp);
 155    if (fd < 0) {
 156        trace_qio_channel_socket_connect_fail(ioc);
 157        return -1;
 158    }
 159
 160    trace_qio_channel_socket_connect_complete(ioc, fd);
 161    if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
 162        close(fd);
 163        return -1;
 164    }
 165
 166#ifdef QEMU_MSG_ZEROCOPY
 167    int ret, v = 1;
 168    ret = setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
 169    if (ret == 0) {
 170        /* Zero copy available on host */
 171        qio_channel_set_feature(QIO_CHANNEL(ioc),
 172                                QIO_CHANNEL_FEATURE_WRITE_ZERO_COPY);
 173    }
 174#endif
 175
 176    return 0;
 177}
 178
 179
 180static void qio_channel_socket_connect_worker(QIOTask *task,
 181                                              gpointer opaque)
 182{
 183    QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
 184    SocketAddress *addr = opaque;
 185    Error *err = NULL;
 186
 187    qio_channel_socket_connect_sync(ioc, addr, &err);
 188
 189    qio_task_set_error(task, err);
 190}
 191
 192
 193void qio_channel_socket_connect_async(QIOChannelSocket *ioc,
 194                                      SocketAddress *addr,
 195                                      QIOTaskFunc callback,
 196                                      gpointer opaque,
 197                                      GDestroyNotify destroy,
 198                                      GMainContext *context)
 199{
 200    QIOTask *task = qio_task_new(
 201        OBJECT(ioc), callback, opaque, destroy);
 202    SocketAddress *addrCopy;
 203
 204    addrCopy = QAPI_CLONE(SocketAddress, addr);
 205
 206    /* socket_connect() does a non-blocking connect(), but it
 207     * still blocks in DNS lookups, so we must use a thread */
 208    trace_qio_channel_socket_connect_async(ioc, addr);
 209    qio_task_run_in_thread(task,
 210                           qio_channel_socket_connect_worker,
 211                           addrCopy,
 212                           (GDestroyNotify)qapi_free_SocketAddress,
 213                           context);
 214}
 215
 216
 217int qio_channel_socket_listen_sync(QIOChannelSocket *ioc,
 218                                   SocketAddress *addr,
 219                                   int num,
 220                                   Error **errp)
 221{
 222    int fd;
 223
 224    trace_qio_channel_socket_listen_sync(ioc, addr, num);
 225    fd = socket_listen(addr, num, errp);
 226    if (fd < 0) {
 227        trace_qio_channel_socket_listen_fail(ioc);
 228        return -1;
 229    }
 230
 231    trace_qio_channel_socket_listen_complete(ioc, fd);
 232    if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
 233        close(fd);
 234        return -1;
 235    }
 236    qio_channel_set_feature(QIO_CHANNEL(ioc), QIO_CHANNEL_FEATURE_LISTEN);
 237
 238    return 0;
 239}
 240
 241
 242struct QIOChannelListenWorkerData {
 243    SocketAddress *addr;
 244    int num; /* amount of expected connections */
 245};
 246
 247static void qio_channel_listen_worker_free(gpointer opaque)
 248{
 249    struct QIOChannelListenWorkerData *data = opaque;
 250
 251    qapi_free_SocketAddress(data->addr);
 252    g_free(data);
 253}
 254
 255static void qio_channel_socket_listen_worker(QIOTask *task,
 256                                             gpointer opaque)
 257{
 258    QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
 259    struct QIOChannelListenWorkerData *data = opaque;
 260    Error *err = NULL;
 261
 262    qio_channel_socket_listen_sync(ioc, data->addr, data->num, &err);
 263
 264    qio_task_set_error(task, err);
 265}
 266
 267
 268void qio_channel_socket_listen_async(QIOChannelSocket *ioc,
 269                                     SocketAddress *addr,
 270                                     int num,
 271                                     QIOTaskFunc callback,
 272                                     gpointer opaque,
 273                                     GDestroyNotify destroy,
 274                                     GMainContext *context)
 275{
 276    QIOTask *task = qio_task_new(
 277        OBJECT(ioc), callback, opaque, destroy);
 278    struct QIOChannelListenWorkerData *data;
 279
 280    data = g_new0(struct QIOChannelListenWorkerData, 1);
 281    data->addr = QAPI_CLONE(SocketAddress, addr);
 282    data->num = num;
 283
 284    /* socket_listen() blocks in DNS lookups, so we must use a thread */
 285    trace_qio_channel_socket_listen_async(ioc, addr, num);
 286    qio_task_run_in_thread(task,
 287                           qio_channel_socket_listen_worker,
 288                           data,
 289                           qio_channel_listen_worker_free,
 290                           context);
 291}
 292
 293
 294int qio_channel_socket_dgram_sync(QIOChannelSocket *ioc,
 295                                  SocketAddress *localAddr,
 296                                  SocketAddress *remoteAddr,
 297                                  Error **errp)
 298{
 299    int fd;
 300
 301    trace_qio_channel_socket_dgram_sync(ioc, localAddr, remoteAddr);
 302    fd = socket_dgram(remoteAddr, localAddr, errp);
 303    if (fd < 0) {
 304        trace_qio_channel_socket_dgram_fail(ioc);
 305        return -1;
 306    }
 307
 308    trace_qio_channel_socket_dgram_complete(ioc, fd);
 309    if (qio_channel_socket_set_fd(ioc, fd, errp) < 0) {
 310        close(fd);
 311        return -1;
 312    }
 313
 314    return 0;
 315}
 316
 317
 318struct QIOChannelSocketDGramWorkerData {
 319    SocketAddress *localAddr;
 320    SocketAddress *remoteAddr;
 321};
 322
 323
 324static void qio_channel_socket_dgram_worker_free(gpointer opaque)
 325{
 326    struct QIOChannelSocketDGramWorkerData *data = opaque;
 327    qapi_free_SocketAddress(data->localAddr);
 328    qapi_free_SocketAddress(data->remoteAddr);
 329    g_free(data);
 330}
 331
 332static void qio_channel_socket_dgram_worker(QIOTask *task,
 333                                            gpointer opaque)
 334{
 335    QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task));
 336    struct QIOChannelSocketDGramWorkerData *data = opaque;
 337    Error *err = NULL;
 338
 339    /* socket_dgram() blocks in DNS lookups, so we must use a thread */
 340    qio_channel_socket_dgram_sync(ioc, data->localAddr,
 341                                  data->remoteAddr, &err);
 342
 343    qio_task_set_error(task, err);
 344}
 345
 346
 347void qio_channel_socket_dgram_async(QIOChannelSocket *ioc,
 348                                    SocketAddress *localAddr,
 349                                    SocketAddress *remoteAddr,
 350                                    QIOTaskFunc callback,
 351                                    gpointer opaque,
 352                                    GDestroyNotify destroy,
 353                                    GMainContext *context)
 354{
 355    QIOTask *task = qio_task_new(
 356        OBJECT(ioc), callback, opaque, destroy);
 357    struct QIOChannelSocketDGramWorkerData *data = g_new0(
 358        struct QIOChannelSocketDGramWorkerData, 1);
 359
 360    data->localAddr = QAPI_CLONE(SocketAddress, localAddr);
 361    data->remoteAddr = QAPI_CLONE(SocketAddress, remoteAddr);
 362
 363    trace_qio_channel_socket_dgram_async(ioc, localAddr, remoteAddr);
 364    qio_task_run_in_thread(task,
 365                           qio_channel_socket_dgram_worker,
 366                           data,
 367                           qio_channel_socket_dgram_worker_free,
 368                           context);
 369}
 370
 371
 372QIOChannelSocket *
 373qio_channel_socket_accept(QIOChannelSocket *ioc,
 374                          Error **errp)
 375{
 376    QIOChannelSocket *cioc;
 377
 378    cioc = qio_channel_socket_new();
 379    cioc->remoteAddrLen = sizeof(ioc->remoteAddr);
 380    cioc->localAddrLen = sizeof(ioc->localAddr);
 381
 382 retry:
 383    trace_qio_channel_socket_accept(ioc);
 384    cioc->fd = qemu_accept(ioc->fd, (struct sockaddr *)&cioc->remoteAddr,
 385                           &cioc->remoteAddrLen);
 386    if (cioc->fd < 0) {
 387        if (errno == EINTR) {
 388            goto retry;
 389        }
 390        error_setg_errno(errp, errno, "Unable to accept connection");
 391        trace_qio_channel_socket_accept_fail(ioc);
 392        goto error;
 393    }
 394
 395    if (getsockname(cioc->fd, (struct sockaddr *)&cioc->localAddr,
 396                    &cioc->localAddrLen) < 0) {
 397        error_setg_errno(errp, errno,
 398                         "Unable to query local socket address");
 399        goto error;
 400    }
 401
 402#ifndef WIN32
 403    if (cioc->localAddr.ss_family == AF_UNIX) {
 404        QIOChannel *ioc_local = QIO_CHANNEL(cioc);
 405        qio_channel_set_feature(ioc_local, QIO_CHANNEL_FEATURE_FD_PASS);
 406    }
 407#endif /* WIN32 */
 408
 409    trace_qio_channel_socket_accept_complete(ioc, cioc, cioc->fd);
 410    return cioc;
 411
 412 error:
 413    object_unref(OBJECT(cioc));
 414    return NULL;
 415}
 416
 417static void qio_channel_socket_init(Object *obj)
 418{
 419    QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
 420    ioc->fd = -1;
 421}
 422
 423static void qio_channel_socket_finalize(Object *obj)
 424{
 425    QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(obj);
 426
 427    if (ioc->fd != -1) {
 428        QIOChannel *ioc_local = QIO_CHANNEL(ioc);
 429        if (qio_channel_has_feature(ioc_local, QIO_CHANNEL_FEATURE_LISTEN)) {
 430            Error *err = NULL;
 431
 432            socket_listen_cleanup(ioc->fd, &err);
 433            if (err) {
 434                error_report_err(err);
 435                err = NULL;
 436            }
 437        }
 438#ifdef WIN32
 439        WSAEventSelect(ioc->fd, NULL, 0);
 440#endif
 441        closesocket(ioc->fd);
 442        ioc->fd = -1;
 443    }
 444}
 445
 446
 447#ifndef WIN32
 448static void qio_channel_socket_copy_fds(struct msghdr *msg,
 449                                        int **fds, size_t *nfds)
 450{
 451    struct cmsghdr *cmsg;
 452
 453    *nfds = 0;
 454    *fds = NULL;
 455
 456    for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
 457        int fd_size, i;
 458        int gotfds;
 459
 460        if (cmsg->cmsg_len < CMSG_LEN(sizeof(int)) ||
 461            cmsg->cmsg_level != SOL_SOCKET ||
 462            cmsg->cmsg_type != SCM_RIGHTS) {
 463            continue;
 464        }
 465
 466        fd_size = cmsg->cmsg_len - CMSG_LEN(0);
 467
 468        if (!fd_size) {
 469            continue;
 470        }
 471
 472        gotfds = fd_size / sizeof(int);
 473        *fds = g_renew(int, *fds, *nfds + gotfds);
 474        memcpy(*fds + *nfds, CMSG_DATA(cmsg), fd_size);
 475
 476        for (i = 0; i < gotfds; i++) {
 477            int fd = (*fds)[*nfds + i];
 478            if (fd < 0) {
 479                continue;
 480            }
 481
 482            /* O_NONBLOCK is preserved across SCM_RIGHTS so reset it */
 483            qemu_socket_set_block(fd);
 484
 485#ifndef MSG_CMSG_CLOEXEC
 486            qemu_set_cloexec(fd);
 487#endif
 488        }
 489        *nfds += gotfds;
 490    }
 491}
 492
 493
 494static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
 495                                        const struct iovec *iov,
 496                                        size_t niov,
 497                                        int **fds,
 498                                        size_t *nfds,
 499                                        Error **errp)
 500{
 501    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 502    ssize_t ret;
 503    struct msghdr msg = { NULL, };
 504    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
 505    int sflags = 0;
 506
 507    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
 508
 509    msg.msg_iov = (struct iovec *)iov;
 510    msg.msg_iovlen = niov;
 511    if (fds && nfds) {
 512        msg.msg_control = control;
 513        msg.msg_controllen = sizeof(control);
 514#ifdef MSG_CMSG_CLOEXEC
 515        sflags |= MSG_CMSG_CLOEXEC;
 516#endif
 517
 518    }
 519
 520 retry:
 521    ret = recvmsg(sioc->fd, &msg, sflags);
 522    if (ret < 0) {
 523        if (errno == EAGAIN) {
 524            return QIO_CHANNEL_ERR_BLOCK;
 525        }
 526        if (errno == EINTR) {
 527            goto retry;
 528        }
 529
 530        error_setg_errno(errp, errno,
 531                         "Unable to read from socket");
 532        return -1;
 533    }
 534
 535    if (fds && nfds) {
 536        qio_channel_socket_copy_fds(&msg, fds, nfds);
 537    }
 538
 539    return ret;
 540}
 541
 542static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
 543                                         const struct iovec *iov,
 544                                         size_t niov,
 545                                         int *fds,
 546                                         size_t nfds,
 547                                         int flags,
 548                                         Error **errp)
 549{
 550    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 551    ssize_t ret;
 552    struct msghdr msg = { NULL, };
 553    char control[CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS)];
 554    size_t fdsize = sizeof(int) * nfds;
 555    struct cmsghdr *cmsg;
 556    int sflags = 0;
 557
 558    memset(control, 0, CMSG_SPACE(sizeof(int) * SOCKET_MAX_FDS));
 559
 560    msg.msg_iov = (struct iovec *)iov;
 561    msg.msg_iovlen = niov;
 562
 563    if (nfds) {
 564        if (nfds > SOCKET_MAX_FDS) {
 565            error_setg_errno(errp, EINVAL,
 566                             "Only %d FDs can be sent, got %zu",
 567                             SOCKET_MAX_FDS, nfds);
 568            return -1;
 569        }
 570
 571        msg.msg_control = control;
 572        msg.msg_controllen = CMSG_SPACE(sizeof(int) * nfds);
 573
 574        cmsg = CMSG_FIRSTHDR(&msg);
 575        cmsg->cmsg_len = CMSG_LEN(fdsize);
 576        cmsg->cmsg_level = SOL_SOCKET;
 577        cmsg->cmsg_type = SCM_RIGHTS;
 578        memcpy(CMSG_DATA(cmsg), fds, fdsize);
 579    }
 580
 581    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
 582#ifdef QEMU_MSG_ZEROCOPY
 583        sflags = MSG_ZEROCOPY;
 584#else
 585        /*
 586         * We expect QIOChannel class entry point to have
 587         * blocked this code path already
 588         */
 589        g_assert_not_reached();
 590#endif
 591    }
 592
 593 retry:
 594    ret = sendmsg(sioc->fd, &msg, sflags);
 595    if (ret <= 0) {
 596        switch (errno) {
 597        case EAGAIN:
 598            return QIO_CHANNEL_ERR_BLOCK;
 599        case EINTR:
 600            goto retry;
 601        case ENOBUFS:
 602            if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
 603                error_setg_errno(errp, errno,
 604                                 "Process can't lock enough memory for using MSG_ZEROCOPY");
 605                return -1;
 606            }
 607            break;
 608        }
 609
 610        error_setg_errno(errp, errno,
 611                         "Unable to write to socket");
 612        return -1;
 613    }
 614
 615    if (flags & QIO_CHANNEL_WRITE_FLAG_ZERO_COPY) {
 616        sioc->zero_copy_queued++;
 617    }
 618
 619    return ret;
 620}
 621#else /* WIN32 */
 622static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
 623                                        const struct iovec *iov,
 624                                        size_t niov,
 625                                        int **fds,
 626                                        size_t *nfds,
 627                                        Error **errp)
 628{
 629    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 630    ssize_t done = 0;
 631    ssize_t i;
 632
 633    for (i = 0; i < niov; i++) {
 634        ssize_t ret;
 635    retry:
 636        ret = recv(sioc->fd,
 637                   iov[i].iov_base,
 638                   iov[i].iov_len,
 639                   0);
 640        if (ret < 0) {
 641            if (errno == EAGAIN) {
 642                if (done) {
 643                    return done;
 644                } else {
 645                    return QIO_CHANNEL_ERR_BLOCK;
 646                }
 647            } else if (errno == EINTR) {
 648                goto retry;
 649            } else {
 650                error_setg_errno(errp, errno,
 651                                 "Unable to read from socket");
 652                return -1;
 653            }
 654        }
 655        done += ret;
 656        if (ret < iov[i].iov_len) {
 657            return done;
 658        }
 659    }
 660
 661    return done;
 662}
 663
 664static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
 665                                         const struct iovec *iov,
 666                                         size_t niov,
 667                                         int *fds,
 668                                         size_t nfds,
 669                                         int flags,
 670                                         Error **errp)
 671{
 672    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 673    ssize_t done = 0;
 674    ssize_t i;
 675
 676    for (i = 0; i < niov; i++) {
 677        ssize_t ret;
 678    retry:
 679        ret = send(sioc->fd,
 680                   iov[i].iov_base,
 681                   iov[i].iov_len,
 682                   0);
 683        if (ret < 0) {
 684            if (errno == EAGAIN) {
 685                if (done) {
 686                    return done;
 687                } else {
 688                    return QIO_CHANNEL_ERR_BLOCK;
 689                }
 690            } else if (errno == EINTR) {
 691                goto retry;
 692            } else {
 693                error_setg_errno(errp, errno,
 694                                 "Unable to write to socket");
 695                return -1;
 696            }
 697        }
 698        done += ret;
 699        if (ret < iov[i].iov_len) {
 700            return done;
 701        }
 702    }
 703
 704    return done;
 705}
 706#endif /* WIN32 */
 707
 708
 709#ifdef QEMU_MSG_ZEROCOPY
 710static int qio_channel_socket_flush(QIOChannel *ioc,
 711                                    Error **errp)
 712{
 713    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 714    struct msghdr msg = {};
 715    struct sock_extended_err *serr;
 716    struct cmsghdr *cm;
 717    char control[CMSG_SPACE(sizeof(*serr))];
 718    int received;
 719    int ret;
 720
 721    if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
 722        return 0;
 723    }
 724
 725    msg.msg_control = control;
 726    msg.msg_controllen = sizeof(control);
 727    memset(control, 0, sizeof(control));
 728
 729    ret = 1;
 730
 731    while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
 732        received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
 733        if (received < 0) {
 734            switch (errno) {
 735            case EAGAIN:
 736                /* Nothing on errqueue, wait until something is available */
 737                qio_channel_wait(ioc, G_IO_ERR);
 738                continue;
 739            case EINTR:
 740                continue;
 741            default:
 742                error_setg_errno(errp, errno,
 743                                 "Unable to read errqueue");
 744                return -1;
 745            }
 746        }
 747
 748        cm = CMSG_FIRSTHDR(&msg);
 749        if (cm->cmsg_level != SOL_IP   && cm->cmsg_type != IP_RECVERR &&
 750            cm->cmsg_level != SOL_IPV6 && cm->cmsg_type != IPV6_RECVERR) {
 751            error_setg_errno(errp, EPROTOTYPE,
 752                             "Wrong cmsg in errqueue");
 753            return -1;
 754        }
 755
 756        serr = (void *) CMSG_DATA(cm);
 757        if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
 758            error_setg_errno(errp, serr->ee_errno,
 759                             "Error on socket");
 760            return -1;
 761        }
 762        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
 763            error_setg_errno(errp, serr->ee_origin,
 764                             "Error not from zero copy");
 765            return -1;
 766        }
 767
 768        /* No errors, count successfully finished sendmsg()*/
 769        sioc->zero_copy_sent += serr->ee_data - serr->ee_info + 1;
 770
 771        /* If any sendmsg() succeeded using zero copy, return 0 at the end */
 772        if (serr->ee_code != SO_EE_CODE_ZEROCOPY_COPIED) {
 773            ret = 0;
 774        }
 775    }
 776
 777    return ret;
 778}
 779
 780#endif /* QEMU_MSG_ZEROCOPY */
 781
 782static int
 783qio_channel_socket_set_blocking(QIOChannel *ioc,
 784                                bool enabled,
 785                                Error **errp)
 786{
 787    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 788
 789    if (enabled) {
 790        qemu_socket_set_block(sioc->fd);
 791    } else {
 792        qemu_socket_set_nonblock(sioc->fd);
 793    }
 794    return 0;
 795}
 796
 797
 798static void
 799qio_channel_socket_set_delay(QIOChannel *ioc,
 800                             bool enabled)
 801{
 802    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 803    int v = enabled ? 0 : 1;
 804
 805    setsockopt(sioc->fd,
 806               IPPROTO_TCP, TCP_NODELAY,
 807               &v, sizeof(v));
 808}
 809
 810
 811static void
 812qio_channel_socket_set_cork(QIOChannel *ioc,
 813                            bool enabled)
 814{
 815    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 816    int v = enabled ? 1 : 0;
 817
 818    socket_set_cork(sioc->fd, v);
 819}
 820
 821
 822static int
 823qio_channel_socket_close(QIOChannel *ioc,
 824                         Error **errp)
 825{
 826    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 827    int rc = 0;
 828    Error *err = NULL;
 829
 830    if (sioc->fd != -1) {
 831#ifdef WIN32
 832        WSAEventSelect(sioc->fd, NULL, 0);
 833#endif
 834        if (qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_LISTEN)) {
 835            socket_listen_cleanup(sioc->fd, errp);
 836        }
 837
 838        if (closesocket(sioc->fd) < 0) {
 839            sioc->fd = -1;
 840            error_setg_errno(&err, errno, "Unable to close socket");
 841            error_propagate(errp, err);
 842            return -1;
 843        }
 844        sioc->fd = -1;
 845    }
 846    return rc;
 847}
 848
 849static int
 850qio_channel_socket_shutdown(QIOChannel *ioc,
 851                            QIOChannelShutdown how,
 852                            Error **errp)
 853{
 854    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 855    int sockhow;
 856
 857    switch (how) {
 858    case QIO_CHANNEL_SHUTDOWN_READ:
 859        sockhow = SHUT_RD;
 860        break;
 861    case QIO_CHANNEL_SHUTDOWN_WRITE:
 862        sockhow = SHUT_WR;
 863        break;
 864    case QIO_CHANNEL_SHUTDOWN_BOTH:
 865    default:
 866        sockhow = SHUT_RDWR;
 867        break;
 868    }
 869
 870    if (shutdown(sioc->fd, sockhow) < 0) {
 871        error_setg_errno(errp, errno,
 872                         "Unable to shutdown socket");
 873        return -1;
 874    }
 875    return 0;
 876}
 877
 878static void qio_channel_socket_set_aio_fd_handler(QIOChannel *ioc,
 879                                                  AioContext *ctx,
 880                                                  IOHandler *io_read,
 881                                                  IOHandler *io_write,
 882                                                  void *opaque)
 883{
 884    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 885    aio_set_fd_handler(ctx, sioc->fd, false,
 886                       io_read, io_write, NULL, NULL, opaque);
 887}
 888
 889static GSource *qio_channel_socket_create_watch(QIOChannel *ioc,
 890                                                GIOCondition condition)
 891{
 892    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
 893    return qio_channel_create_socket_watch(ioc,
 894                                           sioc->fd,
 895                                           condition);
 896}
 897
 898static void qio_channel_socket_class_init(ObjectClass *klass,
 899                                          void *class_data G_GNUC_UNUSED)
 900{
 901    QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
 902
 903    ioc_klass->io_writev = qio_channel_socket_writev;
 904    ioc_klass->io_readv = qio_channel_socket_readv;
 905    ioc_klass->io_set_blocking = qio_channel_socket_set_blocking;
 906    ioc_klass->io_close = qio_channel_socket_close;
 907    ioc_klass->io_shutdown = qio_channel_socket_shutdown;
 908    ioc_klass->io_set_cork = qio_channel_socket_set_cork;
 909    ioc_klass->io_set_delay = qio_channel_socket_set_delay;
 910    ioc_klass->io_create_watch = qio_channel_socket_create_watch;
 911    ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
 912#ifdef QEMU_MSG_ZEROCOPY
 913    ioc_klass->io_flush = qio_channel_socket_flush;
 914#endif
 915}
 916
 917static const TypeInfo qio_channel_socket_info = {
 918    .parent = TYPE_QIO_CHANNEL,
 919    .name = TYPE_QIO_CHANNEL_SOCKET,
 920    .instance_size = sizeof(QIOChannelSocket),
 921    .instance_init = qio_channel_socket_init,
 922    .instance_finalize = qio_channel_socket_finalize,
 923    .class_init = qio_channel_socket_class_init,
 924};
 925
 926static void qio_channel_socket_register_types(void)
 927{
 928    type_register_static(&qio_channel_socket_info);
 929}
 930
 931type_init(qio_channel_socket_register_types);
 932