qemu/block/nbd-client.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for  NBD
   3 *
   4 * Copyright (C) 2016 Red Hat, Inc.
   5 * Copyright (C) 2008 Bull S.A.S.
   6 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
   7 *
   8 * Some parts:
   9 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
  10 *
  11 * Permission is hereby granted, free of charge, to any person obtaining a copy
  12 * of this software and associated documentation files (the "Software"), to deal
  13 * in the Software without restriction, including without limitation the rights
  14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15 * copies of the Software, and to permit persons to whom the Software is
  16 * furnished to do so, subject to the following conditions:
  17 *
  18 * The above copyright notice and this permission notice shall be included in
  19 * all copies or substantial portions of the Software.
  20 *
  21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27 * THE SOFTWARE.
  28 */
  29
  30#include "qemu/osdep.h"
  31#include "qapi/error.h"
  32#include "nbd-client.h"
  33
  34#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
  35#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
  36
  37static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
  38{
  39    int i;
  40
  41    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
  42        NBDClientRequest *req = &s->requests[i];
  43
  44        if (req->coroutine && req->receiving) {
  45            aio_co_wake(req->coroutine);
  46        }
  47    }
  48}
  49
  50static void nbd_teardown_connection(BlockDriverState *bs)
  51{
  52    NBDClientSession *client = nbd_get_client_session(bs);
  53
  54    if (!client->ioc) { /* Already closed */
  55        return;
  56    }
  57
  58    /* finish any pending coroutines */
  59    qio_channel_shutdown(client->ioc,
  60                         QIO_CHANNEL_SHUTDOWN_BOTH,
  61                         NULL);
  62    BDRV_POLL_WHILE(bs, client->read_reply_co);
  63
  64    nbd_client_detach_aio_context(bs);
  65    object_unref(OBJECT(client->sioc));
  66    client->sioc = NULL;
  67    object_unref(OBJECT(client->ioc));
  68    client->ioc = NULL;
  69}
  70
  71static coroutine_fn void nbd_read_reply_entry(void *opaque)
  72{
  73    NBDClientSession *s = opaque;
  74    uint64_t i;
  75    int ret = 0;
  76    Error *local_err = NULL;
  77
  78    while (!s->quit) {
  79        assert(s->reply.handle == 0);
  80        ret = nbd_receive_reply(s->ioc, &s->reply, &local_err);
  81        if (ret < 0) {
  82            error_report_err(local_err);
  83        }
  84        if (ret <= 0) {
  85            break;
  86        }
  87
  88        /* There's no need for a mutex on the receive side, because the
  89         * handler acts as a synchronization point and ensures that only
  90         * one coroutine is called until the reply finishes.
  91         */
  92        i = HANDLE_TO_INDEX(s, s->reply.handle);
  93        if (i >= MAX_NBD_REQUESTS ||
  94            !s->requests[i].coroutine ||
  95            !s->requests[i].receiving) {
  96            break;
  97        }
  98
  99        /* We're woken up again by the request itself.  Note that there
 100         * is no race between yielding and reentering read_reply_co.  This
 101         * is because:
 102         *
 103         * - if the request runs on the same AioContext, it is only
 104         *   entered after we yield
 105         *
 106         * - if the request runs on a different AioContext, reentering
 107         *   read_reply_co happens through a bottom half, which can only
 108         *   run after we yield.
 109         */
 110        aio_co_wake(s->requests[i].coroutine);
 111        qemu_coroutine_yield();
 112    }
 113
 114    s->quit = true;
 115    nbd_recv_coroutines_enter_all(s);
 116    s->read_reply_co = NULL;
 117}
 118
 119static int nbd_co_send_request(BlockDriverState *bs,
 120                               NBDRequest *request,
 121                               QEMUIOVector *qiov)
 122{
 123    NBDClientSession *s = nbd_get_client_session(bs);
 124    int rc, ret, i;
 125
 126    qemu_co_mutex_lock(&s->send_mutex);
 127    while (s->in_flight == MAX_NBD_REQUESTS) {
 128        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
 129    }
 130    s->in_flight++;
 131
 132    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
 133        if (s->requests[i].coroutine == NULL) {
 134            break;
 135        }
 136    }
 137
 138    g_assert(qemu_in_coroutine());
 139    assert(i < MAX_NBD_REQUESTS);
 140
 141    s->requests[i].coroutine = qemu_coroutine_self();
 142    s->requests[i].receiving = false;
 143
 144    request->handle = INDEX_TO_HANDLE(s, i);
 145
 146    if (s->quit) {
 147        rc = -EIO;
 148        goto err;
 149    }
 150    if (!s->ioc) {
 151        rc = -EPIPE;
 152        goto err;
 153    }
 154
 155    if (qiov) {
 156        qio_channel_set_cork(s->ioc, true);
 157        rc = nbd_send_request(s->ioc, request);
 158        if (rc >= 0 && !s->quit) {
 159            ret = nbd_rwv(s->ioc, qiov->iov, qiov->niov, request->len, false,
 160                          NULL);
 161            if (ret != request->len) {
 162                rc = -EIO;
 163            }
 164        }
 165        qio_channel_set_cork(s->ioc, false);
 166    } else {
 167        rc = nbd_send_request(s->ioc, request);
 168    }
 169
 170err:
 171    if (rc < 0) {
 172        s->quit = true;
 173        s->requests[i].coroutine = NULL;
 174        s->in_flight--;
 175        qemu_co_queue_next(&s->free_sema);
 176    }
 177    qemu_co_mutex_unlock(&s->send_mutex);
 178    return rc;
 179}
 180
 181static void nbd_co_receive_reply(NBDClientSession *s,
 182                                 NBDRequest *request,
 183                                 NBDReply *reply,
 184                                 QEMUIOVector *qiov)
 185{
 186    int i = HANDLE_TO_INDEX(s, request->handle);
 187    int ret;
 188
 189    /* Wait until we're woken up by nbd_read_reply_entry.  */
 190    s->requests[i].receiving = true;
 191    qemu_coroutine_yield();
 192    s->requests[i].receiving = false;
 193    *reply = s->reply;
 194    if (reply->handle != request->handle || !s->ioc || s->quit) {
 195        reply->error = EIO;
 196    } else {
 197        if (qiov && reply->error == 0) {
 198            ret = nbd_rwv(s->ioc, qiov->iov, qiov->niov, request->len, true,
 199                          NULL);
 200            if (ret != request->len) {
 201                reply->error = EIO;
 202                s->quit = true;
 203            }
 204        }
 205
 206        /* Tell the read handler to read another header.  */
 207        s->reply.handle = 0;
 208    }
 209
 210    s->requests[i].coroutine = NULL;
 211
 212    /* Kick the read_reply_co to get the next reply.  */
 213    if (s->read_reply_co) {
 214        aio_co_wake(s->read_reply_co);
 215    }
 216
 217    qemu_co_mutex_lock(&s->send_mutex);
 218    s->in_flight--;
 219    qemu_co_queue_next(&s->free_sema);
 220    qemu_co_mutex_unlock(&s->send_mutex);
 221}
 222
 223int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
 224                         uint64_t bytes, QEMUIOVector *qiov, int flags)
 225{
 226    NBDClientSession *client = nbd_get_client_session(bs);
 227    NBDRequest request = {
 228        .type = NBD_CMD_READ,
 229        .from = offset,
 230        .len = bytes,
 231    };
 232    NBDReply reply;
 233    ssize_t ret;
 234
 235    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 236    assert(!flags);
 237
 238    ret = nbd_co_send_request(bs, &request, NULL);
 239    if (ret < 0) {
 240        reply.error = -ret;
 241    } else {
 242        nbd_co_receive_reply(client, &request, &reply, qiov);
 243    }
 244    return -reply.error;
 245}
 246
 247int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
 248                          uint64_t bytes, QEMUIOVector *qiov, int flags)
 249{
 250    NBDClientSession *client = nbd_get_client_session(bs);
 251    NBDRequest request = {
 252        .type = NBD_CMD_WRITE,
 253        .from = offset,
 254        .len = bytes,
 255    };
 256    NBDReply reply;
 257    ssize_t ret;
 258
 259    if (flags & BDRV_REQ_FUA) {
 260        assert(client->info.flags & NBD_FLAG_SEND_FUA);
 261        request.flags |= NBD_CMD_FLAG_FUA;
 262    }
 263
 264    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 265
 266    ret = nbd_co_send_request(bs, &request, qiov);
 267    if (ret < 0) {
 268        reply.error = -ret;
 269    } else {
 270        nbd_co_receive_reply(client, &request, &reply, NULL);
 271    }
 272    return -reply.error;
 273}
 274
 275int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
 276                                int bytes, BdrvRequestFlags flags)
 277{
 278    ssize_t ret;
 279    NBDClientSession *client = nbd_get_client_session(bs);
 280    NBDRequest request = {
 281        .type = NBD_CMD_WRITE_ZEROES,
 282        .from = offset,
 283        .len = bytes,
 284    };
 285    NBDReply reply;
 286
 287    if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
 288        return -ENOTSUP;
 289    }
 290
 291    if (flags & BDRV_REQ_FUA) {
 292        assert(client->info.flags & NBD_FLAG_SEND_FUA);
 293        request.flags |= NBD_CMD_FLAG_FUA;
 294    }
 295    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
 296        request.flags |= NBD_CMD_FLAG_NO_HOLE;
 297    }
 298
 299    ret = nbd_co_send_request(bs, &request, NULL);
 300    if (ret < 0) {
 301        reply.error = -ret;
 302    } else {
 303        nbd_co_receive_reply(client, &request, &reply, NULL);
 304    }
 305    return -reply.error;
 306}
 307
 308int nbd_client_co_flush(BlockDriverState *bs)
 309{
 310    NBDClientSession *client = nbd_get_client_session(bs);
 311    NBDRequest request = { .type = NBD_CMD_FLUSH };
 312    NBDReply reply;
 313    ssize_t ret;
 314
 315    if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
 316        return 0;
 317    }
 318
 319    request.from = 0;
 320    request.len = 0;
 321
 322    ret = nbd_co_send_request(bs, &request, NULL);
 323    if (ret < 0) {
 324        reply.error = -ret;
 325    } else {
 326        nbd_co_receive_reply(client, &request, &reply, NULL);
 327    }
 328    return -reply.error;
 329}
 330
 331int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
 332{
 333    NBDClientSession *client = nbd_get_client_session(bs);
 334    NBDRequest request = {
 335        .type = NBD_CMD_TRIM,
 336        .from = offset,
 337        .len = bytes,
 338    };
 339    NBDReply reply;
 340    ssize_t ret;
 341
 342    if (!(client->info.flags & NBD_FLAG_SEND_TRIM)) {
 343        return 0;
 344    }
 345
 346    ret = nbd_co_send_request(bs, &request, NULL);
 347    if (ret < 0) {
 348        reply.error = -ret;
 349    } else {
 350        nbd_co_receive_reply(client, &request, &reply, NULL);
 351    }
 352    return -reply.error;
 353
 354}
 355
 356void nbd_client_detach_aio_context(BlockDriverState *bs)
 357{
 358    NBDClientSession *client = nbd_get_client_session(bs);
 359    qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
 360}
 361
 362void nbd_client_attach_aio_context(BlockDriverState *bs,
 363                                   AioContext *new_context)
 364{
 365    NBDClientSession *client = nbd_get_client_session(bs);
 366    qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);
 367    aio_co_schedule(new_context, client->read_reply_co);
 368}
 369
 370void nbd_client_close(BlockDriverState *bs)
 371{
 372    NBDClientSession *client = nbd_get_client_session(bs);
 373    NBDRequest request = { .type = NBD_CMD_DISC };
 374
 375    if (client->ioc == NULL) {
 376        return;
 377    }
 378
 379    nbd_send_request(client->ioc, &request);
 380
 381    nbd_teardown_connection(bs);
 382}
 383
 384int nbd_client_init(BlockDriverState *bs,
 385                    QIOChannelSocket *sioc,
 386                    const char *export,
 387                    QCryptoTLSCreds *tlscreds,
 388                    const char *hostname,
 389                    Error **errp)
 390{
 391    NBDClientSession *client = nbd_get_client_session(bs);
 392    int ret;
 393
 394    /* NBD handshake */
 395    logout("session init %s\n", export);
 396    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
 397
 398    client->info.request_sizes = true;
 399    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
 400                                tlscreds, hostname,
 401                                &client->ioc, &client->info, errp);
 402    if (ret < 0) {
 403        logout("Failed to negotiate with the NBD server\n");
 404        return ret;
 405    }
 406    if (client->info.flags & NBD_FLAG_SEND_FUA) {
 407        bs->supported_write_flags = BDRV_REQ_FUA;
 408        bs->supported_zero_flags |= BDRV_REQ_FUA;
 409    }
 410    if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
 411        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
 412    }
 413    if (client->info.min_block > bs->bl.request_alignment) {
 414        bs->bl.request_alignment = client->info.min_block;
 415    }
 416
 417    qemu_co_mutex_init(&client->send_mutex);
 418    qemu_co_queue_init(&client->free_sema);
 419    client->sioc = sioc;
 420    object_ref(OBJECT(client->sioc));
 421
 422    if (!client->ioc) {
 423        client->ioc = QIO_CHANNEL(sioc);
 424        object_ref(OBJECT(client->ioc));
 425    }
 426
 427    /* Now that we're connected, set the socket to be non-blocking and
 428     * kick the reply mechanism.  */
 429    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
 430    client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
 431    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
 432
 433    logout("Established connection with NBD server\n");
 434    return 0;
 435}
 436