qemu/block/nbd-client.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for  NBD
   3 *
   4 * Copyright (C) 2016 Red Hat, Inc.
   5 * Copyright (C) 2008 Bull S.A.S.
   6 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
   7 *
   8 * Some parts:
   9 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
  10 *
  11 * Permission is hereby granted, free of charge, to any person obtaining a copy
  12 * of this software and associated documentation files (the "Software"), to deal
  13 * in the Software without restriction, including without limitation the rights
  14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15 * copies of the Software, and to permit persons to whom the Software is
  16 * furnished to do so, subject to the following conditions:
  17 *
  18 * The above copyright notice and this permission notice shall be included in
  19 * all copies or substantial portions of the Software.
  20 *
  21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27 * THE SOFTWARE.
  28 */
  29
  30#include "qemu/osdep.h"
  31#include "qapi/error.h"
  32#include "nbd-client.h"
  33
  34#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
  35#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
  36
  37static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
  38{
  39    int i;
  40
  41    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
  42        NBDClientRequest *req = &s->requests[i];
  43
  44        if (req->coroutine && req->receiving) {
  45            aio_co_wake(req->coroutine);
  46        }
  47    }
  48}
  49
  50static void nbd_teardown_connection(BlockDriverState *bs)
  51{
  52    NBDClientSession *client = nbd_get_client_session(bs);
  53
  54    if (!client->ioc) { /* Already closed */
  55        return;
  56    }
  57
  58    /* finish any pending coroutines */
  59    qio_channel_shutdown(client->ioc,
  60                         QIO_CHANNEL_SHUTDOWN_BOTH,
  61                         NULL);
  62    BDRV_POLL_WHILE(bs, client->read_reply_co);
  63
  64    nbd_client_detach_aio_context(bs);
  65    object_unref(OBJECT(client->sioc));
  66    client->sioc = NULL;
  67    object_unref(OBJECT(client->ioc));
  68    client->ioc = NULL;
  69}
  70
  71static coroutine_fn void nbd_read_reply_entry(void *opaque)
  72{
  73    NBDClientSession *s = opaque;
  74    uint64_t i;
  75    int ret = 0;
  76    Error *local_err = NULL;
  77
  78    while (!s->quit) {
  79        assert(s->reply.handle == 0);
  80        ret = nbd_receive_reply(s->ioc, &s->reply, &local_err);
  81        if (ret < 0) {
  82            error_report_err(local_err);
  83        }
  84        if (ret <= 0) {
  85            break;
  86        }
  87
  88        /* There's no need for a mutex on the receive side, because the
  89         * handler acts as a synchronization point and ensures that only
  90         * one coroutine is called until the reply finishes.
  91         */
  92        i = HANDLE_TO_INDEX(s, s->reply.handle);
  93        if (i >= MAX_NBD_REQUESTS ||
  94            !s->requests[i].coroutine ||
  95            !s->requests[i].receiving) {
  96            break;
  97        }
  98
  99        /* We're woken up again by the request itself.  Note that there
 100         * is no race between yielding and reentering read_reply_co.  This
 101         * is because:
 102         *
 103         * - if the request runs on the same AioContext, it is only
 104         *   entered after we yield
 105         *
 106         * - if the request runs on a different AioContext, reentering
 107         *   read_reply_co happens through a bottom half, which can only
 108         *   run after we yield.
 109         */
 110        aio_co_wake(s->requests[i].coroutine);
 111        qemu_coroutine_yield();
 112    }
 113
 114    s->quit = true;
 115    nbd_recv_coroutines_enter_all(s);
 116    s->read_reply_co = NULL;
 117}
 118
 119static int nbd_co_send_request(BlockDriverState *bs,
 120                               NBDRequest *request,
 121                               QEMUIOVector *qiov)
 122{
 123    NBDClientSession *s = nbd_get_client_session(bs);
 124    int rc, ret, i;
 125
 126    qemu_co_mutex_lock(&s->send_mutex);
 127    while (s->in_flight == MAX_NBD_REQUESTS) {
 128        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
 129    }
 130    s->in_flight++;
 131
 132    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
 133        if (s->requests[i].coroutine == NULL) {
 134            break;
 135        }
 136    }
 137
 138    g_assert(qemu_in_coroutine());
 139    assert(i < MAX_NBD_REQUESTS);
 140
 141    s->requests[i].coroutine = qemu_coroutine_self();
 142    s->requests[i].receiving = false;
 143
 144    request->handle = INDEX_TO_HANDLE(s, i);
 145
 146    if (s->quit) {
 147        qemu_co_mutex_unlock(&s->send_mutex);
 148        return -EIO;
 149    }
 150    if (!s->ioc) {
 151        qemu_co_mutex_unlock(&s->send_mutex);
 152        return -EPIPE;
 153    }
 154
 155    if (qiov) {
 156        qio_channel_set_cork(s->ioc, true);
 157        rc = nbd_send_request(s->ioc, request);
 158        if (rc >= 0 && !s->quit) {
 159            ret = nbd_rwv(s->ioc, qiov->iov, qiov->niov, request->len, false,
 160                          NULL);
 161            if (ret != request->len) {
 162                rc = -EIO;
 163            }
 164        }
 165        qio_channel_set_cork(s->ioc, false);
 166    } else {
 167        rc = nbd_send_request(s->ioc, request);
 168    }
 169    if (rc < 0) {
 170        s->quit = true;
 171    }
 172    qemu_co_mutex_unlock(&s->send_mutex);
 173    return rc;
 174}
 175
 176static void nbd_co_receive_reply(NBDClientSession *s,
 177                                 NBDRequest *request,
 178                                 NBDReply *reply,
 179                                 QEMUIOVector *qiov)
 180{
 181    int i = HANDLE_TO_INDEX(s, request->handle);
 182    int ret;
 183
 184    /* Wait until we're woken up by nbd_read_reply_entry.  */
 185    s->requests[i].receiving = true;
 186    qemu_coroutine_yield();
 187    s->requests[i].receiving = false;
 188    *reply = s->reply;
 189    if (reply->handle != request->handle || !s->ioc || s->quit) {
 190        reply->error = EIO;
 191    } else {
 192        if (qiov && reply->error == 0) {
 193            ret = nbd_rwv(s->ioc, qiov->iov, qiov->niov, request->len, true,
 194                          NULL);
 195            if (ret != request->len) {
 196                reply->error = EIO;
 197                s->quit = true;
 198            }
 199        }
 200
 201        /* Tell the read handler to read another header.  */
 202        s->reply.handle = 0;
 203    }
 204}
 205
 206static void nbd_coroutine_end(BlockDriverState *bs,
 207                              NBDRequest *request)
 208{
 209    NBDClientSession *s = nbd_get_client_session(bs);
 210    int i = HANDLE_TO_INDEX(s, request->handle);
 211
 212    s->requests[i].coroutine = NULL;
 213
 214    /* Kick the read_reply_co to get the next reply.  */
 215    if (s->read_reply_co) {
 216        aio_co_wake(s->read_reply_co);
 217    }
 218
 219    qemu_co_mutex_lock(&s->send_mutex);
 220    s->in_flight--;
 221    qemu_co_queue_next(&s->free_sema);
 222    qemu_co_mutex_unlock(&s->send_mutex);
 223}
 224
 225int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
 226                         uint64_t bytes, QEMUIOVector *qiov, int flags)
 227{
 228    NBDClientSession *client = nbd_get_client_session(bs);
 229    NBDRequest request = {
 230        .type = NBD_CMD_READ,
 231        .from = offset,
 232        .len = bytes,
 233    };
 234    NBDReply reply;
 235    ssize_t ret;
 236
 237    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 238    assert(!flags);
 239
 240    ret = nbd_co_send_request(bs, &request, NULL);
 241    if (ret < 0) {
 242        reply.error = -ret;
 243    } else {
 244        nbd_co_receive_reply(client, &request, &reply, qiov);
 245    }
 246    nbd_coroutine_end(bs, &request);
 247    return -reply.error;
 248}
 249
 250int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
 251                          uint64_t bytes, QEMUIOVector *qiov, int flags)
 252{
 253    NBDClientSession *client = nbd_get_client_session(bs);
 254    NBDRequest request = {
 255        .type = NBD_CMD_WRITE,
 256        .from = offset,
 257        .len = bytes,
 258    };
 259    NBDReply reply;
 260    ssize_t ret;
 261
 262    if (flags & BDRV_REQ_FUA) {
 263        assert(client->info.flags & NBD_FLAG_SEND_FUA);
 264        request.flags |= NBD_CMD_FLAG_FUA;
 265    }
 266
 267    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 268
 269    ret = nbd_co_send_request(bs, &request, qiov);
 270    if (ret < 0) {
 271        reply.error = -ret;
 272    } else {
 273        nbd_co_receive_reply(client, &request, &reply, NULL);
 274    }
 275    nbd_coroutine_end(bs, &request);
 276    return -reply.error;
 277}
 278
 279int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
 280                                int bytes, BdrvRequestFlags flags)
 281{
 282    ssize_t ret;
 283    NBDClientSession *client = nbd_get_client_session(bs);
 284    NBDRequest request = {
 285        .type = NBD_CMD_WRITE_ZEROES,
 286        .from = offset,
 287        .len = bytes,
 288    };
 289    NBDReply reply;
 290
 291    if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
 292        return -ENOTSUP;
 293    }
 294
 295    if (flags & BDRV_REQ_FUA) {
 296        assert(client->info.flags & NBD_FLAG_SEND_FUA);
 297        request.flags |= NBD_CMD_FLAG_FUA;
 298    }
 299    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
 300        request.flags |= NBD_CMD_FLAG_NO_HOLE;
 301    }
 302
 303    ret = nbd_co_send_request(bs, &request, NULL);
 304    if (ret < 0) {
 305        reply.error = -ret;
 306    } else {
 307        nbd_co_receive_reply(client, &request, &reply, NULL);
 308    }
 309    nbd_coroutine_end(bs, &request);
 310    return -reply.error;
 311}
 312
 313int nbd_client_co_flush(BlockDriverState *bs)
 314{
 315    NBDClientSession *client = nbd_get_client_session(bs);
 316    NBDRequest request = { .type = NBD_CMD_FLUSH };
 317    NBDReply reply;
 318    ssize_t ret;
 319
 320    if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
 321        return 0;
 322    }
 323
 324    request.from = 0;
 325    request.len = 0;
 326
 327    ret = nbd_co_send_request(bs, &request, NULL);
 328    if (ret < 0) {
 329        reply.error = -ret;
 330    } else {
 331        nbd_co_receive_reply(client, &request, &reply, NULL);
 332    }
 333    nbd_coroutine_end(bs, &request);
 334    return -reply.error;
 335}
 336
 337int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
 338{
 339    NBDClientSession *client = nbd_get_client_session(bs);
 340    NBDRequest request = {
 341        .type = NBD_CMD_TRIM,
 342        .from = offset,
 343        .len = bytes,
 344    };
 345    NBDReply reply;
 346    ssize_t ret;
 347
 348    if (!(client->info.flags & NBD_FLAG_SEND_TRIM)) {
 349        return 0;
 350    }
 351
 352    ret = nbd_co_send_request(bs, &request, NULL);
 353    if (ret < 0) {
 354        reply.error = -ret;
 355    } else {
 356        nbd_co_receive_reply(client, &request, &reply, NULL);
 357    }
 358    nbd_coroutine_end(bs, &request);
 359    return -reply.error;
 360
 361}
 362
 363void nbd_client_detach_aio_context(BlockDriverState *bs)
 364{
 365    NBDClientSession *client = nbd_get_client_session(bs);
 366    qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
 367}
 368
 369void nbd_client_attach_aio_context(BlockDriverState *bs,
 370                                   AioContext *new_context)
 371{
 372    NBDClientSession *client = nbd_get_client_session(bs);
 373    qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);
 374    aio_co_schedule(new_context, client->read_reply_co);
 375}
 376
 377void nbd_client_close(BlockDriverState *bs)
 378{
 379    NBDClientSession *client = nbd_get_client_session(bs);
 380    NBDRequest request = { .type = NBD_CMD_DISC };
 381
 382    if (client->ioc == NULL) {
 383        return;
 384    }
 385
 386    nbd_send_request(client->ioc, &request);
 387
 388    nbd_teardown_connection(bs);
 389}
 390
 391int nbd_client_init(BlockDriverState *bs,
 392                    QIOChannelSocket *sioc,
 393                    const char *export,
 394                    QCryptoTLSCreds *tlscreds,
 395                    const char *hostname,
 396                    Error **errp)
 397{
 398    NBDClientSession *client = nbd_get_client_session(bs);
 399    int ret;
 400
 401    /* NBD handshake */
 402    logout("session init %s\n", export);
 403    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
 404
 405    client->info.request_sizes = true;
 406    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
 407                                tlscreds, hostname,
 408                                &client->ioc, &client->info, errp);
 409    if (ret < 0) {
 410        logout("Failed to negotiate with the NBD server\n");
 411        return ret;
 412    }
 413    if (client->info.flags & NBD_FLAG_SEND_FUA) {
 414        bs->supported_write_flags = BDRV_REQ_FUA;
 415        bs->supported_zero_flags |= BDRV_REQ_FUA;
 416    }
 417    if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
 418        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
 419    }
 420    if (client->info.min_block > bs->bl.request_alignment) {
 421        bs->bl.request_alignment = client->info.min_block;
 422    }
 423
 424    qemu_co_mutex_init(&client->send_mutex);
 425    qemu_co_queue_init(&client->free_sema);
 426    client->sioc = sioc;
 427    object_ref(OBJECT(client->sioc));
 428
 429    if (!client->ioc) {
 430        client->ioc = QIO_CHANNEL(sioc);
 431        object_ref(OBJECT(client->ioc));
 432    }
 433
 434    /* Now that we're connected, set the socket to be non-blocking and
 435     * kick the reply mechanism.  */
 436    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
 437    client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
 438    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
 439
 440    logout("Established connection with NBD server\n");
 441    return 0;
 442}
 443