qemu/net/queue.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2003-2008 Fabrice Bellard
   3 * Copyright (c) 2009 Red Hat, Inc.
   4 *
   5 * Permission is hereby granted, free of charge, to any person obtaining a copy
   6 * of this software and associated documentation files (the "Software"), to deal
   7 * in the Software without restriction, including without limitation the rights
   8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
   9 * copies of the Software, and to permit persons to whom the Software is
  10 * furnished to do so, subject to the following conditions:
  11 *
  12 * The above copyright notice and this permission notice shall be included in
  13 * all copies or substantial portions of the Software.
  14 *
  15 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  18 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21 * THE SOFTWARE.
  22 */
  23
  24#include "qemu/osdep.h"
  25#include "net/queue.h"
  26#include "qemu/queue.h"
  27#include "net/net.h"
  28
  29/* The delivery handler may only return zero if it will call
  30 * qemu_net_queue_flush() when it determines that it is once again able
  31 * to deliver packets. It must also call qemu_net_queue_purge() in its
  32 * cleanup path.
  33 *
  34 * If a sent callback is provided to send(), the caller must handle a
  35 * zero return from the delivery handler by not sending any more packets
  36 * until we have invoked the callback. Only in that case will we queue
  37 * the packet.
  38 *
  39 * If a sent callback isn't provided, we just drop the packet to avoid
  40 * unbounded queueing.
  41 */
  42
  43struct NetPacket {
  44    QTAILQ_ENTRY(NetPacket) entry;
  45    NetClientState *sender;
  46    unsigned flags;
  47    int size;
  48    NetPacketSent *sent_cb;
  49    uint8_t data[];
  50};
  51
  52struct NetQueue {
  53    void *opaque;
  54    uint32_t nq_maxlen;
  55    uint32_t nq_count;
  56    NetQueueDeliverFunc *deliver;
  57
  58    QTAILQ_HEAD(, NetPacket) packets;
  59
  60    unsigned delivering : 1;
  61};
  62
  63NetQueue *qemu_new_net_queue(NetQueueDeliverFunc *deliver, void *opaque)
  64{
  65    NetQueue *queue;
  66
  67    queue = g_new0(NetQueue, 1);
  68
  69    queue->opaque = opaque;
  70    queue->nq_maxlen = 10000;
  71    queue->nq_count = 0;
  72    queue->deliver = deliver;
  73
  74    QTAILQ_INIT(&queue->packets);
  75
  76    queue->delivering = 0;
  77
  78    return queue;
  79}
  80
  81void qemu_del_net_queue(NetQueue *queue)
  82{
  83    NetPacket *packet, *next;
  84
  85    QTAILQ_FOREACH_SAFE(packet, &queue->packets, entry, next) {
  86        QTAILQ_REMOVE(&queue->packets, packet, entry);
  87        g_free(packet);
  88    }
  89
  90    g_free(queue);
  91}
  92
  93static void qemu_net_queue_append(NetQueue *queue,
  94                                  NetClientState *sender,
  95                                  unsigned flags,
  96                                  const uint8_t *buf,
  97                                  size_t size,
  98                                  NetPacketSent *sent_cb)
  99{
 100    NetPacket *packet;
 101
 102    if (queue->nq_count >= queue->nq_maxlen && !sent_cb) {
 103        return; /* drop if queue full and no callback */
 104    }
 105    packet = g_malloc(sizeof(NetPacket) + size);
 106    packet->sender = sender;
 107    packet->flags = flags;
 108    packet->size = size;
 109    packet->sent_cb = sent_cb;
 110    memcpy(packet->data, buf, size);
 111
 112    queue->nq_count++;
 113    QTAILQ_INSERT_TAIL(&queue->packets, packet, entry);
 114}
 115
 116void qemu_net_queue_append_iov(NetQueue *queue,
 117                               NetClientState *sender,
 118                               unsigned flags,
 119                               const struct iovec *iov,
 120                               int iovcnt,
 121                               NetPacketSent *sent_cb)
 122{
 123    NetPacket *packet;
 124    size_t max_len = 0;
 125    int i;
 126
 127    if (queue->nq_count >= queue->nq_maxlen && !sent_cb) {
 128        return; /* drop if queue full and no callback */
 129    }
 130    for (i = 0; i < iovcnt; i++) {
 131        max_len += iov[i].iov_len;
 132    }
 133
 134    packet = g_malloc(sizeof(NetPacket) + max_len);
 135    packet->sender = sender;
 136    packet->sent_cb = sent_cb;
 137    packet->flags = flags;
 138    packet->size = 0;
 139
 140    for (i = 0; i < iovcnt; i++) {
 141        size_t len = iov[i].iov_len;
 142
 143        memcpy(packet->data + packet->size, iov[i].iov_base, len);
 144        packet->size += len;
 145    }
 146
 147    queue->nq_count++;
 148    QTAILQ_INSERT_TAIL(&queue->packets, packet, entry);
 149}
 150
 151static ssize_t qemu_net_queue_deliver(NetQueue *queue,
 152                                      NetClientState *sender,
 153                                      unsigned flags,
 154                                      const uint8_t *data,
 155                                      size_t size)
 156{
 157    ssize_t ret = -1;
 158    struct iovec iov = {
 159        .iov_base = (void *)data,
 160        .iov_len = size
 161    };
 162
 163    queue->delivering = 1;
 164    ret = queue->deliver(sender, flags, &iov, 1, queue->opaque);
 165    queue->delivering = 0;
 166
 167    return ret;
 168}
 169
 170static ssize_t qemu_net_queue_deliver_iov(NetQueue *queue,
 171                                          NetClientState *sender,
 172                                          unsigned flags,
 173                                          const struct iovec *iov,
 174                                          int iovcnt)
 175{
 176    ssize_t ret = -1;
 177
 178    queue->delivering = 1;
 179    ret = queue->deliver(sender, flags, iov, iovcnt, queue->opaque);
 180    queue->delivering = 0;
 181
 182    return ret;
 183}
 184
 185ssize_t qemu_net_queue_receive(NetQueue *queue,
 186                               const uint8_t *data,
 187                               size_t size)
 188{
 189    if (queue->delivering) {
 190        return 0;
 191    }
 192
 193    return qemu_net_queue_deliver(queue, NULL, 0, data, size);
 194}
 195
 196ssize_t qemu_net_queue_receive_iov(NetQueue *queue,
 197                                   const struct iovec *iov,
 198                                   int iovcnt)
 199{
 200    if (queue->delivering) {
 201        return 0;
 202    }
 203
 204    return qemu_net_queue_deliver_iov(queue, NULL, 0, iov, iovcnt);
 205}
 206
 207ssize_t qemu_net_queue_send(NetQueue *queue,
 208                            NetClientState *sender,
 209                            unsigned flags,
 210                            const uint8_t *data,
 211                            size_t size,
 212                            NetPacketSent *sent_cb)
 213{
 214    ssize_t ret;
 215
 216    if (queue->delivering || !qemu_can_send_packet(sender)) {
 217        qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
 218        return 0;
 219    }
 220
 221    ret = qemu_net_queue_deliver(queue, sender, flags, data, size);
 222    if (ret == 0) {
 223        qemu_net_queue_append(queue, sender, flags, data, size, sent_cb);
 224        return 0;
 225    }
 226
 227    qemu_net_queue_flush(queue);
 228
 229    return ret;
 230}
 231
 232ssize_t qemu_net_queue_send_iov(NetQueue *queue,
 233                                NetClientState *sender,
 234                                unsigned flags,
 235                                const struct iovec *iov,
 236                                int iovcnt,
 237                                NetPacketSent *sent_cb)
 238{
 239    ssize_t ret;
 240
 241    if (queue->delivering || !qemu_can_send_packet(sender)) {
 242        qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
 243        return 0;
 244    }
 245
 246    ret = qemu_net_queue_deliver_iov(queue, sender, flags, iov, iovcnt);
 247    if (ret == 0) {
 248        qemu_net_queue_append_iov(queue, sender, flags, iov, iovcnt, sent_cb);
 249        return 0;
 250    }
 251
 252    qemu_net_queue_flush(queue);
 253
 254    return ret;
 255}
 256
 257void qemu_net_queue_purge(NetQueue *queue, NetClientState *from)
 258{
 259    NetPacket *packet, *next;
 260
 261    QTAILQ_FOREACH_SAFE(packet, &queue->packets, entry, next) {
 262        if (packet->sender == from) {
 263            QTAILQ_REMOVE(&queue->packets, packet, entry);
 264            queue->nq_count--;
 265            if (packet->sent_cb) {
 266                packet->sent_cb(packet->sender, 0);
 267            }
 268            g_free(packet);
 269        }
 270    }
 271}
 272
 273bool qemu_net_queue_flush(NetQueue *queue)
 274{
 275    if (queue->delivering)
 276        return false;
 277
 278    while (!QTAILQ_EMPTY(&queue->packets)) {
 279        NetPacket *packet;
 280        int ret;
 281
 282        packet = QTAILQ_FIRST(&queue->packets);
 283        QTAILQ_REMOVE(&queue->packets, packet, entry);
 284        queue->nq_count--;
 285
 286        ret = qemu_net_queue_deliver(queue,
 287                                     packet->sender,
 288                                     packet->flags,
 289                                     packet->data,
 290                                     packet->size);
 291        if (ret == 0) {
 292            queue->nq_count++;
 293            QTAILQ_INSERT_HEAD(&queue->packets, packet, entry);
 294            return false;
 295        }
 296
 297        if (packet->sent_cb) {
 298            packet->sent_cb(packet->sender, ret);
 299        }
 300
 301        g_free(packet);
 302    }
 303    return true;
 304}
 305