qemu/block/nbd-client.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for  NBD
   3 *
   4 * Copyright (C) 2008 Bull S.A.S.
   5 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
   6 *
   7 * Some parts:
   8 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
   9 *
  10 * Permission is hereby granted, free of charge, to any person obtaining a copy
  11 * of this software and associated documentation files (the "Software"), to deal
  12 * in the Software without restriction, including without limitation the rights
  13 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  14 * copies of the Software, and to permit persons to whom the Software is
  15 * furnished to do so, subject to the following conditions:
  16 *
  17 * The above copyright notice and this permission notice shall be included in
  18 * all copies or substantial portions of the Software.
  19 *
  20 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  21 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  22 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  23 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  24 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  25 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  26 * THE SOFTWARE.
  27 */
  28
  29#include "qemu/osdep.h"
  30#include "nbd-client.h"
  31
  32#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
  33#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
  34
  35static void nbd_recv_coroutines_enter_all(NbdClientSession *s)
  36{
  37    int i;
  38
  39    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
  40        if (s->recv_coroutine[i]) {
  41            qemu_coroutine_enter(s->recv_coroutine[i], NULL);
  42        }
  43    }
  44}
  45
  46static void nbd_teardown_connection(BlockDriverState *bs)
  47{
  48    NbdClientSession *client = nbd_get_client_session(bs);
  49
  50    if (!client->ioc) { /* Already closed */
  51        return;
  52    }
  53
  54    /* finish any pending coroutines */
  55    qio_channel_shutdown(client->ioc,
  56                         QIO_CHANNEL_SHUTDOWN_BOTH,
  57                         NULL);
  58    nbd_recv_coroutines_enter_all(client);
  59
  60    nbd_client_detach_aio_context(bs);
  61    object_unref(OBJECT(client->sioc));
  62    client->sioc = NULL;
  63    object_unref(OBJECT(client->ioc));
  64    client->ioc = NULL;
  65}
  66
/*
 * Read-side fd handler; this file passes it to aio_set_fd_handler().
 *
 * @opaque: the BlockDriverState whose session owns the socket.
 *
 * If no reply header is pending (s->reply.handle == 0) one is fetched into
 * s->reply.  The reply handle is then mapped back to a slot index and the
 * coroutine registered in that slot is entered.  A receive error other
 * than -EAGAIN, an out-of-range index, or an empty slot all end in
 * nbd_teardown_connection().
 */
static void nbd_reply_ready(void *opaque)
{
    BlockDriverState *bs = opaque;
    NbdClientSession *s = nbd_get_client_session(bs);
    uint64_t i;
    int ret;

    if (!s->ioc) { /* Already closed */
        return;
    }

    if (s->reply.handle == 0) {
        /* No reply already in flight.  Fetch a header.  It is possible
         * that another thread has done the same thing in parallel, so
         * the socket is not readable anymore.
         */
        ret = nbd_receive_reply(s->ioc, &s->reply);
        if (ret == -EAGAIN) {
            /* Nothing to read right now; wait for the next invocation. */
            return;
        }
        if (ret < 0) {
            /* Make sure no stale handle is left behind before tearing down. */
            s->reply.handle = 0;
            goto fail;
        }
    }

    /* There's no need for a mutex on the receive side, because the
     * handler acts as a synchronization point and ensures that only
     * one coroutine is called until the reply finishes.  */
    i = HANDLE_TO_INDEX(s, s->reply.handle);
    if (i >= MAX_NBD_REQUESTS) {
        /* The handle does not decode to a valid slot. */
        goto fail;
    }

    if (s->recv_coroutine[i]) {
        qemu_coroutine_enter(s->recv_coroutine[i], NULL);
        return;
    }

    /* No coroutine is registered for this slot: fall through to teardown. */
fail:
    nbd_teardown_connection(bs);
}
 109
 110static void nbd_restart_write(void *opaque)
 111{
 112    BlockDriverState *bs = opaque;
 113
 114    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine, NULL);
 115}
 116
/*
 * Send one request (and, for writes, its payload) over the session channel.
 *
 * @bs:      block device whose NBD session is used
 * @request: request to send; its handle field is filled in here
 * @qiov:    payload to send after the header, or NULL for no payload
 * @offset:  starting position inside @qiov, passed through to nbd_wr_syncv()
 *
 * Runs under s->send_mutex.  The calling coroutine is recorded in the first
 * free recv_coroutine[] slot and the request handle is derived from that
 * slot index.
 *
 * Returns the result of nbd_send_request(), -EIO when the payload write
 * does not transfer exactly request->len bytes, or -EPIPE when the session
 * has no channel.  On the -EPIPE path the slot remains claimed; the callers
 * in this file release it through nbd_coroutine_end().
 */
static int nbd_co_send_request(BlockDriverState *bs,
                               struct nbd_request *request,
                               QEMUIOVector *qiov, int offset)
{
    NbdClientSession *s = nbd_get_client_session(bs);
    AioContext *aio_context;
    int rc, ret, i;

    qemu_co_mutex_lock(&s->send_mutex);

    /* Claim the first unused slot for this coroutine. */
    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->recv_coroutine[i] == NULL) {
            s->recv_coroutine[i] = qemu_coroutine_self();
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    /* A free slot must have been found. */
    assert(i < MAX_NBD_REQUESTS);
    request->handle = INDEX_TO_HANDLE(s, i);

    if (!s->ioc) {
        qemu_co_mutex_unlock(&s->send_mutex);
        return -EPIPE;
    }

    s->send_coroutine = qemu_coroutine_self();
    aio_context = bdrv_get_aio_context(bs);

    /* Install the write handler for the duration of the send. */
    aio_set_fd_handler(aio_context, s->sioc->fd, false,
                       nbd_reply_ready, nbd_restart_write, bs);
    if (qiov) {
        /* Cork the channel around the header and payload writes. */
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0) {
            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
                               offset, request->len, 0);
            if (ret != request->len) {
                rc = -EIO;
            }
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }
    /* Sending is done: go back to a read-only handler. */
    aio_set_fd_handler(aio_context, s->sioc->fd, false,
                       nbd_reply_ready, NULL, bs);
    s->send_coroutine = NULL;
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}
 168
 169static void nbd_co_receive_reply(NbdClientSession *s,
 170    struct nbd_request *request, struct nbd_reply *reply,
 171    QEMUIOVector *qiov, int offset)
 172{
 173    int ret;
 174
 175    /* Wait until we're woken up by the read handler.  TODO: perhaps
 176     * peek at the next reply and avoid yielding if it's ours?  */
 177    qemu_coroutine_yield();
 178    *reply = s->reply;
 179    if (reply->handle != request->handle ||
 180        !s->ioc) {
 181        reply->error = EIO;
 182    } else {
 183        if (qiov && reply->error == 0) {
 184            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov,
 185                               offset, request->len, 1);
 186            if (ret != request->len) {
 187                reply->error = EIO;
 188            }
 189        }
 190
 191        /* Tell the read handler to read another header.  */
 192        s->reply.handle = 0;
 193    }
 194}
 195
 196static void nbd_coroutine_start(NbdClientSession *s,
 197   struct nbd_request *request)
 198{
 199    /* Poor man semaphore.  The free_sema is locked when no other request
 200     * can be accepted, and unlocked after receiving one reply.  */
 201    if (s->in_flight >= MAX_NBD_REQUESTS - 1) {
 202        qemu_co_mutex_lock(&s->free_sema);
 203        assert(s->in_flight < MAX_NBD_REQUESTS);
 204    }
 205    s->in_flight++;
 206
 207    /* s->recv_coroutine[i] is set as soon as we get the send_lock.  */
 208}
 209
 210static void nbd_coroutine_end(NbdClientSession *s,
 211    struct nbd_request *request)
 212{
 213    int i = HANDLE_TO_INDEX(s, request->handle);
 214    s->recv_coroutine[i] = NULL;
 215    if (s->in_flight-- == MAX_NBD_REQUESTS) {
 216        qemu_co_mutex_unlock(&s->free_sema);
 217    }
 218}
 219
 220static int nbd_co_readv_1(BlockDriverState *bs, int64_t sector_num,
 221                          int nb_sectors, QEMUIOVector *qiov,
 222                          int offset)
 223{
 224    NbdClientSession *client = nbd_get_client_session(bs);
 225    struct nbd_request request = { .type = NBD_CMD_READ };
 226    struct nbd_reply reply;
 227    ssize_t ret;
 228
 229    request.from = sector_num * 512;
 230    request.len = nb_sectors * 512;
 231
 232    nbd_coroutine_start(client, &request);
 233    ret = nbd_co_send_request(bs, &request, NULL, 0);
 234    if (ret < 0) {
 235        reply.error = -ret;
 236    } else {
 237        nbd_co_receive_reply(client, &request, &reply, qiov, offset);
 238    }
 239    nbd_coroutine_end(client, &request);
 240    return -reply.error;
 241
 242}
 243
 244static int nbd_co_writev_1(BlockDriverState *bs, int64_t sector_num,
 245                           int nb_sectors, QEMUIOVector *qiov,
 246                           int offset, int *flags)
 247{
 248    NbdClientSession *client = nbd_get_client_session(bs);
 249    struct nbd_request request = { .type = NBD_CMD_WRITE };
 250    struct nbd_reply reply;
 251    ssize_t ret;
 252
 253    if ((*flags & BDRV_REQ_FUA) && (client->nbdflags & NBD_FLAG_SEND_FUA)) {
 254        *flags &= ~BDRV_REQ_FUA;
 255        request.type |= NBD_CMD_FLAG_FUA;
 256    }
 257
 258    request.from = sector_num * 512;
 259    request.len = nb_sectors * 512;
 260
 261    nbd_coroutine_start(client, &request);
 262    ret = nbd_co_send_request(bs, &request, qiov, offset);
 263    if (ret < 0) {
 264        reply.error = -ret;
 265    } else {
 266        nbd_co_receive_reply(client, &request, &reply, NULL, 0);
 267    }
 268    nbd_coroutine_end(client, &request);
 269    return -reply.error;
 270}
 271
/* qemu-nbd has a limit of slightly less than 1M per request.  Try to
 * remain aligned to 4K.
 *
 * With the 512-byte sectors used throughout this file, 2040 sectors is
 * 1044480 bytes, i.e. 255 * 4096. */
#define NBD_MAX_SECTORS 2040
 275
 276int nbd_client_co_readv(BlockDriverState *bs, int64_t sector_num,
 277                        int nb_sectors, QEMUIOVector *qiov)
 278{
 279    int offset = 0;
 280    int ret;
 281    while (nb_sectors > NBD_MAX_SECTORS) {
 282        ret = nbd_co_readv_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset);
 283        if (ret < 0) {
 284            return ret;
 285        }
 286        offset += NBD_MAX_SECTORS * 512;
 287        sector_num += NBD_MAX_SECTORS;
 288        nb_sectors -= NBD_MAX_SECTORS;
 289    }
 290    return nbd_co_readv_1(bs, sector_num, nb_sectors, qiov, offset);
 291}
 292
 293int nbd_client_co_writev(BlockDriverState *bs, int64_t sector_num,
 294                         int nb_sectors, QEMUIOVector *qiov, int *flags)
 295{
 296    int offset = 0;
 297    int ret;
 298    while (nb_sectors > NBD_MAX_SECTORS) {
 299        ret = nbd_co_writev_1(bs, sector_num, NBD_MAX_SECTORS, qiov, offset,
 300                              flags);
 301        if (ret < 0) {
 302            return ret;
 303        }
 304        offset += NBD_MAX_SECTORS * 512;
 305        sector_num += NBD_MAX_SECTORS;
 306        nb_sectors -= NBD_MAX_SECTORS;
 307    }
 308    return nbd_co_writev_1(bs, sector_num, nb_sectors, qiov, offset, flags);
 309}
 310
 311int nbd_client_co_flush(BlockDriverState *bs)
 312{
 313    NbdClientSession *client = nbd_get_client_session(bs);
 314    struct nbd_request request = { .type = NBD_CMD_FLUSH };
 315    struct nbd_reply reply;
 316    ssize_t ret;
 317
 318    if (!(client->nbdflags & NBD_FLAG_SEND_FLUSH)) {
 319        return 0;
 320    }
 321
 322    request.from = 0;
 323    request.len = 0;
 324
 325    nbd_coroutine_start(client, &request);
 326    ret = nbd_co_send_request(bs, &request, NULL, 0);
 327    if (ret < 0) {
 328        reply.error = -ret;
 329    } else {
 330        nbd_co_receive_reply(client, &request, &reply, NULL, 0);
 331    }
 332    nbd_coroutine_end(client, &request);
 333    return -reply.error;
 334}
 335
 336int nbd_client_co_discard(BlockDriverState *bs, int64_t sector_num,
 337                          int nb_sectors)
 338{
 339    NbdClientSession *client = nbd_get_client_session(bs);
 340    struct nbd_request request = { .type = NBD_CMD_TRIM };
 341    struct nbd_reply reply;
 342    ssize_t ret;
 343
 344    if (!(client->nbdflags & NBD_FLAG_SEND_TRIM)) {
 345        return 0;
 346    }
 347    request.from = sector_num * 512;
 348    request.len = nb_sectors * 512;
 349
 350    nbd_coroutine_start(client, &request);
 351    ret = nbd_co_send_request(bs, &request, NULL, 0);
 352    if (ret < 0) {
 353        reply.error = -ret;
 354    } else {
 355        nbd_co_receive_reply(client, &request, &reply, NULL, 0);
 356    }
 357    nbd_coroutine_end(client, &request);
 358    return -reply.error;
 359
 360}
 361
 362void nbd_client_detach_aio_context(BlockDriverState *bs)
 363{
 364    aio_set_fd_handler(bdrv_get_aio_context(bs),
 365                       nbd_get_client_session(bs)->sioc->fd,
 366                       false, NULL, NULL, NULL);
 367}
 368
 369void nbd_client_attach_aio_context(BlockDriverState *bs,
 370                                   AioContext *new_context)
 371{
 372    aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
 373                       false, nbd_reply_ready, NULL, bs);
 374}
 375
 376void nbd_client_close(BlockDriverState *bs)
 377{
 378    NbdClientSession *client = nbd_get_client_session(bs);
 379    struct nbd_request request = {
 380        .type = NBD_CMD_DISC,
 381        .from = 0,
 382        .len = 0
 383    };
 384
 385    if (client->ioc == NULL) {
 386        return;
 387    }
 388
 389    nbd_send_request(client->ioc, &request);
 390
 391    nbd_teardown_connection(bs);
 392}
 393
 394int nbd_client_init(BlockDriverState *bs,
 395                    QIOChannelSocket *sioc,
 396                    const char *export,
 397                    QCryptoTLSCreds *tlscreds,
 398                    const char *hostname,
 399                    Error **errp)
 400{
 401    NbdClientSession *client = nbd_get_client_session(bs);
 402    int ret;
 403
 404    /* NBD handshake */
 405    logout("session init %s\n", export);
 406    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
 407
 408    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
 409                                &client->nbdflags,
 410                                tlscreds, hostname,
 411                                &client->ioc,
 412                                &client->size, errp);
 413    if (ret < 0) {
 414        logout("Failed to negotiate with the NBD server\n");
 415        return ret;
 416    }
 417
 418    qemu_co_mutex_init(&client->send_mutex);
 419    qemu_co_mutex_init(&client->free_sema);
 420    client->sioc = sioc;
 421    object_ref(OBJECT(client->sioc));
 422
 423    if (!client->ioc) {
 424        client->ioc = QIO_CHANNEL(sioc);
 425        object_ref(OBJECT(client->ioc));
 426    }
 427
 428    /* Now that we're connected, set the socket to be non-blocking and
 429     * kick the reply mechanism.  */
 430    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
 431
 432    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
 433
 434    logout("Established connection with NBD server\n");
 435    return 0;
 436}
 437