qemu/block/nbd-client.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for  NBD
   3 *
   4 * Copyright (C) 2008 Bull S.A.S.
   5 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
   6 *
   7 * Some parts:
   8 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
   9 *
  10 * Permission is hereby granted, free of charge, to any person obtaining a copy
  11 * of this software and associated documentation files (the "Software"), to deal
  12 * in the Software without restriction, including without limitation the rights
  13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  14 * copies of the Software, and to permit persons to whom the Software is
  15 * furnished to do so, subject to the following conditions:
  16 *
  17 * The above copyright notice and this permission notice shall be included in
  18 * all copies or substantial portions of the Software.
  19 *
  20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  26 * THE SOFTWARE.
  27 */
  28
  29#include "qemu/osdep.h"
  30#include "nbd-client.h"
  31
  32#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
  33#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
  34
  35static void nbd_recv_coroutines_enter_all(NbdClientSession *s)
  36{
  37    int i;
  38
  39    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
  40        if (s->recv_coroutine[i]) {
  41            qemu_coroutine_enter(s->recv_coroutine[i]);
  42        }
  43    }
  44}
  45
  46static void nbd_teardown_connection(BlockDriverState *bs)
  47{
  48    NbdClientSession *client = nbd_get_client_session(bs);
  49
  50    if (!client->ioc) { /* Already closed */
  51        return;
  52    }
  53
  54    /* finish any pending coroutines */
  55    qio_channel_shutdown(client->ioc,
  56                         QIO_CHANNEL_SHUTDOWN_BOTH,
  57                         NULL);
  58    nbd_recv_coroutines_enter_all(client);
  59
  60    nbd_client_detach_aio_context(bs);
  61    object_unref(OBJECT(client->sioc));
  62    client->sioc = NULL;
  63    object_unref(OBJECT(client->ioc));
  64    client->ioc = NULL;
  65}
  66
  67static void nbd_reply_ready(void *opaque)
  68{
  69    BlockDriverState *bs = opaque;
  70    NbdClientSession *s = nbd_get_client_session(bs);
  71    uint64_t i;
  72    int ret;
  73
  74    if (!s->ioc) { /* Already closed */
  75        return;
  76    }
  77
  78    if (s->reply.handle == 0) {
  79        /* No reply already in flight.  Fetch a header.  It is possible
  80         * that another thread has done the same thing in parallel, so
  81         * the socket is not readable anymore.
  82         */
  83        ret = nbd_receive_reply(s->ioc, &s->reply);
  84        if (ret == -EAGAIN) {
  85            return;
  86        }
  87        if (ret < 0) {
  88            s->reply.handle = 0;
  89            goto fail;
  90        }
  91    }
  92
  93    /* There's no need for a mutex on the receive side, because the
  94     * handler acts as a synchronization point and ensures that only
  95     * one coroutine is called until the reply finishes.  */
  96    i = HANDLE_TO_INDEX(s, s->reply.handle);
  97    if (i >= MAX_NBD_REQUESTS) {
  98        goto fail;
  99    }
 100
 101    if (s->recv_coroutine[i]) {
 102        qemu_coroutine_enter(s->recv_coroutine[i]);
 103        return;
 104    }
 105
 106fail:
 107    nbd_teardown_connection(bs);
 108}
 109
 110static void nbd_restart_write(void *opaque)
 111{
 112    BlockDriverState *bs = opaque;
 113
 114    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
 115}
 116
 117static int nbd_co_send_request(BlockDriverState *bs,
 118                               struct nbd_request *request,
 119                               QEMUIOVector *qiov)
 120{
 121    NbdClientSession *s = nbd_get_client_session(bs);
 122    AioContext *aio_context;
 123    int rc, ret, i;
 124
 125    qemu_co_mutex_lock(&s->send_mutex);
 126
 127    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
 128        if (s->recv_coroutine[i] == NULL) {
 129            s->recv_coroutine[i] = qemu_coroutine_self();
 130            break;
 131        }
 132    }
 133
 134    g_assert(qemu_in_coroutine());
 135    assert(i < MAX_NBD_REQUESTS);
 136    request->handle = INDEX_TO_HANDLE(s, i);
 137
 138    if (!s->ioc) {
 139        qemu_co_mutex_unlock(&s->send_mutex);
 140        return -EPIPE;
 141    }
 142
 143    s->send_coroutine = qemu_coroutine_self();
 144    aio_context = bdrv_get_aio_context(bs);
 145
 146    aio_set_fd_handler(aio_context, s->sioc->fd, false,
 147                       nbd_reply_ready, nbd_restart_write, bs);
 148    if (qiov) {
 149        qio_channel_set_cork(s->ioc, true);
 150        rc = nbd_send_request(s->ioc, request);
 151        if (rc >= 0) {
 152            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov, request->len,
 153                               false);
 154            if (ret != request->len) {
 155                rc = -EIO;
 156            }
 157        }
 158        qio_channel_set_cork(s->ioc, false);
 159    } else {
 160        rc = nbd_send_request(s->ioc, request);
 161    }
 162    aio_set_fd_handler(aio_context, s->sioc->fd, false,
 163                       nbd_reply_ready, NULL, bs);
 164    s->send_coroutine = NULL;
 165    qemu_co_mutex_unlock(&s->send_mutex);
 166    return rc;
 167}
 168
 169static void nbd_co_receive_reply(NbdClientSession *s,
 170                                 struct nbd_request *request,
 171                                 struct nbd_reply *reply,
 172                                 QEMUIOVector *qiov)
 173{
 174    int ret;
 175
 176    /* Wait until we're woken up by the read handler.  TODO: perhaps
 177     * peek at the next reply and avoid yielding if it's ours?  */
 178    qemu_coroutine_yield();
 179    *reply = s->reply;
 180    if (reply->handle != request->handle ||
 181        !s->ioc) {
 182        reply->error = EIO;
 183    } else {
 184        if (qiov && reply->error == 0) {
 185            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov, request->len,
 186                               true);
 187            if (ret != request->len) {
 188                reply->error = EIO;
 189            }
 190        }
 191
 192        /* Tell the read handler to read another header.  */
 193        s->reply.handle = 0;
 194    }
 195}
 196
 197static void nbd_coroutine_start(NbdClientSession *s,
 198   struct nbd_request *request)
 199{
 200    /* Poor man semaphore.  The free_sema is locked when no other request
 201     * can be accepted, and unlocked after receiving one reply.  */
 202    if (s->in_flight >= MAX_NBD_REQUESTS - 1) {
 203        qemu_co_mutex_lock(&s->free_sema);
 204        assert(s->in_flight < MAX_NBD_REQUESTS);
 205    }
 206    s->in_flight++;
 207
 208    /* s->recv_coroutine[i] is set as soon as we get the send_lock.  */
 209}
 210
 211static void nbd_coroutine_end(NbdClientSession *s,
 212    struct nbd_request *request)
 213{
 214    int i = HANDLE_TO_INDEX(s, request->handle);
 215    s->recv_coroutine[i] = NULL;
 216    if (s->in_flight-- == MAX_NBD_REQUESTS) {
 217        qemu_co_mutex_unlock(&s->free_sema);
 218    }
 219}
 220
 221int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
 222                         uint64_t bytes, QEMUIOVector *qiov, int flags)
 223{
 224    NbdClientSession *client = nbd_get_client_session(bs);
 225    struct nbd_request request = {
 226        .type = NBD_CMD_READ,
 227        .from = offset,
 228        .len = bytes,
 229    };
 230    struct nbd_reply reply;
 231    ssize_t ret;
 232
 233    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 234    assert(!flags);
 235
 236    nbd_coroutine_start(client, &request);
 237    ret = nbd_co_send_request(bs, &request, NULL);
 238    if (ret < 0) {
 239        reply.error = -ret;
 240    } else {
 241        nbd_co_receive_reply(client, &request, &reply, qiov);
 242    }
 243    nbd_coroutine_end(client, &request);
 244    return -reply.error;
 245}
 246
 247int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
 248                          uint64_t bytes, QEMUIOVector *qiov, int flags)
 249{
 250    NbdClientSession *client = nbd_get_client_session(bs);
 251    struct nbd_request request = {
 252        .type = NBD_CMD_WRITE,
 253        .from = offset,
 254        .len = bytes,
 255    };
 256    struct nbd_reply reply;
 257    ssize_t ret;
 258
 259    if (flags & BDRV_REQ_FUA) {
 260        assert(client->nbdflags & NBD_FLAG_SEND_FUA);
 261        request.type |= NBD_CMD_FLAG_FUA;
 262    }
 263
 264    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 265
 266    nbd_coroutine_start(client, &request);
 267    ret = nbd_co_send_request(bs, &request, qiov);
 268    if (ret < 0) {
 269        reply.error = -ret;
 270    } else {
 271        nbd_co_receive_reply(client, &request, &reply, NULL);
 272    }
 273    nbd_coroutine_end(client, &request);
 274    return -reply.error;
 275}
 276
 277int nbd_client_co_flush(BlockDriverState *bs)
 278{
 279    NbdClientSession *client = nbd_get_client_session(bs);
 280    struct nbd_request request = { .type = NBD_CMD_FLUSH };
 281    struct nbd_reply reply;
 282    ssize_t ret;
 283
 284    if (!(client->nbdflags & NBD_FLAG_SEND_FLUSH)) {
 285        return 0;
 286    }
 287
 288    request.from = 0;
 289    request.len = 0;
 290
 291    nbd_coroutine_start(client, &request);
 292    ret = nbd_co_send_request(bs, &request, NULL);
 293    if (ret < 0) {
 294        reply.error = -ret;
 295    } else {
 296        nbd_co_receive_reply(client, &request, &reply, NULL);
 297    }
 298    nbd_coroutine_end(client, &request);
 299    return -reply.error;
 300}
 301
 302int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
 303{
 304    NbdClientSession *client = nbd_get_client_session(bs);
 305    struct nbd_request request = {
 306        .type = NBD_CMD_TRIM,
 307        .from = offset,
 308        .len = count,
 309    };
 310    struct nbd_reply reply;
 311    ssize_t ret;
 312
 313    if (!(client->nbdflags & NBD_FLAG_SEND_TRIM)) {
 314        return 0;
 315    }
 316
 317    nbd_coroutine_start(client, &request);
 318    ret = nbd_co_send_request(bs, &request, NULL);
 319    if (ret < 0) {
 320        reply.error = -ret;
 321    } else {
 322        nbd_co_receive_reply(client, &request, &reply, NULL);
 323    }
 324    nbd_coroutine_end(client, &request);
 325    return -reply.error;
 326
 327}
 328
 329void nbd_client_detach_aio_context(BlockDriverState *bs)
 330{
 331    aio_set_fd_handler(bdrv_get_aio_context(bs),
 332                       nbd_get_client_session(bs)->sioc->fd,
 333                       false, NULL, NULL, NULL);
 334}
 335
 336void nbd_client_attach_aio_context(BlockDriverState *bs,
 337                                   AioContext *new_context)
 338{
 339    aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
 340                       false, nbd_reply_ready, NULL, bs);
 341}
 342
 343void nbd_client_close(BlockDriverState *bs)
 344{
 345    NbdClientSession *client = nbd_get_client_session(bs);
 346    struct nbd_request request = {
 347        .type = NBD_CMD_DISC,
 348        .from = 0,
 349        .len = 0
 350    };
 351
 352    if (client->ioc == NULL) {
 353        return;
 354    }
 355
 356    nbd_send_request(client->ioc, &request);
 357
 358    nbd_teardown_connection(bs);
 359}
 360
 361int nbd_client_init(BlockDriverState *bs,
 362                    QIOChannelSocket *sioc,
 363                    const char *export,
 364                    QCryptoTLSCreds *tlscreds,
 365                    const char *hostname,
 366                    Error **errp)
 367{
 368    NbdClientSession *client = nbd_get_client_session(bs);
 369    int ret;
 370
 371    /* NBD handshake */
 372    logout("session init %s\n", export);
 373    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
 374
 375    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
 376                                &client->nbdflags,
 377                                tlscreds, hostname,
 378                                &client->ioc,
 379                                &client->size, errp);
 380    if (ret < 0) {
 381        logout("Failed to negotiate with the NBD server\n");
 382        return ret;
 383    }
 384    if (client->nbdflags & NBD_FLAG_SEND_FUA) {
 385        bs->supported_write_flags = BDRV_REQ_FUA;
 386    }
 387
 388    qemu_co_mutex_init(&client->send_mutex);
 389    qemu_co_mutex_init(&client->free_sema);
 390    client->sioc = sioc;
 391    object_ref(OBJECT(client->sioc));
 392
 393    if (!client->ioc) {
 394        client->ioc = QIO_CHANNEL(sioc);
 395        object_ref(OBJECT(client->ioc));
 396    }
 397
 398    /* Now that we're connected, set the socket to be non-blocking and
 399     * kick the reply mechanism.  */
 400    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
 401
 402    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
 403
 404    logout("Established connection with NBD server\n");
 405    return 0;
 406}
 407