qemu/block/nbd.c
/*
 * QEMU Block driver for NBD
 *
 * Copyright (c) 2019 Virtuozzo International GmbH.
 * Copyright (C) 2016 Red Hat, Inc.
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qemu/uri.h"
#include "qemu/option.h"
#include "qemu/cutils.h"
#include "qemu/main-loop.h"

#include "qapi/qapi-visit-sockets.h"
#include "qapi/qmp/qstring.h"

#include "block/qdict.h"
#include "block/nbd.h"
#include "block/block_int.h"

#define EN_OPTSTR ":exportname="
#define MAX_NBD_REQUESTS    16

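/*
 * A request's wire handle is its slot index XOR-ed with the
 * BlockDriverState pointer.  XOR is its own inverse, so the same
 * operation maps a reply's handle back to a slot index, and handles that
 * this client never issued decode to an out-of-range index that the
 * reply dispatcher rejects.
 */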
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))

typedef struct {
    Coroutine *coroutine;
    uint64_t offset;        /* original offset of the request */
    bool receiving;         /* waiting for connection_co? */
} NBDClientRequest;

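/*
 * Client lifecycle states.  A channel error moves the client from
 * CONNECTED to CONNECTING_WAIT (requests block and are retried while the
 * reconnect-delay timer runs) or CONNECTING_NOWAIT (new requests fail
 * immediately, though reconnecting continues).  QUIT is terminal: the
 * connection coroutine exits and no further reconnect is attempted.
 */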
typedef enum NBDClientState {
    NBD_CLIENT_CONNECTING_WAIT,
    NBD_CLIENT_CONNECTING_NOWAIT,
    NBD_CLIENT_CONNECTED,
    NBD_CLIENT_QUIT
} NBDClientState;

typedef struct BDRVNBDState {
    QIOChannelSocket *sioc; /* The master data channel */
    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
    NBDExportInfo info;

    CoMutex send_mutex;
    CoQueue free_sema;
    Coroutine *connection_co;
    QemuCoSleepState *connection_co_sleep_ns_state;
    bool drained;
    bool wait_drained_end;
    int in_flight;
    NBDClientState state;
    int connect_status;
    Error *connect_err;
    bool wait_in_flight;

    NBDClientRequest requests[MAX_NBD_REQUESTS];
    NBDReply reply;
    BlockDriverState *bs;

    /* Connection parameters */
    uint32_t reconnect_delay;
    SocketAddress *saddr;
    char *export, *tlscredsid;
    QCryptoTLSCreds *tlscreds;
    const char *hostname;
    char *x_dirty_bitmap;
} BDRVNBDState;

static int nbd_client_connect(BlockDriverState *bs, Error **errp);

static void nbd_clear_bdrvstate(BDRVNBDState *s)
{
    object_unref(OBJECT(s->tlscreds));
    qapi_free_SocketAddress(s->saddr);
    s->saddr = NULL;
    g_free(s->export);
    s->export = NULL;
    g_free(s->tlscredsid);
    s->tlscredsid = NULL;
    g_free(s->x_dirty_bitmap);
    s->x_dirty_bitmap = NULL;
}

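/*
 * Record a channel failure: -EIO marks the connection as broken but
 * eligible for reconnect, honouring reconnect-delay; any other error
 * shuts the channel down and moves the client to the terminal QUIT state.
 */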
static void nbd_channel_error(BDRVNBDState *s, int ret)
{
    if (ret == -EIO) {
        if (s->state == NBD_CLIENT_CONNECTED) {
            s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                            NBD_CLIENT_CONNECTING_NOWAIT;
        }
    } else {
        if (s->state == NBD_CLIENT_CONNECTED) {
            qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        }
        s->state = NBD_CLIENT_QUIT;
    }
}

static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
{
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        NBDClientRequest *req = &s->requests[i];

        if (req->coroutine && req->receiving) {
            aio_co_wake(req->coroutine);
        }
    }
}

static void nbd_client_detach_aio_context(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    qio_channel_detach_aio_context(QIO_CHANNEL(s->ioc));
}

static void nbd_client_attach_aio_context_bh(void *opaque)
{
    BlockDriverState *bs = opaque;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    /*
     * The node is still drained, so we know the coroutine has yielded in
     * nbd_read_eof(), the only place where bs->in_flight can reach 0, or it is
     * entered for the first time. Both places are safe for entering the
     * coroutine.
     */
    qemu_aio_coroutine_enter(bs->aio_context, s->connection_co);
    bdrv_dec_in_flight(bs);
}

static void nbd_client_attach_aio_context(BlockDriverState *bs,
                                          AioContext *new_context)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    /*
     * s->connection_co is either yielded from nbd_receive_reply or from
     * nbd_co_reconnect_loop()
     */
    if (s->state == NBD_CLIENT_CONNECTED) {
        qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
    }

    bdrv_inc_in_flight(bs);

    /*
     * Need to wait here for the BH to run because the BH must run while the
     * node is still drained.
     */
    aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
}

static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->drained = true;
    if (s->connection_co_sleep_ns_state) {
        qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
    }
}

static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->drained = false;
    if (s->wait_drained_end) {
        s->wait_drained_end = false;
        aio_co_wake(s->connection_co);
    }
}


static void nbd_teardown_connection(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    if (s->state == NBD_CLIENT_CONNECTED) {
        /* finish any pending coroutines */
        assert(s->ioc);
        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    }
    s->state = NBD_CLIENT_QUIT;
    if (s->connection_co) {
        if (s->connection_co_sleep_ns_state) {
            qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
        }
    }
    BDRV_POLL_WHILE(bs, s->connection_co);
}

static bool nbd_client_connecting(BDRVNBDState *s)
{
    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
        s->state == NBD_CLIENT_CONNECTING_NOWAIT;
}

static bool nbd_client_connecting_wait(BDRVNBDState *s)
{
    return s->state == NBD_CLIENT_CONNECTING_WAIT;
}

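/*
 * Make a single reconnect attempt: wait until all in-flight requests have
 * finished, tear down the old channel, and dial a new connection.  On
 * success the client returns to CONNECTED and waiters on free_sema are
 * restarted.
 */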
static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
    Error *local_err = NULL;

    if (!nbd_client_connecting(s)) {
        return;
    }

    /* Wait for completion of all in-flight requests */

    qemu_co_mutex_lock(&s->send_mutex);

    while (s->in_flight > 0) {
        qemu_co_mutex_unlock(&s->send_mutex);
        nbd_recv_coroutines_wake_all(s);
        s->wait_in_flight = true;
        qemu_coroutine_yield();
        s->wait_in_flight = false;
        qemu_co_mutex_lock(&s->send_mutex);
    }

    qemu_co_mutex_unlock(&s->send_mutex);

    if (!nbd_client_connecting(s)) {
        return;
    }

    /*
     * Now we are sure that nobody is accessing the channel, and no one will
     * try until we set the state to CONNECTED.
     */

    /* Finalize previous connection if any */
    if (s->ioc) {
        nbd_client_detach_aio_context(s->bs);
        object_unref(OBJECT(s->sioc));
        s->sioc = NULL;
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    s->connect_status = nbd_client_connect(s->bs, &local_err);
    error_free(s->connect_err);
    s->connect_err = NULL;
    error_propagate(&s->connect_err, local_err);

    if (s->connect_status < 0) {
        /* failed attempt */
        return;
    }

    /* successfully connected */
    s->state = NBD_CLIENT_CONNECTED;
    qemu_co_queue_restart_all(&s->free_sema);
}

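/*
 * Retry nbd_reconnect_attempt() with exponential backoff, sleeping 1s
 * between the first attempts and doubling up to 16s.  Once more than
 * reconnect-delay seconds have passed since the failure, pending requests
 * stop waiting and the state moves from CONNECTING_WAIT to
 * CONNECTING_NOWAIT.
 */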
static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
{
    uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
    uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND;
    uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
    uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;

    nbd_reconnect_attempt(s);

    while (nbd_client_connecting(s)) {
        if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
        {
            s->state = NBD_CLIENT_CONNECTING_NOWAIT;
            qemu_co_queue_restart_all(&s->free_sema);
        }

        qemu_co_sleep_ns_wakeable(QEMU_CLOCK_REALTIME, timeout,
                                  &s->connection_co_sleep_ns_state);
        if (s->drained) {
            bdrv_dec_in_flight(s->bs);
            s->wait_drained_end = true;
            while (s->drained) {
                /*
                 * We may be entered once from nbd_client_attach_aio_context_bh
                 * and then from nbd_client_co_drain_end. So here is a loop.
                 */
                qemu_coroutine_yield();
            }
            bdrv_inc_in_flight(s->bs);
        }
        if (timeout < max_timeout) {
            timeout *= 2;
        }

        nbd_reconnect_attempt(s);
    }
}

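/*
 * Body of the connection coroutine: the sole reader of the socket.  It
 * receives each reply header, wakes the coroutine owning the matching
 * request slot, and runs the reconnect loop whenever the channel breaks.
 */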
static coroutine_fn void nbd_connection_entry(void *opaque)
{
    BDRVNBDState *s = opaque;
    uint64_t i;
    int ret = 0;
    Error *local_err = NULL;

    while (s->state != NBD_CLIENT_QUIT) {
        /*
         * The NBD client can only really be considered idle when it has
         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
         * the point where the additional scheduled coroutine entry happens
         * after nbd_client_attach_aio_context().
         *
         * Therefore we keep an additional in_flight reference all the time and
         * only drop it temporarily here.
         */

        if (nbd_client_connecting(s)) {
            nbd_co_reconnect_loop(s);
        }

        if (s->state != NBD_CLIENT_CONNECTED) {
            continue;
        }

        assert(s->reply.handle == 0);
        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);

        if (local_err) {
            trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
        if (ret <= 0) {
            nbd_channel_error(s, ret ? ret : -EIO);
            continue;
        }

        /*
         * There's no need for a mutex on the receive side, because the
         * handler acts as a synchronization point and ensures that only
         * one coroutine is called until the reply finishes.
         */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        if (i >= MAX_NBD_REQUESTS ||
            !s->requests[i].coroutine ||
            !s->requests[i].receiving ||
            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
        {
            nbd_channel_error(s, -EINVAL);
            continue;
        }

        /*
         * We're woken up again by the request itself.  Note that there
         * is no race between yielding and reentering connection_co.  This
         * is because:
         *
         * - if the request runs on the same AioContext, it is only
         *   entered after we yield
         *
         * - if the request runs on a different AioContext, reentering
         *   connection_co happens through a bottom half, which can only
         *   run after we yield.
         */
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }

    qemu_co_queue_restart_all(&s->free_sema);
    nbd_recv_coroutines_wake_all(s);
    bdrv_dec_in_flight(s->bs);

    s->connection_co = NULL;
    if (s->ioc) {
        nbd_client_detach_aio_context(s->bs);
        object_unref(OBJECT(s->sioc));
        s->sioc = NULL;
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    aio_wait_kick();
}

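/*
 * Claim a free request slot, blocking on free_sema while all
 * MAX_NBD_REQUESTS slots are busy or a reconnect is still pending, then
 * send the request header (plus the payload for writes) under send_mutex.
 */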
static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int rc, i = -1;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }

    if (s->state != NBD_CLIENT_CONNECTED) {
        rc = -EIO;
        goto err;
    }

    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    assert(s->ioc);

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0 && s->state == NBD_CLIENT_CONNECTED) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
            }
        } else if (rc >= 0) {
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }

err:
    if (rc < 0) {
        nbd_channel_error(s, rc);
        if (i != -1) {
            s->requests[i].coroutine = NULL;
            s->in_flight--;
        }
        if (s->in_flight == 0 && s->wait_in_flight) {
            aio_co_wake(s->connection_co);
        } else {
            qemu_co_queue_next(&s->free_sema);
        }
    }
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}

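/*
 * Helpers for parsing structured reply payloads: read one big-endian
 * field and advance the cursor past it.
 */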
static inline uint16_t payload_advance16(uint8_t **payload)
{
    *payload += 2;
    return lduw_be_p(*payload - 2);
}

static inline uint32_t payload_advance32(uint8_t **payload)
{
    *payload += 4;
    return ldl_be_p(*payload - 4);
}

static inline uint64_t payload_advance64(uint8_t **payload)
{
    *payload += 8;
    return ldq_be_p(*payload - 8);
}

static int nbd_parse_offset_hole_payload(BDRVNBDState *s,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block &&
        !QEMU_IS_ALIGNED(hole_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("hole");
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}

/*
 * nbd_parse_blockstatus_payload
 * Based on our request, we expect only one extent in the reply, for the
 * base:allocation context.
 */
static int nbd_parse_blockstatus_payload(BDRVNBDState *s,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    /* The server succeeded, so it must have sent [at least] one extent */
    if (chunk->length < sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (s->info.context_id != context_id) {
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                         s->info.context_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                   "zero length");
        return -EINVAL;
    }

    /*
     * A server sending unaligned block status is in violation of the
     * protocol, but as qemu-nbd 3.1 is such a server (at least for
     * POSIX files that are not a multiple of 512 bytes, since qemu
     * rounds files up to 512-byte multiples but lseek(SEEK_HOLE)
     * still sees an implicit hole beyond the real EOF), it's nicer to
     * work around the misbehaving server. If the request included
     * more than the final unaligned block, truncate it back to an
     * aligned result; if the request was only the final block, round
     * up to the full block and change the status to fully-allocated
     * (always a safe status, even if it loses information).
     */
    if (s->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                              s->info.min_block)) {
        trace_nbd_parse_blockstatus_compliance("extent length is unaligned");
        if (extent->length > s->info.min_block) {
            extent->length = QEMU_ALIGN_DOWN(extent->length,
                                             s->info.min_block);
        } else {
            extent->length = s->info.min_block;
            extent->flags = 0;
        }
    }

    /*
     * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
     * sent us any more than one extent, nor should it have included
     * status beyond our request in that extent. However, it's easy
     * enough to ignore the server's noncompliance without killing the
     * connection; just ignore trailing extents, and clamp things to
     * the length of our request.
     */
    if (chunk->length > sizeof(context_id) + sizeof(*extent)) {
        trace_nbd_parse_blockstatus_compliance("more than one extent");
    }
    if (extent->length > orig_length) {
        extent->length = orig_length;
        trace_nbd_parse_blockstatus_compliance("extent length too large");
    }

    return 0;
}

/*
 * nbd_parse_error_payload
 * On success, @errp contains a message describing the NBD error reply.
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}

static int nbd_co_receive_offset_data_payload(BDRVNBDState *s,
                                              uint64_t orig_offset,
                                              QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
        return -EIO;
    }

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block && !QEMU_IS_ALIGNED(data_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("data");
    }

    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}

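/*
 * Upper bound on the structured-reply payload size we are willing to
 * allocate; a length from the wire above this is treated as a protocol
 * error instead of being passed to g_new().
 */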
#define NBD_MAX_MALLOC_PAYLOAD 1000
static coroutine_fn int nbd_co_receive_structured_payload(
        BDRVNBDState *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}

/*
 * nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If the function fails, @errp contains the corresponding error message, and
 * the connection with the server is suspect.  If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        BDRVNBDState *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = HANDLE_TO_INDEX(s, handle);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    /* Wait until we're woken up by nbd_connection_entry.  */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    s->requests[i].receiving = false;
    if (s->state != NBD_CLIENT_CONNECTED) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }
    assert(s->ioc);

    assert(s->reply.handle == handle);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                       " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                       " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}

/*
 * nbd_co_receive_one_chunk
 * Read a reply, wake up connection_co, and move s->state into an error
 * state if needed.
 * The return value is a fatal error code or a normal nbd reply error code.
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        BDRVNBDState *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
        Error **errp)
{
    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
                                          request_ret, qiov, payload, errp);

    if (ret < 0) {
        memset(reply, 0, sizeof(*reply));
        nbd_channel_error(s, ret);
    } else {
        /* For assert at loop start in nbd_connection_entry */
        *reply = s->reply;
    }
    s->reply.handle = 0;

    if (s->connection_co && !s->wait_in_flight) {
        /*
         * We must check s->wait_in_flight, because we may have been entered
         * by nbd_recv_coroutines_wake_all(); in that case we should not wake
         * connection_co here, as it will be woken by the last request.
         */
        aio_co_wake(s->connection_co);
    }

    return ret;
}

typedef struct NBDReplyChunkIter {
    int ret;
    int request_ret;
    Error *err;
    bool done, only_structured;
} NBDReplyChunkIter;

static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
                                   int ret, Error **local_err)
{
    assert(ret < 0);

    if (!iter->ret) {
        iter->ret = ret;
        error_propagate(&iter->err, *local_err);
    } else {
        error_free(*local_err);
    }

    *local_err = NULL;
}

static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
{
    assert(ret < 0);

    if (!iter->request_ret) {
        iter->request_ret = ret;
    }
}

/*
 * NBD_FOREACH_REPLY_CHUNK
 * The pointer stored in @payload requires g_free() to free it.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)

/*
 * nbd_reply_chunk_iter_receive
 * The pointer stored in @payload requires g_free() to free it.
 */
static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
                                         NBDReplyChunkIter *iter,
                                         uint64_t handle,
                                         QEMUIOVector *qiov, NBDReply *reply,
                                         void **payload)
{
    int ret, request_ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;

    if (s->state != NBD_CLIENT_CONNECTED) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_channel_error(iter, -EIO, &local_err);
        goto break_loop;
    }

    if (iter->done) {
        /* Previous iteration was last. */
        goto break_loop;
    }

    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
                                   &request_ret, qiov, reply, payload,
                                   &local_err);
    if (ret < 0) {
        nbd_iter_channel_error(iter, ret, &local_err);
    } else if (request_ret < 0) {
        nbd_iter_request_error(iter, request_ret);
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
    if (nbd_reply_is_simple(reply) || s->state != NBD_CLIENT_CONNECTED) {
        goto break_loop;
    }

    chunk = &reply->structured;
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is last. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

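/*
 * Tear-down path, reached exactly once per NBD_FOREACH_REPLY_CHUNK loop:
 * release the request slot and the in_flight reference taken in
 * nbd_co_send_request(), then wake either the reconnect code waiting for
 * the channel to become idle or the next request waiting for a slot.
 */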
break_loop:
    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    if (s->in_flight == 0 && s->wait_in_flight) {
        aio_co_wake(s->connection_co);
    } else {
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);

    return false;
}

static int nbd_co_receive_return_code(BDRVNBDState *s, uint64_t handle,
                                      int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t handle,
                                        uint64_t offset, QEMUIOVector *qiov,
                                        int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /*
             * special cased in nbd_co_receive_one_chunk, data is already
             * in qiov
             */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not allowed reply type */
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_blockstatus_reply(BDRVNBDState *s,
                                            uint64_t handle, uint64_t length,
                                            NBDExtent *extent,
                                            int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, &reply, &payload) {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.request_ret) {
        error_setg(&local_err, "Server did not reply with any status extents");
        nbd_iter_channel_error(&iter, -EIO, &local_err);
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

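/*
 * Send one request and wait for its complete reply.  If the connection
 * drops while the client is in CONNECTING_WAIT, the request is resent
 * transparently once a reconnect attempt succeeds.
 */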
static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
                          QEMUIOVector *write_qiov)
{
    int ret, request_ret;
    Error *local_err = NULL;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }

    do {
        ret = nbd_co_send_request(bs, request, write_qiov);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_return_code(s, request->handle,
                                         &request_ret, &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request->from, request->len,
                                      request->handle, request->flags,
                                      request->type,
                                      nbd_cmd_lookup(request->type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    return ret ? ret : request_ret;
}

static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                                uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    int ret, request_ret;
    Error *local_err = NULL;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    if (!bytes) {
        return 0;
    }
    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the read exceeds the server's
     * advertised size because the block layer rounded size up, then
     * truncate the request to the server and tail-pad with zero.
     */
    if (offset >= s->info.size) {
        assert(bytes < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, 0, 0, bytes);
        return 0;
    }
    if (offset + bytes > s->info.size) {
        uint64_t slop = offset + bytes - s->info.size;

        assert(slop < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, bytes - slop, 0, slop);
        request.len -= slop;
    }

    do {
        ret = nbd_co_send_request(bs, &request, NULL);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
                                           &request_ret, &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                      request.flags, request.type,
                                      nbd_cmd_lookup(request.type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    return ret ? ret : request_ret;
}

static int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                                 uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(s->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}

static int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                       int bytes, BdrvRequestFlags flags)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (!(s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(s->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }
    if (flags & BDRV_REQ_NO_FALLBACK) {
        assert(s->info.flags & NBD_FLAG_SEND_FAST_ZERO);
        request.flags |= NBD_CMD_FLAG_FAST_ZERO;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}

static int nbd_client_co_flush(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = { .type = NBD_CMD_FLUSH };

    if (!(s->info.flags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    return nbd_co_request(bs, &request, NULL);
}

static int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset,
                                  int bytes)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (!(s->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}

static int coroutine_fn nbd_client_co_block_status(
        BlockDriverState *bs, bool want_zero, int64_t offset, int64_t bytes,
        int64_t *pnum, int64_t *map, BlockDriverState **file)
{
    int ret, request_ret;
    NBDExtent extent = { 0 };
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
                                                bs->bl.request_alignment),
                                s->info.max_block),
                   MIN(bytes, s->info.size - offset)),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!s->info.base_allocation) {
        *pnum = bytes;
        *map = offset;
        *file = bs;
        return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    }

    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the status request exceeds the
     * server's advertised size because the block layer rounded size
     * up, we truncated the request to the server (above), or are
     * called on just the hole.
     */
    if (offset >= s->info.size) {
        *pnum = bytes;
        assert(bytes < BDRV_SECTOR_SIZE);
        /* Intentionally don't report offset_valid for the hole */
        return BDRV_BLOCK_ZERO;
    }

    if (s->info.min_block) {
        assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
    }
    do {
        ret = nbd_co_send_request(bs, &request, NULL);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
                                               &extent, &request_ret,
                                               &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                      request.flags, request.type,
                                      nbd_cmd_lookup(request.type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_connecting_wait(s));

    if (ret < 0 || request_ret < 0) {
        return ret ? ret : request_ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    *map = offset;
    *file = bs;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
        (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0) |
        BDRV_BLOCK_OFFSET_VALID;
}

static int nbd_client_reopen_prepare(BDRVReopenState *state,
                                     BlockReopenQueue *queue, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)state->bs->opaque;

    if ((state->flags & BDRV_O_RDWR) && (s->info.flags & NBD_FLAG_READ_ONLY)) {
        error_setg(errp, "Can't reopen read-only NBD mount as read/write");
        return -EACCES;
    }
    return 0;
}

static void nbd_client_close(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = { .type = NBD_CMD_DISC };

    if (s->ioc) {
        nbd_send_request(s->ioc, &request);
    }

    nbd_teardown_connection(bs);
}

static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
                                                  Error **errp)
{
    QIOChannelSocket *sioc;
    Error *local_err = NULL;

    sioc = qio_channel_socket_new();
    qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");

    qio_channel_socket_connect_sync(sioc, saddr, &local_err);
    if (local_err) {
        object_unref(OBJECT(sioc));
        error_propagate(errp, local_err);
        return NULL;
    }

    qio_channel_set_delay(QIO_CHANNEL(sioc), false);

    return sioc;
}

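/*
 * Dial the server and run the NBD handshake, requesting structured
 * replies and the base:allocation meta context (and TLS when credentials
 * are configured).  On success s->sioc and s->ioc are set up and the
 * export's size and feature flags are available in s->info.
 */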
static int nbd_client_connect(BlockDriverState *bs, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    AioContext *aio_context = bdrv_get_aio_context(bs);
    int ret;

    /*
     * establish TCP connection, return error if it fails
     * TODO: Configurable retry-until-timeout behaviour.
     */
    QIOChannelSocket *sioc = nbd_establish_connection(s->saddr, errp);

    if (!sioc) {
        return -ECONNREFUSED;
    }

    /* NBD handshake */
    trace_nbd_client_connect(s->export);
    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
    qio_channel_attach_aio_context(QIO_CHANNEL(sioc), aio_context);

    s->info.request_sizes = true;
    s->info.structured_reply = true;
    s->info.base_allocation = true;
    s->info.x_dirty_bitmap = g_strdup(s->x_dirty_bitmap);
    s->info.name = g_strdup(s->export ?: "");
    ret = nbd_receive_negotiate(aio_context, QIO_CHANNEL(sioc), s->tlscreds,
                                s->hostname, &s->ioc, &s->info, errp);
    g_free(s->info.x_dirty_bitmap);
    g_free(s->info.name);
    if (ret < 0) {
        object_unref(OBJECT(sioc));
        return ret;
    }
    if (s->x_dirty_bitmap && !s->info.base_allocation) {
        error_setg(errp, "requested x-dirty-bitmap %s not found",
                   s->x_dirty_bitmap);
        ret = -EINVAL;
        goto fail;
    }
    if (s->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            goto fail;
        }
    }
    if (s->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }
    if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
        if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
            bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
        }
    }

    s->sioc = sioc;

    if (!s->ioc) {
        s->ioc = QIO_CHANNEL(sioc);
        object_ref(OBJECT(s->ioc));
    }

    trace_nbd_client_connect_success(s->export);

    return 0;

 fail:
    /*
     * We have connected, but must fail for other reasons.
     * Send NBD_CMD_DISC as a courtesy to the server.
     */
    {
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(s->ioc ?: QIO_CHANNEL(sioc), &request);

        object_unref(OBJECT(sioc));

        return ret;
    }
}

/*
 * Parse nbd_open options
 */

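/*
 * URI forms accepted below:
 *   nbd://host[:port]/export
 *   nbd+tcp://host[:port]/export
 *   nbd+unix:///export?socket=path
 */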
static int nbd_parse_uri(const char *filename, QDict *options)
{
    URI *uri;
    const char *p;
    QueryParams *qp = NULL;
    int ret = 0;
    bool is_unix;

    uri = uri_parse(filename);
    if (!uri) {
        return -EINVAL;
    }

    /* transport */
    if (!g_strcmp0(uri->scheme, "nbd")) {
        is_unix = false;
    } else if (!g_strcmp0(uri->scheme, "nbd+tcp")) {
        is_unix = false;
    } else if (!g_strcmp0(uri->scheme, "nbd+unix")) {
        is_unix = true;
    } else {
        ret = -EINVAL;
        goto out;
    }

    p = uri->path ? uri->path : "/";
    p += strspn(p, "/");
    if (p[0]) {
        qdict_put_str(options, "export", p);
    }

    qp = query_params_parse(uri->query);
    if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
        ret = -EINVAL;
        goto out;
    }

    if (is_unix) {
        /* nbd+unix:///export?socket=path */
        if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
            ret = -EINVAL;
            goto out;
        }
        qdict_put_str(options, "server.type", "unix");
        qdict_put_str(options, "server.path", qp->p[0].value);
    } else {
        QString *host;
        char *port_str;

        /* nbd[+tcp]://host[:port]/export */
        if (!uri->server) {
            ret = -EINVAL;
            goto out;
        }

        /* strip braces from literal IPv6 address */
        if (uri->server[0] == '[') {
            host = qstring_from_substr(uri->server, 1,
                                       strlen(uri->server) - 1);
        } else {
            host = qstring_from_str(uri->server);
        }

        qdict_put_str(options, "server.type", "inet");
        qdict_put(options, "server.host", host);

        port_str = g_strdup_printf("%d", uri->port ?: NBD_DEFAULT_PORT);
        qdict_put_str(options, "server.port", port_str);
        g_free(port_str);
    }

out:
    if (qp) {
        query_params_free(qp);
    }
    uri_free(uri);
    return ret;
}

static bool nbd_has_filename_options_conflict(QDict *options, Error **errp)
{
    const QDictEntry *e;

    for (e = qdict_first(options); e; e = qdict_next(options, e)) {
        if (!strcmp(e->key, "host") ||
            !strcmp(e->key, "port") ||
            !strcmp(e->key, "path") ||
            !strcmp(e->key, "export") ||
            strstart(e->key, "server.", NULL))
        {
            error_setg(errp, "Option '%s' cannot be used with a file name",
                       e->key);
            return true;
        }
    }

    return false;
}

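/*
 * Besides URIs, the legacy filename syntax is accepted:
 *   nbd:host[:port][:exportname=name]
 *   nbd:unix:path[:exportname=name]
 */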
static void nbd_parse_filename(const char *filename, QDict *options,
                               Error **errp)
{
    g_autofree char *file = NULL;
    char *export_name;
    const char *host_spec;
    const char *unixpath;

    if (nbd_has_filename_options_conflict(options, errp)) {
        return;
    }

    if (strstr(filename, "://")) {
        int ret = nbd_parse_uri(filename, options);
        if (ret < 0) {
            error_setg(errp, "No valid URL specified");
        }
        return;
    }

    file = g_strdup(filename);

    export_name = strstr(file, EN_OPTSTR);
    if (export_name) {
        if (export_name[strlen(EN_OPTSTR)] == 0) {
            /* empty export name: nothing to do */
            return;
        }
        export_name[0] = 0; /* truncate 'file' */
        export_name += strlen(EN_OPTSTR);

        qdict_put_str(options, "export", export_name);
    }

    /* extract the host_spec - fail if it's not nbd:... */
    if (!strstart(file, "nbd:", &host_spec)) {
        error_setg(errp, "File name string for NBD must start with 'nbd:'");
        return;
    }

    if (!*host_spec) {
        return;
    }

    /* are we a UNIX or TCP socket? */
    if (strstart(host_spec, "unix:", &unixpath)) {
        qdict_put_str(options, "server.type", "unix");
        qdict_put_str(options, "server.path", unixpath);
    } else {
        InetSocketAddress *addr = g_new(InetSocketAddress, 1);

        if (inet_parse(addr, host_spec, errp)) {
            goto out_inet;
        }

        qdict_put_str(options, "server.type", "inet");
        qdict_put_str(options, "server.host", addr->host);
        qdict_put_str(options, "server.port", addr->port);
    out_inet:
        qapi_free_InetSocketAddress(addr);
    }
}

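/*
 * Translate the legacy "path"/"host"/"port" options into the flat
 * "server.*" representation.  Returns true on success (including the
 * no-op case where none of the legacy options is present), false if
 * legacy and "server.*" options are mixed or combined inconsistently.
 */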
static bool nbd_process_legacy_socket_options(QDict *output_options,
                                              QemuOpts *legacy_opts,
                                              Error **errp)
{
    const char *path = qemu_opt_get(legacy_opts, "path");
    const char *host = qemu_opt_get(legacy_opts, "host");
    const char *port = qemu_opt_get(legacy_opts, "port");
    const QDictEntry *e;

    if (!path && !host && !port) {
        return true;
    }

    for (e = qdict_first(output_options); e; e = qdict_next(output_options, e))
    {
        if (strstart(e->key, "server.", NULL)) {
            error_setg(errp, "Cannot use 'server' and path/host/port at the "
                       "same time");
            return false;
        }
    }

    if (path && host) {
        error_setg(errp, "path and host may not be used at the same time");
        return false;
    } else if (path) {
        if (port) {
            error_setg(errp, "port may not be used without host");
            return false;
        }

        qdict_put_str(output_options, "server.type", "unix");
        qdict_put_str(output_options, "server.path", path);
    } else if (host) {
        qdict_put_str(output_options, "server.type", "inet");
        qdict_put_str(output_options, "server.host", host);
        qdict_put_str(output_options, "server.port",
                      port ?: stringify(NBD_DEFAULT_PORT));
    }

    return true;
}

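/*
 * Extract the "server.*" sub-dictionary from @options and convert it
 * into a SocketAddress with the QAPI input visitor.  Returns NULL and
 * sets @errp on failure.
 */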
static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
                                 Error **errp)
{
    SocketAddress *saddr = NULL;
    QDict *addr = NULL;
    Visitor *iv = NULL;
    Error *local_err = NULL;

    qdict_extract_subqdict(options, &addr, "server.");
    if (!qdict_size(addr)) {
        error_setg(errp, "NBD server address missing");
        goto done;
    }

    iv = qobject_input_visitor_new_flat_confused(addr, errp);
    if (!iv) {
        goto done;
    }

    visit_type_SocketAddress(iv, NULL, &saddr, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        goto done;
    }

done:
    qobject_unref(addr);
    visit_free(iv);
    return saddr;
}

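/*
 * Resolve @id to a client-endpoint QCryptoTLSCreds object.  Such an
 * object would typically be set up on the command line roughly like
 * this (illustrative values only, not taken from this file):
 *
 *   -object tls-creds-x509,id=tls0,endpoint=client,dir=/path/to/creds
 *   -drive driver=nbd,server.type=inet,server.host=localhost,\
 *          server.port=10809,tls-creds=tls0
 */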
static QCryptoTLSCreds *nbd_get_tls_creds(const char *id, Error **errp)
{
    Object *obj;
    QCryptoTLSCreds *creds;

    obj = object_resolve_path_component(
        object_get_objects_root(), id);
    if (!obj) {
        error_setg(errp, "No TLS credentials with id '%s'",
                   id);
        return NULL;
    }
    creds = (QCryptoTLSCreds *)
        object_dynamic_cast(obj, TYPE_QCRYPTO_TLS_CREDS);
    if (!creds) {
        error_setg(errp, "Object with id '%s' is not TLS credentials",
                   id);
        return NULL;
    }

    if (creds->endpoint != QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT) {
        error_setg(errp,
                   "Expecting TLS credentials with a client endpoint");
        return NULL;
    }
    object_ref(obj);
    return creds;
}


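/*
 * Runtime options understood by this driver; they are absorbed from
 * the options QDict in nbd_process_options() below.
 */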
static QemuOptsList nbd_runtime_opts = {
    .name = "nbd",
    .head = QTAILQ_HEAD_INITIALIZER(nbd_runtime_opts.head),
    .desc = {
        {
            .name = "host",
            .type = QEMU_OPT_STRING,
            .help = "TCP host to connect to",
        },
        {
            .name = "port",
            .type = QEMU_OPT_STRING,
            .help = "TCP port to connect to",
        },
        {
            .name = "path",
            .type = QEMU_OPT_STRING,
            .help = "Unix socket path to connect to",
        },
        {
            .name = "export",
            .type = QEMU_OPT_STRING,
            .help = "Name of the NBD export to open",
        },
        {
            .name = "tls-creds",
            .type = QEMU_OPT_STRING,
            .help = "ID of the TLS credentials to use",
        },
        {
            .name = "x-dirty-bitmap",
            .type = QEMU_OPT_STRING,
            .help = "experimental: expose named dirty bitmap in place of "
                    "block status",
        },
        {
            .name = "reconnect-delay",
            .type = QEMU_OPT_NUMBER,
            .help = "On an unexpected disconnect, the NBD client tries to "
                    "connect again until succeeding or encountering a serious "
                    "error.  During the first @reconnect-delay seconds, all "
                    "requests are paused and will be rerun on a successful "
                    "reconnect. After that time, any delayed requests and all "
                    "future requests before a successful reconnect will "
                    "immediately fail. Default 0",
        },
        { /* end of list */ }
    },
};

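/*
 * Digest @options into the driver state: socket address, export name,
 * TLS credentials, dirty-bitmap name and reconnect delay.  Returns 0
 * on success and -EINVAL on any invalid configuration.
 */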
static int nbd_process_options(BlockDriverState *bs, QDict *options,
                               Error **errp)
{
    BDRVNBDState *s = bs->opaque;
    QemuOpts *opts;
    Error *local_err = NULL;
    int ret = -EINVAL;

    opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort);
    qemu_opts_absorb_qdict(opts, options, &local_err);
    if (local_err) {
        error_propagate(errp, local_err);
        goto error;
    }

    /* Translate @host, @port, and @path to a SocketAddress */
    if (!nbd_process_legacy_socket_options(options, opts, errp)) {
        goto error;
    }

    /* Pop the config into our state object. Exit if invalid. */
    s->saddr = nbd_config(s, options, errp);
    if (!s->saddr) {
        goto error;
    }

    s->export = g_strdup(qemu_opt_get(opts, "export"));
    if (s->export && strlen(s->export) > NBD_MAX_STRING_SIZE) {
        error_setg(errp, "export name too long to send to server");
        goto error;
    }

    s->tlscredsid = g_strdup(qemu_opt_get(opts, "tls-creds"));
    if (s->tlscredsid) {
        s->tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
        if (!s->tlscreds) {
            goto error;
        }

        /* TODO SOCKET_ADDRESS_KIND_FD where fd has AF_INET or AF_INET6 */
        if (s->saddr->type != SOCKET_ADDRESS_TYPE_INET) {
            error_setg(errp, "TLS only supported over IP sockets");
            goto error;
        }
        s->hostname = s->saddr->u.inet.host;
    }

    s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
    if (s->x_dirty_bitmap && strlen(s->x_dirty_bitmap) > NBD_MAX_STRING_SIZE) {
        error_setg(errp, "x-dirty-bitmap query too long to send to server");
        goto error;
    }

    s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);

    ret = 0;

 error: /* reached on success as well; the cleanup is shared */
    if (ret < 0) {
        nbd_clear_bdrvstate(s);
    }
    qemu_opts_del(opts);
    return ret;
}

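/*
 * .bdrv_file_open implementation: process the options, establish the
 * initial connection, and start the coroutine that receives replies
 * (and handles reconnects) for the lifetime of the node.
 */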
static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
                    Error **errp)
{
    int ret;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    ret = nbd_process_options(bs, options, errp);
    if (ret < 0) {
        return ret;
    }

    s->bs = bs;
    qemu_co_mutex_init(&s->send_mutex);
    qemu_co_queue_init(&s->free_sema);

    ret = nbd_client_connect(bs, errp);
    if (ret < 0) {
        nbd_clear_bdrvstate(s);
        return ret;
    }
    /* successfully connected */
    s->state = NBD_CLIENT_CONNECTED;

    s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
    bdrv_inc_in_flight(bs);
    aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);

    return 0;
}

static int nbd_co_flush(BlockDriverState *bs)
{
    return nbd_client_co_flush(bs);
}

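/*
 * Derive the block limits from what the server advertised.  As a
 * worked example (hypothetical numbers): a server advertising
 * min_block=512, opt_block=4096 and max_block=32 MiB would yield
 * request_alignment=512, an opt_transfer of at least 4096, and
 * max_transfer=MIN(32 MiB, NBD_MAX_BUFFER_SIZE).
 */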
static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    uint32_t min = s->info.min_block;
    uint32_t max = MIN_NON_ZERO(NBD_MAX_BUFFER_SIZE, s->info.max_block);

    /*
     * If the server did not advertise an alignment:
     * - a size that is not sector-aligned implies that an alignment
     *   of 1 can be used to access those tail bytes
     * - advertisement of block status requires an alignment of 1, so
     *   that we don't violate block layer constraints that block
     *   status is always aligned (as we can't control whether the
     *   server will report sub-sector extents, such as a hole at EOF
     *   on an unaligned POSIX file)
     * - otherwise, assume the server is so old that we are safer avoiding
     *   sub-sector requests
     */
    if (!min) {
        min = (!QEMU_IS_ALIGNED(s->info.size, BDRV_SECTOR_SIZE) ||
               s->info.base_allocation) ? 1 : BDRV_SECTOR_SIZE;
    }

    bs->bl.request_alignment = min;
    bs->bl.max_pdiscard = max;
    bs->bl.max_pwrite_zeroes = max;
    bs->bl.max_transfer = max;

    if (s->info.opt_block &&
        s->info.opt_block > bs->bl.opt_transfer) {
        bs->bl.opt_transfer = s->info.opt_block;
    }
}

static void nbd_close(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;

    nbd_client_close(bs);
    nbd_clear_bdrvstate(s);
}

static int64_t nbd_getlength(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;

    return s->info.size;
}

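/*
 * Recompute a pseudo-filename for the node where possible, e.g.
 * "nbd://localhost:10809/myexport" or
 * "nbd+unix:///myexport?socket=/tmp/nbd.sock" (illustrative values).
 * Addresses that cannot be expressed this way (fd passing, inet
 * options such as ipv4/ipv6/to) leave exact_filename empty.
 */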
static void nbd_refresh_filename(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;
    const char *host = NULL, *port = NULL, *path = NULL;
    size_t len = 0;

    if (s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
        const InetSocketAddress *inet = &s->saddr->u.inet;
        if (!inet->has_ipv4 && !inet->has_ipv6 && !inet->has_to) {
            host = inet->host;
            port = inet->port;
        }
    } else if (s->saddr->type == SOCKET_ADDRESS_TYPE_UNIX) {
        path = s->saddr->u.q_unix.path;
    } /* else can't represent as pseudo-filename */

    if (path && s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd+unix:///%s?socket=%s", s->export, path);
    } else if (path && !s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd+unix://?socket=%s", path);
    } else if (host && s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd://%s:%s/%s", host, port, s->export);
    } else if (host && !s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd://%s:%s", host, port);
    }
    /*
     * snprintf() returns the length the string would have had, so a
     * result equal to the buffer size already means truncation; hence
     * >= rather than >.
     */
    if (len >= sizeof(bs->exact_filename)) {
        /* Name is too long to represent exactly, so leave it empty. */
        bs->exact_filename[0] = '\0';
    }
}

static char *nbd_dirname(BlockDriverState *bs, Error **errp)
{
    /* The generic bdrv_dirname() implementation is able to work out some
     * directory name for NBD nodes, but that would be wrong. So far there is no
     * specification for how "export paths" would work, so NBD does not have
     * directory names. */
    error_setg(errp, "Cannot generate a base directory for NBD nodes");
    return NULL;
}

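/*
 * Options that identify which image is opened rather than tune
 * behaviour: changing any of them may change the data this node
 * presents.
 */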
static const char *const nbd_strong_runtime_opts[] = {
    "path",
    "host",
    "port",
    "export",
    "tls-creds",
    "server.",

    NULL
};

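/*
 * The three driver definitions below are identical except for
 * .protocol_name, matching the nbd://, nbd+tcp:// and nbd+unix://
 * URI schemes.
 */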
static BlockDriver bdrv_nbd = {
    .format_name                = "nbd",
    .protocol_name              = "nbd",
    .instance_size              = sizeof(BDRVNBDState),
    .bdrv_parse_filename        = nbd_parse_filename,
    .bdrv_file_open             = nbd_open,
    .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
    .bdrv_co_preadv             = nbd_client_co_preadv,
    .bdrv_co_pwritev            = nbd_client_co_pwritev,
    .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
    .bdrv_close                 = nbd_close,
    .bdrv_co_flush_to_os        = nbd_co_flush,
    .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
    .bdrv_refresh_limits        = nbd_refresh_limits,
    .bdrv_getlength             = nbd_getlength,
    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
    .bdrv_co_drain_end          = nbd_client_co_drain_end,
    .bdrv_refresh_filename      = nbd_refresh_filename,
    .bdrv_co_block_status       = nbd_client_co_block_status,
    .bdrv_dirname               = nbd_dirname,
    .strong_runtime_opts        = nbd_strong_runtime_opts,
};

static BlockDriver bdrv_nbd_tcp = {
    .format_name                = "nbd",
    .protocol_name              = "nbd+tcp",
    .instance_size              = sizeof(BDRVNBDState),
    .bdrv_parse_filename        = nbd_parse_filename,
    .bdrv_file_open             = nbd_open,
    .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
    .bdrv_co_preadv             = nbd_client_co_preadv,
    .bdrv_co_pwritev            = nbd_client_co_pwritev,
    .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
    .bdrv_close                 = nbd_close,
    .bdrv_co_flush_to_os        = nbd_co_flush,
    .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
    .bdrv_refresh_limits        = nbd_refresh_limits,
    .bdrv_getlength             = nbd_getlength,
    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
    .bdrv_co_drain_end          = nbd_client_co_drain_end,
    .bdrv_refresh_filename      = nbd_refresh_filename,
    .bdrv_co_block_status       = nbd_client_co_block_status,
    .bdrv_dirname               = nbd_dirname,
    .strong_runtime_opts        = nbd_strong_runtime_opts,
};

static BlockDriver bdrv_nbd_unix = {
    .format_name                = "nbd",
    .protocol_name              = "nbd+unix",
    .instance_size              = sizeof(BDRVNBDState),
    .bdrv_parse_filename        = nbd_parse_filename,
    .bdrv_file_open             = nbd_open,
    .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
    .bdrv_co_preadv             = nbd_client_co_preadv,
    .bdrv_co_pwritev            = nbd_client_co_pwritev,
    .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
    .bdrv_close                 = nbd_close,
    .bdrv_co_flush_to_os        = nbd_co_flush,
    .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
    .bdrv_refresh_limits        = nbd_refresh_limits,
    .bdrv_getlength             = nbd_getlength,
    .bdrv_detach_aio_context    = nbd_client_detach_aio_context,
    .bdrv_attach_aio_context    = nbd_client_attach_aio_context,
    .bdrv_co_drain_begin        = nbd_client_co_drain_begin,
    .bdrv_co_drain_end          = nbd_client_co_drain_end,
    .bdrv_refresh_filename      = nbd_refresh_filename,
    .bdrv_co_block_status       = nbd_client_co_block_status,
    .bdrv_dirname               = nbd_dirname,
    .strong_runtime_opts        = nbd_strong_runtime_opts,
};

static void bdrv_nbd_init(void)
{
    bdrv_register(&bdrv_nbd);
    bdrv_register(&bdrv_nbd_tcp);
    bdrv_register(&bdrv_nbd_unix);
}

block_init(bdrv_nbd_init);