qemu/block/nbd-client.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for  NBD
   3 *
   4 * Copyright (C) 2016 Red Hat, Inc.
   5 * Copyright (C) 2008 Bull S.A.S.
   6 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
   7 *
   8 * Some parts:
   9 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
  10 *
  11 * Permission is hereby granted, free of charge, to any person obtaining a copy
  12 * of this software and associated documentation files (the "Software"), to deal
  13 * in the Software without restriction, including without limitation the rights
  14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15 * copies of the Software, and to permit persons to whom the Software is
  16 * furnished to do so, subject to the following conditions:
  17 *
  18 * The above copyright notice and this permission notice shall be included in
  19 * all copies or substantial portions of the Software.
  20 *
  21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27 * THE SOFTWARE.
  28 */
  29
  30#include "qemu/osdep.h"
  31#include "nbd-client.h"
  32
  33#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
  34#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
  35
  36static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
  37{
  38    int i;
  39
  40    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
  41        if (s->recv_coroutine[i]) {
  42            qemu_coroutine_enter(s->recv_coroutine[i]);
  43        }
  44    }
  45}
  46
  47static void nbd_teardown_connection(BlockDriverState *bs)
  48{
  49    NBDClientSession *client = nbd_get_client_session(bs);
  50
  51    if (!client->ioc) { /* Already closed */
  52        return;
  53    }
  54
  55    /* finish any pending coroutines */
  56    qio_channel_shutdown(client->ioc,
  57                         QIO_CHANNEL_SHUTDOWN_BOTH,
  58                         NULL);
  59    nbd_recv_coroutines_enter_all(client);
  60
  61    nbd_client_detach_aio_context(bs);
  62    object_unref(OBJECT(client->sioc));
  63    client->sioc = NULL;
  64    object_unref(OBJECT(client->ioc));
  65    client->ioc = NULL;
  66}
  67
  68static void nbd_reply_ready(void *opaque)
  69{
  70    BlockDriverState *bs = opaque;
  71    NBDClientSession *s = nbd_get_client_session(bs);
  72    uint64_t i;
  73    int ret;
  74
  75    if (!s->ioc) { /* Already closed */
  76        return;
  77    }
  78
  79    if (s->reply.handle == 0) {
  80        /* No reply already in flight.  Fetch a header.  It is possible
  81         * that another thread has done the same thing in parallel, so
  82         * the socket is not readable anymore.
  83         */
  84        ret = nbd_receive_reply(s->ioc, &s->reply);
  85        if (ret == -EAGAIN) {
  86            return;
  87        }
  88        if (ret < 0) {
  89            s->reply.handle = 0;
  90            goto fail;
  91        }
  92    }
  93
  94    /* There's no need for a mutex on the receive side, because the
  95     * handler acts as a synchronization point and ensures that only
  96     * one coroutine is called until the reply finishes.  */
  97    i = HANDLE_TO_INDEX(s, s->reply.handle);
  98    if (i >= MAX_NBD_REQUESTS) {
  99        goto fail;
 100    }
 101
 102    if (s->recv_coroutine[i]) {
 103        qemu_coroutine_enter(s->recv_coroutine[i]);
 104        return;
 105    }
 106
 107fail:
 108    nbd_teardown_connection(bs);
 109}
 110
 111static void nbd_restart_write(void *opaque)
 112{
 113    BlockDriverState *bs = opaque;
 114
 115    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
 116}
 117
 118static int nbd_co_send_request(BlockDriverState *bs,
 119                               NBDRequest *request,
 120                               QEMUIOVector *qiov)
 121{
 122    NBDClientSession *s = nbd_get_client_session(bs);
 123    AioContext *aio_context;
 124    int rc, ret, i;
 125
 126    qemu_co_mutex_lock(&s->send_mutex);
 127
 128    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
 129        if (s->recv_coroutine[i] == NULL) {
 130            s->recv_coroutine[i] = qemu_coroutine_self();
 131            break;
 132        }
 133    }
 134
 135    g_assert(qemu_in_coroutine());
 136    assert(i < MAX_NBD_REQUESTS);
 137    request->handle = INDEX_TO_HANDLE(s, i);
 138
 139    if (!s->ioc) {
 140        qemu_co_mutex_unlock(&s->send_mutex);
 141        return -EPIPE;
 142    }
 143
 144    s->send_coroutine = qemu_coroutine_self();
 145    aio_context = bdrv_get_aio_context(bs);
 146
 147    aio_set_fd_handler(aio_context, s->sioc->fd, false,
 148                       nbd_reply_ready, nbd_restart_write, bs);
 149    if (qiov) {
 150        qio_channel_set_cork(s->ioc, true);
 151        rc = nbd_send_request(s->ioc, request);
 152        if (rc >= 0) {
 153            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov, request->len,
 154                               false);
 155            if (ret != request->len) {
 156                rc = -EIO;
 157            }
 158        }
 159        qio_channel_set_cork(s->ioc, false);
 160    } else {
 161        rc = nbd_send_request(s->ioc, request);
 162    }
 163    aio_set_fd_handler(aio_context, s->sioc->fd, false,
 164                       nbd_reply_ready, NULL, bs);
 165    s->send_coroutine = NULL;
 166    qemu_co_mutex_unlock(&s->send_mutex);
 167    return rc;
 168}
 169
 170static void nbd_co_receive_reply(NBDClientSession *s,
 171                                 NBDRequest *request,
 172                                 NBDReply *reply,
 173                                 QEMUIOVector *qiov)
 174{
 175    int ret;
 176
 177    /* Wait until we're woken up by the read handler.  TODO: perhaps
 178     * peek at the next reply and avoid yielding if it's ours?  */
 179    qemu_coroutine_yield();
 180    *reply = s->reply;
 181    if (reply->handle != request->handle ||
 182        !s->ioc) {
 183        reply->error = EIO;
 184    } else {
 185        if (qiov && reply->error == 0) {
 186            ret = nbd_wr_syncv(s->ioc, qiov->iov, qiov->niov, request->len,
 187                               true);
 188            if (ret != request->len) {
 189                reply->error = EIO;
 190            }
 191        }
 192
 193        /* Tell the read handler to read another header.  */
 194        s->reply.handle = 0;
 195    }
 196}
 197
 198static void nbd_coroutine_start(NBDClientSession *s,
 199                                NBDRequest *request)
 200{
 201    /* Poor man semaphore.  The free_sema is locked when no other request
 202     * can be accepted, and unlocked after receiving one reply.  */
 203    if (s->in_flight == MAX_NBD_REQUESTS) {
 204        qemu_co_queue_wait(&s->free_sema);
 205        assert(s->in_flight < MAX_NBD_REQUESTS);
 206    }
 207    s->in_flight++;
 208
 209    /* s->recv_coroutine[i] is set as soon as we get the send_lock.  */
 210}
 211
 212static void nbd_coroutine_end(NBDClientSession *s,
 213                              NBDRequest *request)
 214{
 215    int i = HANDLE_TO_INDEX(s, request->handle);
 216    s->recv_coroutine[i] = NULL;
 217    if (s->in_flight-- == MAX_NBD_REQUESTS) {
 218        qemu_co_queue_next(&s->free_sema);
 219    }
 220}
 221
 222int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
 223                         uint64_t bytes, QEMUIOVector *qiov, int flags)
 224{
 225    NBDClientSession *client = nbd_get_client_session(bs);
 226    NBDRequest request = {
 227        .type = NBD_CMD_READ,
 228        .from = offset,
 229        .len = bytes,
 230    };
 231    NBDReply reply;
 232    ssize_t ret;
 233
 234    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 235    assert(!flags);
 236
 237    nbd_coroutine_start(client, &request);
 238    ret = nbd_co_send_request(bs, &request, NULL);
 239    if (ret < 0) {
 240        reply.error = -ret;
 241    } else {
 242        nbd_co_receive_reply(client, &request, &reply, qiov);
 243    }
 244    nbd_coroutine_end(client, &request);
 245    return -reply.error;
 246}
 247
 248int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
 249                          uint64_t bytes, QEMUIOVector *qiov, int flags)
 250{
 251    NBDClientSession *client = nbd_get_client_session(bs);
 252    NBDRequest request = {
 253        .type = NBD_CMD_WRITE,
 254        .from = offset,
 255        .len = bytes,
 256    };
 257    NBDReply reply;
 258    ssize_t ret;
 259
 260    if (flags & BDRV_REQ_FUA) {
 261        assert(client->nbdflags & NBD_FLAG_SEND_FUA);
 262        request.flags |= NBD_CMD_FLAG_FUA;
 263    }
 264
 265    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 266
 267    nbd_coroutine_start(client, &request);
 268    ret = nbd_co_send_request(bs, &request, qiov);
 269    if (ret < 0) {
 270        reply.error = -ret;
 271    } else {
 272        nbd_co_receive_reply(client, &request, &reply, NULL);
 273    }
 274    nbd_coroutine_end(client, &request);
 275    return -reply.error;
 276}
 277
 278int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
 279                                int count, BdrvRequestFlags flags)
 280{
 281    ssize_t ret;
 282    NBDClientSession *client = nbd_get_client_session(bs);
 283    NBDRequest request = {
 284        .type = NBD_CMD_WRITE_ZEROES,
 285        .from = offset,
 286        .len = count,
 287    };
 288    NBDReply reply;
 289
 290    if (!(client->nbdflags & NBD_FLAG_SEND_WRITE_ZEROES)) {
 291        return -ENOTSUP;
 292    }
 293
 294    if (flags & BDRV_REQ_FUA) {
 295        assert(client->nbdflags & NBD_FLAG_SEND_FUA);
 296        request.flags |= NBD_CMD_FLAG_FUA;
 297    }
 298    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
 299        request.flags |= NBD_CMD_FLAG_NO_HOLE;
 300    }
 301
 302    nbd_coroutine_start(client, &request);
 303    ret = nbd_co_send_request(bs, &request, NULL);
 304    if (ret < 0) {
 305        reply.error = -ret;
 306    } else {
 307        nbd_co_receive_reply(client, &request, &reply, NULL);
 308    }
 309    nbd_coroutine_end(client, &request);
 310    return -reply.error;
 311}
 312
 313int nbd_client_co_flush(BlockDriverState *bs)
 314{
 315    NBDClientSession *client = nbd_get_client_session(bs);
 316    NBDRequest request = { .type = NBD_CMD_FLUSH };
 317    NBDReply reply;
 318    ssize_t ret;
 319
 320    if (!(client->nbdflags & NBD_FLAG_SEND_FLUSH)) {
 321        return 0;
 322    }
 323
 324    request.from = 0;
 325    request.len = 0;
 326
 327    nbd_coroutine_start(client, &request);
 328    ret = nbd_co_send_request(bs, &request, NULL);
 329    if (ret < 0) {
 330        reply.error = -ret;
 331    } else {
 332        nbd_co_receive_reply(client, &request, &reply, NULL);
 333    }
 334    nbd_coroutine_end(client, &request);
 335    return -reply.error;
 336}
 337
 338int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
 339{
 340    NBDClientSession *client = nbd_get_client_session(bs);
 341    NBDRequest request = {
 342        .type = NBD_CMD_TRIM,
 343        .from = offset,
 344        .len = count,
 345    };
 346    NBDReply reply;
 347    ssize_t ret;
 348
 349    if (!(client->nbdflags & NBD_FLAG_SEND_TRIM)) {
 350        return 0;
 351    }
 352
 353    nbd_coroutine_start(client, &request);
 354    ret = nbd_co_send_request(bs, &request, NULL);
 355    if (ret < 0) {
 356        reply.error = -ret;
 357    } else {
 358        nbd_co_receive_reply(client, &request, &reply, NULL);
 359    }
 360    nbd_coroutine_end(client, &request);
 361    return -reply.error;
 362
 363}
 364
 365void nbd_client_detach_aio_context(BlockDriverState *bs)
 366{
 367    aio_set_fd_handler(bdrv_get_aio_context(bs),
 368                       nbd_get_client_session(bs)->sioc->fd,
 369                       false, NULL, NULL, NULL);
 370}
 371
 372void nbd_client_attach_aio_context(BlockDriverState *bs,
 373                                   AioContext *new_context)
 374{
 375    aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
 376                       false, nbd_reply_ready, NULL, bs);
 377}
 378
 379void nbd_client_close(BlockDriverState *bs)
 380{
 381    NBDClientSession *client = nbd_get_client_session(bs);
 382    NBDRequest request = { .type = NBD_CMD_DISC };
 383
 384    if (client->ioc == NULL) {
 385        return;
 386    }
 387
 388    nbd_send_request(client->ioc, &request);
 389
 390    nbd_teardown_connection(bs);
 391}
 392
 393int nbd_client_init(BlockDriverState *bs,
 394                    QIOChannelSocket *sioc,
 395                    const char *export,
 396                    QCryptoTLSCreds *tlscreds,
 397                    const char *hostname,
 398                    Error **errp)
 399{
 400    NBDClientSession *client = nbd_get_client_session(bs);
 401    int ret;
 402
 403    /* NBD handshake */
 404    logout("session init %s\n", export);
 405    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
 406
 407    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
 408                                &client->nbdflags,
 409                                tlscreds, hostname,
 410                                &client->ioc,
 411                                &client->size, errp);
 412    if (ret < 0) {
 413        logout("Failed to negotiate with the NBD server\n");
 414        return ret;
 415    }
 416    if (client->nbdflags & NBD_FLAG_SEND_FUA) {
 417        bs->supported_write_flags = BDRV_REQ_FUA;
 418        bs->supported_zero_flags |= BDRV_REQ_FUA;
 419    }
 420    if (client->nbdflags & NBD_FLAG_SEND_WRITE_ZEROES) {
 421        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
 422    }
 423
 424    qemu_co_mutex_init(&client->send_mutex);
 425    qemu_co_queue_init(&client->free_sema);
 426    client->sioc = sioc;
 427    object_ref(OBJECT(client->sioc));
 428
 429    if (!client->ioc) {
 430        client->ioc = QIO_CHANNEL(sioc);
 431        object_ref(OBJECT(client->ioc));
 432    }
 433
 434    /* Now that we're connected, set the socket to be non-blocking and
 435     * kick the reply mechanism.  */
 436    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
 437
 438    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
 439
 440    logout("Established connection with NBD server\n");
 441    return 0;
 442}
 443