qemu/block/nbd.c
/*
 * QEMU Block driver for NBD
 *
 * Copyright (c) 2019 Virtuozzo International GmbH.
 * Copyright Red Hat
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qemu/uri.h"
#include "qemu/option.h"
#include "qemu/cutils.h"
#include "qemu/main-loop.h"

#include "qapi/qapi-visit-sockets.h"
#include "qapi/qmp/qstring.h"
#include "qapi/clone-visitor.h"

#include "block/qdict.h"
#include "block/nbd.h"
#include "block/block_int.h"
#include "block/coroutines.h"

#include "qemu/yank.h"

#define EN_OPTSTR ":exportname="
#define MAX_NBD_REQUESTS    16

#define COOKIE_TO_INDEX(cookie) ((cookie) - 1)
#define INDEX_TO_COOKIE(index)  ((index) + 1)
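
/*
 * Cookie 0 is reserved on the wire: s->reply.cookie == 0 means "no reply
 * is currently being processed", so request slot indices are offset by
 * one when converted to cookies.
 */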

typedef struct {
    Coroutine *coroutine;
    uint64_t offset;        /* original offset of the request */
    bool receiving;         /* sleeping in the yield in nbd_receive_replies */
} NBDClientRequest;

typedef enum NBDClientState {
    NBD_CLIENT_CONNECTING_WAIT,
    NBD_CLIENT_CONNECTING_NOWAIT,
    NBD_CLIENT_CONNECTED,
    NBD_CLIENT_QUIT
} NBDClientState;
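
/*
 * Rough sketch of the client state transitions (see
 * nbd_channel_error_locked(), reconnect_delay_timer_cb() and
 * nbd_co_do_establish_connection()):
 *
 *   CONNECTED --(-EIO, reconnect_delay set)---> CONNECTING_WAIT
 *   CONNECTED --(-EIO, no reconnect_delay)----> CONNECTING_NOWAIT
 *   CONNECTING_WAIT --(delay timer expires)---> CONNECTING_NOWAIT
 *   CONNECTING_* --(reconnect succeeds)-------> CONNECTED
 *   any state --(fatal error or teardown)-----> QUIT
 *
 * While in CONNECTING_WAIT, failed requests are retried after a reconnect
 * attempt (see nbd_client_will_reconnect()); in CONNECTING_NOWAIT a new
 * request may still trigger one reconnect attempt, but failures are
 * returned to the caller as -EIO.
 */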

typedef struct BDRVNBDState {
    QIOChannel *ioc; /* The current I/O channel */
    NBDExportInfo info;

    /*
     * Protects state, free_sema, in_flight, requests[].coroutine,
     * reconnect_delay_timer.
     */
    QemuMutex requests_lock;
    NBDClientState state;
    CoQueue free_sema;
    unsigned in_flight;
    NBDClientRequest requests[MAX_NBD_REQUESTS];
    QEMUTimer *reconnect_delay_timer;

    /* Protects sending data on the socket.  */
    CoMutex send_mutex;

    /*
     * Protects receiving reply headers from the socket, as well as the
     * fields reply and requests[].receiving
     */
    CoMutex receive_mutex;
    NBDReply reply;

    QEMUTimer *open_timer;

    BlockDriverState *bs;

    /* Connection parameters */
    uint32_t reconnect_delay;
    uint32_t open_timeout;
    SocketAddress *saddr;
    char *export;
    char *tlscredsid;
    QCryptoTLSCreds *tlscreds;
    char *tlshostname;
    char *x_dirty_bitmap;
    bool alloc_depth;

    NBDClientConnection *conn;
} BDRVNBDState;
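
/*
 * Note on the locks above: requests_lock is a plain QemuMutex so that it
 * can also be taken outside coroutine context (reconnect_delay_timer_cb(),
 * nbd_yank()), while send_mutex and receive_mutex are CoMutexes, only ever
 * taken by coroutines performing I/O.
 */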

static void nbd_yank(void *opaque);

static void nbd_clear_bdrvstate(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    nbd_client_connection_release(s->conn);
    s->conn = NULL;

    yank_unregister_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name));

    /* Must not leave timers behind that would access freed data */
    assert(!s->reconnect_delay_timer);
    assert(!s->open_timer);

    object_unref(OBJECT(s->tlscreds));
    qapi_free_SocketAddress(s->saddr);
    s->saddr = NULL;
    g_free(s->export);
    s->export = NULL;
    g_free(s->tlscredsid);
    s->tlscredsid = NULL;
    g_free(s->tlshostname);
    s->tlshostname = NULL;
    g_free(s->x_dirty_bitmap);
    s->x_dirty_bitmap = NULL;
}

/* Called with s->receive_mutex taken.  */
static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
{
    if (req->receiving) {
        req->receiving = false;
        aio_co_wake(req->coroutine);
        return true;
    }

    return false;
}

static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
{
    int i;

    QEMU_LOCK_GUARD(&s->receive_mutex);
    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
            return;
        }
    }
}

/* Called with s->requests_lock held.  */
static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
{
    if (s->state == NBD_CLIENT_CONNECTED) {
        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    }

    if (ret == -EIO) {
        if (s->state == NBD_CLIENT_CONNECTED) {
            s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                            NBD_CLIENT_CONNECTING_NOWAIT;
        }
    } else {
        s->state = NBD_CLIENT_QUIT;
    }
}

static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
{
    QEMU_LOCK_GUARD(&s->requests_lock);
    nbd_channel_error_locked(s, ret);
}

static void reconnect_delay_timer_del(BDRVNBDState *s)
{
    if (s->reconnect_delay_timer) {
        timer_free(s->reconnect_delay_timer);
        s->reconnect_delay_timer = NULL;
    }
}

static void reconnect_delay_timer_cb(void *opaque)
{
    BDRVNBDState *s = opaque;

    reconnect_delay_timer_del(s);
    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
        if (s->state != NBD_CLIENT_CONNECTING_WAIT) {
            return;
        }
        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
    }
    nbd_co_establish_connection_cancel(s->conn);
}

static void reconnect_delay_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
{
    assert(!s->reconnect_delay_timer);
    s->reconnect_delay_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
                                             QEMU_CLOCK_REALTIME,
                                             SCALE_NS,
                                             reconnect_delay_timer_cb, s);
    timer_mod(s->reconnect_delay_timer, expire_time_ns);
}

static void nbd_teardown_connection(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    assert(!s->in_flight);

    if (s->ioc) {
        qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                 nbd_yank, s->bs);
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
        s->state = NBD_CLIENT_QUIT;
    }
}

static void open_timer_del(BDRVNBDState *s)
{
    if (s->open_timer) {
        timer_free(s->open_timer);
        s->open_timer = NULL;
    }
}

static void open_timer_cb(void *opaque)
{
    BDRVNBDState *s = opaque;

    nbd_co_establish_connection_cancel(s->conn);
    open_timer_del(s);
}

static void open_timer_init(BDRVNBDState *s, uint64_t expire_time_ns)
{
    assert(!s->open_timer);
    s->open_timer = aio_timer_new(bdrv_get_aio_context(s->bs),
                                  QEMU_CLOCK_REALTIME,
                                  SCALE_NS,
                                  open_timer_cb, s);
    timer_mod(s->open_timer, expire_time_ns);
}

static bool nbd_client_will_reconnect(BDRVNBDState *s)
{
    /*
     * Called only after a socket error, so this is not performance sensitive.
     */
    QEMU_LOCK_GUARD(&s->requests_lock);
    return s->state == NBD_CLIENT_CONNECTING_WAIT;
}

/*
 * Update @bs with information learned during a completed negotiation process.
 * Return failure if the server's advertised options are incompatible with the
 * client's needs.
 */
static int nbd_handle_updated_info(BlockDriverState *bs, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int ret;

    if (s->x_dirty_bitmap) {
        if (!s->info.base_allocation) {
            error_setg(errp, "requested x-dirty-bitmap %s not found",
                       s->x_dirty_bitmap);
            return -EINVAL;
        }
        if (strcmp(s->x_dirty_bitmap, "qemu:allocation-depth") == 0) {
            s->alloc_depth = true;
        }
    }

    if (s->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            return ret;
        }
    }

    if (s->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }

    if (s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
        if (s->info.flags & NBD_FLAG_SEND_FAST_ZERO) {
            bs->supported_zero_flags |= BDRV_REQ_NO_FALLBACK;
        }
    }

    trace_nbd_client_handshake_success(s->export);

    return 0;
}

int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
                                                bool blocking, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int ret;
    IO_CODE();

    assert_bdrv_graph_readable();
    assert(!s->ioc);

    s->ioc = nbd_co_establish_connection(s->conn, &s->info, blocking, errp);
    if (!s->ioc) {
        return -ECONNREFUSED;
    }

    yank_register_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name), nbd_yank,
                           bs);

    ret = nbd_handle_updated_info(s->bs, NULL);
    if (ret < 0) {
        /*
         * We have connected, but must fail for other reasons.
         * Send NBD_CMD_DISC as a courtesy to the server.
         */
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(s->ioc, &request);

        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                 nbd_yank, bs);
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;

        return ret;
    }

    qio_channel_set_blocking(s->ioc, false, NULL);
    qio_channel_attach_aio_context(s->ioc, bdrv_get_aio_context(bs));

    /* successfully connected */
    WITH_QEMU_LOCK_GUARD(&s->requests_lock) {
        s->state = NBD_CLIENT_CONNECTED;
    }

    return 0;
}

/* Called with s->requests_lock held.  */
static bool nbd_client_connecting(BDRVNBDState *s)
{
    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
        s->state == NBD_CLIENT_CONNECTING_NOWAIT;
}

/* Called with s->requests_lock taken.  */
static void coroutine_fn GRAPH_RDLOCK nbd_reconnect_attempt(BDRVNBDState *s)
{
    int ret;
    bool blocking = s->state == NBD_CLIENT_CONNECTING_WAIT;

    /*
     * Now we are sure that nobody is accessing the channel, and no one will
     * try until we set the state to CONNECTED.
     */
    assert(nbd_client_connecting(s));
    assert(s->in_flight == 1);

    trace_nbd_reconnect_attempt(s->bs->in_flight);

    if (blocking && !s->reconnect_delay_timer) {
        /*
         * It's the first reconnect attempt after switching to
         * NBD_CLIENT_CONNECTING_WAIT
         */
        g_assert(s->reconnect_delay);
        reconnect_delay_timer_init(s,
            qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
            s->reconnect_delay * NANOSECONDS_PER_SECOND);
    }

    /* Finalize previous connection if any */
    if (s->ioc) {
        qio_channel_detach_aio_context(s->ioc);
        yank_unregister_function(BLOCKDEV_YANK_INSTANCE(s->bs->node_name),
                                 nbd_yank, s->bs);
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    qemu_mutex_unlock(&s->requests_lock);
    ret = nbd_co_do_establish_connection(s->bs, blocking, NULL);
    trace_nbd_reconnect_attempt_result(ret, s->bs->in_flight);
    qemu_mutex_lock(&s->requests_lock);

    /*
     * The reconnect attempt is done (maybe successfully, maybe not), so
     * we no longer need this timer.  Delete it so it will not outlive
     * this I/O request (so draining removes all timers).
     */
    reconnect_delay_timer_del(s);
}
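
/*
 * nbd_reconnect_attempt() runs with s->in_flight == 1: its caller in
 * nbd_co_send_request() is the only in-flight request, because while the
 * client is not CONNECTED every other request waits on free_sema until
 * in_flight drops to zero.  That is what makes it safe to tear down and
 * replace s->ioc above.
 */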

static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t cookie)
{
    int ret;
    uint64_t ind = COOKIE_TO_INDEX(cookie), ind2;
    QEMU_LOCK_GUARD(&s->receive_mutex);

    while (true) {
        if (s->reply.cookie == cookie) {
            /* We are done */
            return 0;
        }

        if (s->reply.cookie != 0) {
            /*
             * Some other request is being handled now. It should already be
             * woken by whoever set s->reply.cookie (or never wait in this
             * yield). So, we should not wake it here.
             */
            ind2 = COOKIE_TO_INDEX(s->reply.cookie);
            assert(!s->requests[ind2].receiving);

            s->requests[ind].receiving = true;
            qemu_co_mutex_unlock(&s->receive_mutex);

            qemu_coroutine_yield();
            /*
             * We may be woken for 2 reasons:
             * 1. From this function, executing in parallel coroutine, when our
             *    cookie is received.
             * 2. From nbd_co_receive_one_chunk(), when previous request is
             *    finished and s->reply.cookie set to 0.
             * Anyway, it's OK to lock the mutex and go to the next iteration.
             */

            qemu_co_mutex_lock(&s->receive_mutex);
            assert(!s->requests[ind].receiving);
            continue;
        }

        /* We are under mutex and cookie is 0. We have to do the dirty work. */
        assert(s->reply.cookie == 0);
        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, NULL);
        if (ret <= 0) {
            ret = ret ? ret : -EIO;
            nbd_channel_error(s, ret);
            return ret;
        }
        if (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply) {
            nbd_channel_error(s, -EINVAL);
            return -EINVAL;
        }
        ind2 = COOKIE_TO_INDEX(s->reply.cookie);
        if (ind2 >= MAX_NBD_REQUESTS || !s->requests[ind2].coroutine) {
            nbd_channel_error(s, -EINVAL);
            return -EINVAL;
        }
        if (s->reply.cookie == cookie) {
            /* We are done */
            return 0;
        }
        nbd_recv_coroutine_wake_one(&s->requests[ind2]);
    }
}
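
/*
 * The loop above is effectively a reader election: whichever coroutine
 * finds s->reply.cookie == 0 reads the next reply header from the socket
 * on behalf of everyone.  If the header belongs to another request, that
 * request's coroutine is woken to consume it, and this one parks with
 * .receiving = true until it is woken in turn.
 */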

static int coroutine_fn GRAPH_RDLOCK
nbd_co_send_request(BlockDriverState *bs, NBDRequest *request,
                    QEMUIOVector *qiov)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    int rc, i = -1;

    qemu_mutex_lock(&s->requests_lock);
    while (s->in_flight == MAX_NBD_REQUESTS ||
           (s->state != NBD_CLIENT_CONNECTED && s->in_flight > 0)) {
        qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
    }

    s->in_flight++;
    if (s->state != NBD_CLIENT_CONNECTED) {
        if (nbd_client_connecting(s)) {
            nbd_reconnect_attempt(s);
            qemu_co_queue_restart_all(&s->free_sema);
        }
        if (s->state != NBD_CLIENT_CONNECTED) {
            rc = -EIO;
            goto err;
        }
    }

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    assert(i < MAX_NBD_REQUESTS);
    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;
    qemu_mutex_unlock(&s->requests_lock);

    qemu_co_mutex_lock(&s->send_mutex);
    request->cookie = INDEX_TO_COOKIE(i);

    assert(s->ioc);

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0 && qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                              NULL) < 0) {
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }
    qemu_co_mutex_unlock(&s->send_mutex);

    if (rc < 0) {
        qemu_mutex_lock(&s->requests_lock);
err:
        nbd_channel_error_locked(s, rc);
        if (i != -1) {
            s->requests[i].coroutine = NULL;
        }
        s->in_flight--;
        qemu_co_queue_next(&s->free_sema);
        qemu_mutex_unlock(&s->requests_lock);
    }
    return rc;
}

static inline uint16_t payload_advance16(uint8_t **payload)
{
    *payload += 2;
    return lduw_be_p(*payload - 2);
}

static inline uint32_t payload_advance32(uint8_t **payload)
{
    *payload += 4;
    return ldl_be_p(*payload - 4);
}

static inline uint64_t payload_advance64(uint8_t **payload)
{
    *payload += 8;
    return ldq_be_p(*payload - 8);
}
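
/*
 * The payload_advance* helpers above read one big-endian field (NBD uses
 * network byte order on the wire) and step the cursor past it.  A typical
 * parse, as in nbd_parse_blockstatus_payload() below, looks like:
 *
 *     context_id = payload_advance32(&payload);
 *     extent->length = payload_advance32(&payload);
 *     extent->flags = payload_advance32(&payload);
 */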

static int nbd_parse_offset_hole_payload(BDRVNBDState *s,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block &&
        !QEMU_IS_ALIGNED(hole_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("hole");
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}

/*
 * nbd_parse_blockstatus_payload
 * Based on our request, we expect only one extent in reply, for the
 * base:allocation context.
 */
static int nbd_parse_blockstatus_payload(BDRVNBDState *s,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    /* The server succeeded, so it must have sent [at least] one extent */
    if (chunk->length < sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (s->info.context_id != context_id) {
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                         s->info.context_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                   "zero length");
        return -EINVAL;
    }

    /*
     * A server sending unaligned block status is in violation of the
     * protocol, but as qemu-nbd 3.1 is such a server (at least for
     * POSIX files that are not a multiple of 512 bytes, since qemu
     * rounds files up to 512-byte multiples but lseek(SEEK_HOLE)
     * still sees an implicit hole beyond the real EOF), it's nicer to
     * work around the misbehaving server. If the request included
     * more than the final unaligned block, truncate it back to an
     * aligned result; if the request was only the final block, round
     * up to the full block and change the status to fully-allocated
     * (always a safe status, even if it loses information).
     */
    if (s->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                                   s->info.min_block)) {
        trace_nbd_parse_blockstatus_compliance("extent length is unaligned");
        if (extent->length > s->info.min_block) {
            extent->length = QEMU_ALIGN_DOWN(extent->length,
                                             s->info.min_block);
        } else {
            extent->length = s->info.min_block;
            extent->flags = 0;
        }
    }

    /*
     * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
     * sent us any more than one extent, nor should it have included
     * status beyond our request in that extent. However, it's easy
     * enough to ignore the server's noncompliance without killing the
     * connection; just ignore trailing extents, and clamp things to
     * the length of our request.
     */
    if (chunk->length > sizeof(context_id) + sizeof(*extent)) {
        trace_nbd_parse_blockstatus_compliance("more than one extent");
    }
    if (extent->length > orig_length) {
        extent->length = orig_length;
        trace_nbd_parse_blockstatus_compliance("extent length too large");
    }

    /*
     * HACK: if we are using x-dirty-bitmaps to access
     * qemu:allocation-depth, treat all depths > 2 the same as 2,
     * since nbd_client_co_block_status is only expecting the low two
     * bits to be set.
     */
    if (s->alloc_depth && extent->flags > 2) {
        extent->flags = 2;
    }

    return 0;
}

/*
 * nbd_parse_error_payload
 * On success, @request_ret is set to the negative errno corresponding to
 * the server's error reply; @errp is set only if the payload itself is
 * malformed.
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}

static int coroutine_fn
nbd_co_receive_offset_data_payload(BDRVNBDState *s, uint64_t orig_offset,
                                   QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
        return -EIO;
    }

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block && !QEMU_IS_ALIGNED(data_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("data");
    }

    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}

#define NBD_MAX_MALLOC_PAYLOAD 1000
static coroutine_fn int nbd_co_receive_structured_payload(
        BDRVNBDState *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}

/*
 * nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If function fails, @errp contains corresponding error message, and the
 * connection with the server is suspect.  If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        BDRVNBDState *s, uint64_t cookie, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = COOKIE_TO_INDEX(cookie);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    ret = nbd_receive_replies(s, cookie);
    if (ret < 0) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }
    assert(s->ioc);

    assert(s->reply.cookie == cookie);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                       " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                       " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}

/*
 * nbd_co_receive_one_chunk
 * Read one reply chunk; on failure, zero out *reply and mark the channel
 * as broken.  In either case, clear s->reply.cookie and wake the next
 * coroutine waiting in nbd_receive_replies().
 * Return value is a fatal error code or a normal NBD reply error code.
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        BDRVNBDState *s, uint64_t cookie, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
        Error **errp)
{
    int ret = nbd_co_do_receive_one_chunk(s, cookie, only_structured,
                                          request_ret, qiov, payload, errp);

    if (ret < 0) {
        memset(reply, 0, sizeof(*reply));
        nbd_channel_error(s, ret);
    } else {
        /* For the assert at the start of the NBD_FOREACH_REPLY_CHUNK body */
        *reply = s->reply;
    }
    s->reply.cookie = 0;

    nbd_recv_coroutines_wake(s);

    return ret;
}

typedef struct NBDReplyChunkIter {
    int ret;
    int request_ret;
    Error *err;
    bool done, only_structured;
} NBDReplyChunkIter;

static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
                                   int ret, Error **local_err)
{
    assert(local_err && *local_err);
    assert(ret < 0);

    if (!iter->ret) {
        iter->ret = ret;
        error_propagate(&iter->err, *local_err);
    } else {
        error_free(*local_err);
    }

    *local_err = NULL;
}

static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
{
    assert(ret < 0);

    if (!iter->request_ret) {
        iter->request_ret = ret;
    }
}

/*
 * NBD_FOREACH_REPLY_CHUNK
 * The pointer stored in @payload requires g_free() to free it.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, cookie, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, cookie, qiov, reply, payload);)
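
/*
 * Iterator contract: the loop body runs once per received structured reply
 * chunk, except that simple replies, the terminating NBD_REPLY_TYPE_NONE
 * chunk, and anything after a recorded fatal error end the loop without
 * executing the body.  Afterwards, iter.ret holds any fatal transport or
 * protocol error, iter.request_ret the server's error for this request,
 * and the in-flight request slot has been released.
 */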

/*
 * nbd_reply_chunk_iter_receive
 * The pointer stored in @payload requires g_free() to free it.
 */
static bool coroutine_fn nbd_reply_chunk_iter_receive(BDRVNBDState *s,
                                                      NBDReplyChunkIter *iter,
                                                      uint64_t cookie,
                                                      QEMUIOVector *qiov,
                                                      NBDReply *reply,
                                                      void **payload)
{
    int ret, request_ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;

    if (iter->done) {
        /* Previous iteration was last. */
        goto break_loop;
    }

    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, cookie, iter->only_structured,
                                   &request_ret, qiov, reply, payload,
                                   &local_err);
    if (ret < 0) {
        nbd_iter_channel_error(iter, ret, &local_err);
    } else if (request_ret < 0) {
        nbd_iter_request_error(iter, request_ret);
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
    if (nbd_reply_is_simple(reply) || iter->ret < 0) {
        goto break_loop;
    }

    chunk = &reply->structured;
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is last. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

break_loop:
    qemu_mutex_lock(&s->requests_lock);
    s->requests[COOKIE_TO_INDEX(cookie)].coroutine = NULL;
    s->in_flight--;
    qemu_co_queue_next(&s->free_sema);
    qemu_mutex_unlock(&s->requests_lock);

    return false;
}

static int coroutine_fn
nbd_co_receive_return_code(BDRVNBDState *s, uint64_t cookie,
                           int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, cookie, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int coroutine_fn
nbd_co_receive_cmdread_reply(BDRVNBDState *s, uint64_t cookie,
                             uint64_t offset, QEMUIOVector *qiov,
                             int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, cookie, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /*
             * special cased in nbd_co_receive_one_chunk, data is already
             * in qiov
             */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not allowed reply type */
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int coroutine_fn
nbd_co_receive_blockstatus_reply(BDRVNBDState *s, uint64_t cookie,
                                 uint64_t length, NBDExtent *extent,
                                 int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, cookie, false, NULL, &reply, &payload) {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                nbd_channel_error(s, ret);
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                nbd_channel_error(s, -EINVAL);
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.request_ret) {
        error_setg(&local_err, "Server did not reply with any status extents");
        nbd_iter_channel_error(&iter, -EIO, &local_err);
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int coroutine_fn GRAPH_RDLOCK
nbd_co_request(BlockDriverState *bs, NBDRequest *request,
               QEMUIOVector *write_qiov)
{
    int ret, request_ret;
    Error *local_err = NULL;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }

    do {
        ret = nbd_co_send_request(bs, request, write_qiov);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_return_code(s, request->cookie,
                                         &request_ret, &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request->from, request->len,
                                      request->cookie, request->flags,
                                      request->type,
                                      nbd_cmd_lookup(request->type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_will_reconnect(s));

    return ret ? ret : request_ret;
}
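
/*
 * The retry loop above (and its open-coded twins in nbd_client_co_preadv()
 * and nbd_client_co_block_status()) resends the request as long as the
 * failure happened while the client was in NBD_CLIENT_CONNECTING_WAIT,
 * i.e. within the configured reconnect-delay window; outside that window
 * the error is returned to the caller.
 */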

static int coroutine_fn GRAPH_RDLOCK
nbd_client_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes,
                     QEMUIOVector *qiov, BdrvRequestFlags flags)
{
    int ret, request_ret;
    Error *local_err = NULL;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the read exceeds the server's
     * advertised size because the block layer rounded size up, then
     * truncate the request to the server and tail-pad with zero.
     */
    if (offset >= s->info.size) {
        assert(bytes < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, 0, 0, bytes);
        return 0;
    }
    if (offset + bytes > s->info.size) {
        uint64_t slop = offset + bytes - s->info.size;

        assert(slop < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, bytes - slop, 0, slop);
        request.len -= slop;
    }

    do {
        ret = nbd_co_send_request(bs, &request, NULL);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_cmdread_reply(s, request.cookie, offset, qiov,
                                           &request_ret, &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request.from, request.len, request.cookie,
                                      request.flags, request.type,
                                      nbd_cmd_lookup(request.type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_will_reconnect(s));

    return ret ? ret : request_ret;
}

static int coroutine_fn GRAPH_RDLOCK
nbd_client_co_pwritev(BlockDriverState *bs, int64_t offset, int64_t bytes,
                      QEMUIOVector *qiov, BdrvRequestFlags flags)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(s->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}

static int coroutine_fn GRAPH_RDLOCK
nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset, int64_t bytes,
                            BdrvRequestFlags flags)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,  /* .len is uint32_t actually */
    };

    assert(bytes <= UINT32_MAX); /* rely on max_pwrite_zeroes */

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (!(s->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(s->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }
    if (flags & BDRV_REQ_NO_FALLBACK) {
        assert(s->info.flags & NBD_FLAG_SEND_FAST_ZERO);
        request.flags |= NBD_CMD_FLAG_FAST_ZERO;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}
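
/*
 * Mapping between block-layer flags and NBD command flags used above:
 *   BDRV_REQ_FUA          -> NBD_CMD_FLAG_FUA
 *   !BDRV_REQ_MAY_UNMAP   -> NBD_CMD_FLAG_NO_HOLE
 *   BDRV_REQ_NO_FALLBACK  -> NBD_CMD_FLAG_FAST_ZERO
 */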

static int coroutine_fn GRAPH_RDLOCK nbd_client_co_flush(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = { .type = NBD_CMD_FLUSH };

    if (!(s->info.flags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    return nbd_co_request(bs, &request, NULL);
}

static int coroutine_fn GRAPH_RDLOCK
nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes, /* len is uint32_t */
    };

    assert(bytes <= UINT32_MAX); /* rely on max_pdiscard */

    assert(!(s->info.flags & NBD_FLAG_READ_ONLY));
    if (!(s->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}

static int coroutine_fn GRAPH_RDLOCK nbd_client_co_block_status(
        BlockDriverState *bs, bool want_zero, int64_t offset, int64_t bytes,
        int64_t *pnum, int64_t *map, BlockDriverState **file)
{
    int ret, request_ret;
    NBDExtent extent = { 0 };
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        .len = MIN(QEMU_ALIGN_DOWN(INT_MAX, bs->bl.request_alignment),
                   MIN(bytes, s->info.size - offset)),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!s->info.base_allocation) {
        *pnum = bytes;
        *map = offset;
        *file = bs;
        return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    }

    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the status request exceeds the
     * server's advertised size because the block layer rounded size
     * up, we truncated the request to the server (above), or are
     * called on just the hole.
     */
    if (offset >= s->info.size) {
        *pnum = bytes;
        assert(bytes < BDRV_SECTOR_SIZE);
        /* Intentionally don't report offset_valid for the hole */
        return BDRV_BLOCK_ZERO;
    }

    if (s->info.min_block) {
        assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
    }
    do {
        ret = nbd_co_send_request(bs, &request, NULL);
        if (ret < 0) {
            continue;
        }

        ret = nbd_co_receive_blockstatus_reply(s, request.cookie, bytes,
                                               &extent, &request_ret,
                                               &local_err);
        if (local_err) {
            trace_nbd_co_request_fail(request.from, request.len, request.cookie,
                                      request.flags, request.type,
                                      nbd_cmd_lookup(request.type),
                                      ret, error_get_pretty(local_err));
            error_free(local_err);
            local_err = NULL;
        }
    } while (ret < 0 && nbd_client_will_reconnect(s));

    if (ret < 0 || request_ret < 0) {
        return ret ? ret : request_ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    *map = offset;
    *file = bs;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
        (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0) |
        BDRV_BLOCK_OFFSET_VALID;
}

static int nbd_client_reopen_prepare(BDRVReopenState *state,
                                     BlockReopenQueue *queue, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)state->bs->opaque;

    if ((state->flags & BDRV_O_RDWR) && (s->info.flags & NBD_FLAG_READ_ONLY)) {
        error_setg(errp, "Can't reopen read-only NBD mount as read/write");
        return -EACCES;
    }
    return 0;
}

static void nbd_yank(void *opaque)
{
    BlockDriverState *bs = opaque;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    QEMU_LOCK_GUARD(&s->requests_lock);
    qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
    s->state = NBD_CLIENT_QUIT;
}

static void nbd_client_close(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    NBDRequest request = { .type = NBD_CMD_DISC };

    if (s->ioc) {
        nbd_send_request(s->ioc, &request);
    }

    nbd_teardown_connection(bs);
}


/*
 * Parse nbd_open options
 */

static int nbd_parse_uri(const char *filename, QDict *options)
{
    URI *uri;
    const char *p;
    QueryParams *qp = NULL;
    int ret = 0;
    bool is_unix;

    uri = uri_parse(filename);
    if (!uri) {
        return -EINVAL;
    }

    /* transport */
    if (!g_strcmp0(uri->scheme, "nbd")) {
        is_unix = false;
    } else if (!g_strcmp0(uri->scheme, "nbd+tcp")) {
        is_unix = false;
    } else if (!g_strcmp0(uri->scheme, "nbd+unix")) {
        is_unix = true;
    } else {
        ret = -EINVAL;
        goto out;
    }

    p = uri->path ? uri->path : "";
    if (p[0] == '/') {
        p++;
    }
    if (p[0]) {
        qdict_put_str(options, "export", p);
    }

    qp = query_params_parse(uri->query);
    if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
        ret = -EINVAL;
        goto out;
    }

    if (is_unix) {
        /* nbd+unix:///export?socket=path */
        if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
            ret = -EINVAL;
            goto out;
        }
        qdict_put_str(options, "server.type", "unix");
        qdict_put_str(options, "server.path", qp->p[0].value);
    } else {
        QString *host;
        char *port_str;

        /* nbd[+tcp]://host[:port]/export */
        if (!uri->server) {
            ret = -EINVAL;
            goto out;
        }

        /* strip braces from literal IPv6 address */
        if (uri->server[0] == '[') {
            host = qstring_from_substr(uri->server, 1,
                                       strlen(uri->server) - 1);
        } else {
            host = qstring_from_str(uri->server);
        }

        qdict_put_str(options, "server.type", "inet");
        qdict_put(options, "server.host", host);

        port_str = g_strdup_printf("%d", uri->port ?: NBD_DEFAULT_PORT);
        qdict_put_str(options, "server.port", port_str);
        g_free(port_str);
    }

out:
    if (qp) {
        query_params_free(qp);
    }
    uri_free(uri);
    return ret;
}
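
/*
 * Example URIs accepted above (socket path and export name are
 * illustrative; the port defaults to NBD_DEFAULT_PORT, 10809):
 *
 *   nbd://localhost:10809/export
 *   nbd+tcp://[::1]/export
 *   nbd+unix:///export?socket=/tmp/nbd.sock
 */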

static bool nbd_has_filename_options_conflict(QDict *options, Error **errp)
{
    const QDictEntry *e;

    for (e = qdict_first(options); e; e = qdict_next(options, e)) {
        if (!strcmp(e->key, "host") ||
            !strcmp(e->key, "port") ||
            !strcmp(e->key, "path") ||
            !strcmp(e->key, "export") ||
            strstart(e->key, "server.", NULL))
        {
            error_setg(errp, "Option '%s' cannot be used with a file name",
                       e->key);
            return true;
        }
    }

    return false;
}

static void nbd_parse_filename(const char *filename, QDict *options,
                               Error **errp)
{
    g_autofree char *file = NULL;
    char *export_name;
    const char *host_spec;
    const char *unixpath;

    if (nbd_has_filename_options_conflict(options, errp)) {
        return;
    }

    if (strstr(filename, "://")) {
        int ret = nbd_parse_uri(filename, options);
        if (ret < 0) {
            error_setg(errp, "No valid URL specified");
        }
        return;
    }

    file = g_strdup(filename);

    export_name = strstr(file, EN_OPTSTR);
    if (export_name) {
        if (export_name[strlen(EN_OPTSTR)] == 0) {
            return;
        }
        export_name[0] = 0; /* truncate 'file' */
        export_name += strlen(EN_OPTSTR);

        qdict_put_str(options, "export", export_name);
    }

    /* extract the host_spec - fail if it's not nbd:... */
    if (!strstart(file, "nbd:", &host_spec)) {
        error_setg(errp, "File name string for NBD must start with 'nbd:'");
        return;
    }

    if (!*host_spec) {
        return;
    }

    /* are we a UNIX or TCP socket? */
    if (strstart(host_spec, "unix:", &unixpath)) {
        qdict_put_str(options, "server.type", "unix");
        qdict_put_str(options, "server.path", unixpath);
    } else {
        InetSocketAddress *addr = g_new(InetSocketAddress, 1);

        if (inet_parse(addr, host_spec, errp)) {
            goto out_inet;
        }

        qdict_put_str(options, "server.type", "inet");
        qdict_put_str(options, "server.host", addr->host);
        qdict_put_str(options, "server.port", addr->port);
    out_inet:
        qapi_free_InetSocketAddress(addr);
    }
}
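
/*
 * Legacy (non-URI) filename forms handled above, e.g. (host, path and
 * export name illustrative):
 *
 *   nbd:localhost:10809:exportname=myexport
 *   nbd:unix:/tmp/nbd.sock:exportname=myexport
 */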

static bool nbd_process_legacy_socket_options(QDict *output_options,
                                              QemuOpts *legacy_opts,
                                              Error **errp)
{
    const char *path = qemu_opt_get(legacy_opts, "path");
    const char *host = qemu_opt_get(legacy_opts, "host");
    const char *port = qemu_opt_get(legacy_opts, "port");
    const QDictEntry *e;

    if (!path && !host && !port) {
        return true;
    }

    for (e = qdict_first(output_options); e; e = qdict_next(output_options, e))
    {
        if (strstart(e->key, "server.", NULL)) {
            error_setg(errp, "Cannot use 'server' and path/host/port at the "
                       "same time");
            return false;
        }
    }

    if (path && host) {
        error_setg(errp, "path and host may not be used at the same time");
        return false;
    } else if (path) {
        if (port) {
            error_setg(errp, "port may not be used without host");
            return false;
        }

        qdict_put_str(output_options, "server.type", "unix");
        qdict_put_str(output_options, "server.path", path);
    } else if (host) {
        qdict_put_str(output_options, "server.type", "inet");
        qdict_put_str(output_options, "server.host", host);
        qdict_put_str(output_options, "server.port",
                      port ?: stringify(NBD_DEFAULT_PORT));
    }

    return true;
}
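
/*
 * Illustrative examples only (values are placeholders): the legacy flat
 * options above are rewritten into the server.* form consumed by
 * nbd_config(), e.g.
 *
 *   host=example.org,port=10809
 *     -> server.type=inet, server.host=example.org, server.port=10809
 *
 *   path=/tmp/nbd.sock
 *     -> server.type=unix, server.path=/tmp/nbd.sock
 *
 * When host is given without port, NBD_DEFAULT_PORT (10809) is used.
 */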

static SocketAddress *nbd_config(BDRVNBDState *s, QDict *options,
                                 Error **errp)
{
    SocketAddress *saddr = NULL;
    QDict *addr = NULL;
    Visitor *iv = NULL;

    qdict_extract_subqdict(options, &addr, "server.");
    if (!qdict_size(addr)) {
        error_setg(errp, "NBD server address missing");
        goto done;
    }

    iv = qobject_input_visitor_new_flat_confused(addr, errp);
    if (!iv) {
        goto done;
    }

    if (!visit_type_SocketAddress(iv, NULL, &saddr, errp)) {
        goto done;
    }

    if (socket_address_parse_named_fd(saddr, errp) < 0) {
        qapi_free_SocketAddress(saddr);
        saddr = NULL;
        goto done;
    }

done:
    qobject_unref(addr);
    visit_free(iv);
    return saddr;
}

static QCryptoTLSCreds *nbd_get_tls_creds(const char *id, Error **errp)
{
    Object *obj;
    QCryptoTLSCreds *creds;

    obj = object_resolve_path_component(
        object_get_objects_root(), id);
    if (!obj) {
        error_setg(errp, "No TLS credentials with id '%s'",
                   id);
        return NULL;
    }
    creds = (QCryptoTLSCreds *)
        object_dynamic_cast(obj, TYPE_QCRYPTO_TLS_CREDS);
    if (!creds) {
        error_setg(errp, "Object with id '%s' is not TLS credentials",
                   id);
        return NULL;
    }

    if (!qcrypto_tls_creds_check_endpoint(creds,
                                          QCRYPTO_TLS_CREDS_ENDPOINT_CLIENT,
                                          errp)) {
        return NULL;
    }
    object_ref(obj);
    return creds;
}
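
/*
 * Sketch of typical usage (object id, directory and address are
 * placeholders): the credentials looked up above are created on the
 * command line with something like
 *
 *   -object tls-creds-x509,id=tls0,dir=/etc/pki/nbd,endpoint=client
 *   -blockdev driver=nbd,node-name=disk0,server.type=inet,
 *             server.host=example.org,server.port=10809,tls-creds=tls0
 *
 * (the -blockdev arguments form a single comma-separated argument on
 * the command line).
 */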


static QemuOptsList nbd_runtime_opts = {
    .name = "nbd",
    .head = QTAILQ_HEAD_INITIALIZER(nbd_runtime_opts.head),
    .desc = {
        {
            .name = "host",
            .type = QEMU_OPT_STRING,
            .help = "TCP host to connect to",
        },
        {
            .name = "port",
            .type = QEMU_OPT_STRING,
            .help = "TCP port to connect to",
        },
        {
            .name = "path",
            .type = QEMU_OPT_STRING,
            .help = "Unix socket path to connect to",
        },
        {
            .name = "export",
            .type = QEMU_OPT_STRING,
            .help = "Name of the NBD export to open",
        },
        {
            .name = "tls-creds",
            .type = QEMU_OPT_STRING,
            .help = "ID of the TLS credentials to use",
        },
        {
            .name = "tls-hostname",
            .type = QEMU_OPT_STRING,
            .help = "Override hostname for validating TLS x509 certificate",
        },
        {
            .name = "x-dirty-bitmap",
            .type = QEMU_OPT_STRING,
            .help = "experimental: expose named dirty bitmap in place of "
                    "block status",
        },
        {
            .name = "reconnect-delay",
            .type = QEMU_OPT_NUMBER,
            .help = "On an unexpected disconnect, the nbd client tries to "
                    "connect again until it succeeds or encounters a serious "
                    "error.  During the first @reconnect-delay seconds, all "
                    "requests are paused and will be rerun on a successful "
                    "reconnect. After that time, any delayed requests and all "
                    "future requests before a successful reconnect will "
                    "immediately fail. Default 0",
        },
        {
            .name = "open-timeout",
            .type = QEMU_OPT_NUMBER,
            .help = "In seconds. If zero, the nbd driver tries the connection "
                    "only once, and fails to open if the connection fails. "
                    "If non-zero, the nbd driver will repeat connection "
                    "attempts until successful or until @open-timeout seconds "
                    "have elapsed. Default 0",
        },
        { /* end of list */ }
    },
};
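
/*
 * Sketch of how these runtime options may appear on the command line
 * (names and addresses are placeholders); the legacy host/port/path
 * spellings and the server.* spelling are alternatives:
 *
 *   -drive driver=nbd,host=example.org,port=10809,export=myexport
 *   -blockdev driver=nbd,node-name=disk0,server.type=unix,
 *             server.path=/tmp/nbd.sock,export=myexport,reconnect-delay=10
 */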

static int nbd_process_options(BlockDriverState *bs, QDict *options,
                               Error **errp)
{
    BDRVNBDState *s = bs->opaque;
    QemuOpts *opts;
    int ret = -EINVAL;

    opts = qemu_opts_create(&nbd_runtime_opts, NULL, 0, &error_abort);
    if (!qemu_opts_absorb_qdict(opts, options, errp)) {
        goto error;
    }

    /* Translate @host, @port, and @path to a SocketAddress */
    if (!nbd_process_legacy_socket_options(options, opts, errp)) {
        goto error;
    }

    /* Pop the config into our state object. Exit if invalid. */
    s->saddr = nbd_config(s, options, errp);
    if (!s->saddr) {
        goto error;
    }

    s->export = g_strdup(qemu_opt_get(opts, "export"));
    if (s->export && strlen(s->export) > NBD_MAX_STRING_SIZE) {
        error_setg(errp, "export name too long to send to server");
        goto error;
    }

    s->tlscredsid = g_strdup(qemu_opt_get(opts, "tls-creds"));
    if (s->tlscredsid) {
        s->tlscreds = nbd_get_tls_creds(s->tlscredsid, errp);
        if (!s->tlscreds) {
            goto error;
        }

        s->tlshostname = g_strdup(qemu_opt_get(opts, "tls-hostname"));
        if (!s->tlshostname &&
            s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
            s->tlshostname = g_strdup(s->saddr->u.inet.host);
        }
    }

    s->x_dirty_bitmap = g_strdup(qemu_opt_get(opts, "x-dirty-bitmap"));
    if (s->x_dirty_bitmap && strlen(s->x_dirty_bitmap) > NBD_MAX_STRING_SIZE) {
        error_setg(errp, "x-dirty-bitmap query too long to send to server");
        goto error;
    }

    s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
    s->open_timeout = qemu_opt_get_number(opts, "open-timeout", 0);

    ret = 0;

 error:
    qemu_opts_del(opts);
    return ret;
}

static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
                    Error **errp)
{
    int ret;
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    s->bs = bs;
    qemu_mutex_init(&s->requests_lock);
    qemu_co_queue_init(&s->free_sema);
    qemu_co_mutex_init(&s->send_mutex);
    qemu_co_mutex_init(&s->receive_mutex);

    if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
        return -EEXIST;
    }

    ret = nbd_process_options(bs, options, errp);
    if (ret < 0) {
        goto fail;
    }

    s->conn = nbd_client_connection_new(s->saddr, true, s->export,
                                        s->x_dirty_bitmap, s->tlscreds,
                                        s->tlshostname);

    if (s->open_timeout) {
        nbd_client_connection_enable_retry(s->conn);
        open_timer_init(s, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
                        s->open_timeout * NANOSECONDS_PER_SECOND);
    }

    s->state = NBD_CLIENT_CONNECTING_WAIT;
    ret = nbd_do_establish_connection(bs, true, errp);
    if (ret < 0) {
        goto fail;
    }

    /*
     * The connect attempt is done, so we no longer need this timer.
     * Delete it, because we do not want it to be around when this node
     * is drained or closed.
     */
    open_timer_del(s);

    nbd_client_connection_enable_retry(s->conn);

    return 0;

fail:
    open_timer_del(s);
    nbd_clear_bdrvstate(bs);
    return ret;
}

static void nbd_refresh_limits(BlockDriverState *bs, Error **errp)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
    uint32_t min = s->info.min_block;
    uint32_t max = MIN_NON_ZERO(NBD_MAX_BUFFER_SIZE, s->info.max_block);

    /*
     * If the server did not advertise an alignment:
     * - a size that is not sector-aligned implies that an alignment
     *   of 1 can be used to access those tail bytes
     * - advertisement of block status requires an alignment of 1, so
     *   that we don't violate block layer constraints that block
     *   status is always aligned (as we can't control whether the
     *   server will report sub-sector extents, such as a hole at EOF
     *   on an unaligned POSIX file)
     * - otherwise, assume the server is so old that we are safer avoiding
     *   sub-sector requests
     */
    if (!min) {
        min = (!QEMU_IS_ALIGNED(s->info.size, BDRV_SECTOR_SIZE) ||
               s->info.base_allocation) ? 1 : BDRV_SECTOR_SIZE;
    }

    bs->bl.request_alignment = min;
    bs->bl.max_pdiscard = QEMU_ALIGN_DOWN(INT_MAX, min);
    bs->bl.max_pwrite_zeroes = max;
    bs->bl.max_transfer = max;

    if (s->info.opt_block &&
        s->info.opt_block > bs->bl.opt_transfer) {
        bs->bl.opt_transfer = s->info.opt_block;
    }
}
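
/*
 * Worked example for the limits above (numbers are illustrative): a
 * server that advertises no block sizes (min_block == 0) for a
 * 1000-byte export gets request_alignment 1, because the size is not
 * aligned to BDRV_SECTOR_SIZE; a server advertising min_block == 4096
 * gets request_alignment 4096 and max_pdiscard
 * QEMU_ALIGN_DOWN(INT_MAX, 4096) == 2147479552.
 */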

static void nbd_close(BlockDriverState *bs)
{
    nbd_client_close(bs);
    nbd_clear_bdrvstate(bs);
}

/*
 * NBD cannot truncate, but if the caller asks to truncate to the same size, or
 * to a smaller size with exact=false, there is no reason to fail the
 * operation.
 *
 * Preallocation mode is ignored since it does not seem useful to fail when
 * we never change anything.
 */
static int coroutine_fn nbd_co_truncate(BlockDriverState *bs, int64_t offset,
                                        bool exact, PreallocMode prealloc,
                                        BdrvRequestFlags flags, Error **errp)
{
    BDRVNBDState *s = bs->opaque;

    if (offset != s->info.size && exact) {
        error_setg(errp, "Cannot resize NBD nodes");
        return -ENOTSUP;
    }

    if (offset > s->info.size) {
        error_setg(errp, "Cannot grow NBD nodes");
        return -EINVAL;
    }

    return 0;
}

static int64_t coroutine_fn nbd_co_getlength(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;

    return s->info.size;
}

static void nbd_refresh_filename(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;
    const char *host = NULL, *port = NULL, *path = NULL;
    size_t len = 0;

    if (s->saddr->type == SOCKET_ADDRESS_TYPE_INET) {
        const InetSocketAddress *inet = &s->saddr->u.inet;
        if (!inet->has_ipv4 && !inet->has_ipv6 && !inet->has_to) {
            host = inet->host;
            port = inet->port;
        }
    } else if (s->saddr->type == SOCKET_ADDRESS_TYPE_UNIX) {
        path = s->saddr->u.q_unix.path;
    } /* else can't represent as pseudo-filename */

    if (path && s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd+unix:///%s?socket=%s", s->export, path);
    } else if (path && !s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd+unix://?socket=%s", path);
    } else if (host && s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd://%s:%s/%s", host, port, s->export);
    } else if (host && !s->export) {
        len = snprintf(bs->exact_filename, sizeof(bs->exact_filename),
                       "nbd://%s:%s", host, port);
    }
    if (len >= sizeof(bs->exact_filename)) {
        /* Name is too long to represent exactly, so leave it empty. */
        bs->exact_filename[0] = '\0';
    }
}

static char *nbd_dirname(BlockDriverState *bs, Error **errp)
{
    /* The generic bdrv_dirname() implementation is able to work out some
     * directory name for NBD nodes, but that would be wrong. So far there is no
     * specification for how "export paths" would work, so NBD does not have
     * directory names. */
    error_setg(errp, "Cannot generate a base directory for NBD nodes");
    return NULL;
}

static const char *const nbd_strong_runtime_opts[] = {
    "path",
    "host",
    "port",
    "export",
    "tls-creds",
    "tls-hostname",
    "server.",

    NULL
};

static void nbd_cancel_in_flight(BlockDriverState *bs)
{
    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;

    reconnect_delay_timer_del(s);

    qemu_mutex_lock(&s->requests_lock);
    if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
        s->state = NBD_CLIENT_CONNECTING_NOWAIT;
    }
    qemu_mutex_unlock(&s->requests_lock);

    nbd_co_establish_connection_cancel(s->conn);
}

static void nbd_attach_aio_context(BlockDriverState *bs,
                                   AioContext *new_context)
{
    BDRVNBDState *s = bs->opaque;

    /* The open_timer is used only during nbd_open() */
    assert(!s->open_timer);

    /*
     * The reconnect_delay_timer is scheduled in I/O paths when the
     * connection is lost, to cancel the reconnection attempt after a
     * given time.  Once this attempt is done (successfully or not),
     * nbd_reconnect_attempt() ensures the timer is deleted before the
     * respective I/O request is resumed.
     * Since the AioContext can only be changed when a node is drained,
     * the reconnect_delay_timer cannot be active here.
     */
    assert(!s->reconnect_delay_timer);

    if (s->ioc) {
        qio_channel_attach_aio_context(s->ioc, new_context);
    }
}

static void nbd_detach_aio_context(BlockDriverState *bs)
{
    BDRVNBDState *s = bs->opaque;

    assert(!s->open_timer);
    assert(!s->reconnect_delay_timer);

    if (s->ioc) {
        qio_channel_detach_aio_context(s->ioc);
    }
}

static BlockDriver bdrv_nbd = {
    .format_name                = "nbd",
    .protocol_name              = "nbd",
    .instance_size              = sizeof(BDRVNBDState),
    .bdrv_parse_filename        = nbd_parse_filename,
    .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
    .create_opts                = &bdrv_create_opts_simple,
    .bdrv_file_open             = nbd_open,
    .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
    .bdrv_co_preadv             = nbd_client_co_preadv,
    .bdrv_co_pwritev            = nbd_client_co_pwritev,
    .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
    .bdrv_close                 = nbd_close,
    .bdrv_co_flush_to_os        = nbd_client_co_flush,
    .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
    .bdrv_refresh_limits        = nbd_refresh_limits,
    .bdrv_co_truncate           = nbd_co_truncate,
    .bdrv_co_getlength          = nbd_co_getlength,
    .bdrv_refresh_filename      = nbd_refresh_filename,
    .bdrv_co_block_status       = nbd_client_co_block_status,
    .bdrv_dirname               = nbd_dirname,
    .strong_runtime_opts        = nbd_strong_runtime_opts,
    .bdrv_cancel_in_flight      = nbd_cancel_in_flight,

    .bdrv_attach_aio_context    = nbd_attach_aio_context,
    .bdrv_detach_aio_context    = nbd_detach_aio_context,
};

static BlockDriver bdrv_nbd_tcp = {
    .format_name                = "nbd",
    .protocol_name              = "nbd+tcp",
    .instance_size              = sizeof(BDRVNBDState),
    .bdrv_parse_filename        = nbd_parse_filename,
    .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
    .create_opts                = &bdrv_create_opts_simple,
    .bdrv_file_open             = nbd_open,
    .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
    .bdrv_co_preadv             = nbd_client_co_preadv,
    .bdrv_co_pwritev            = nbd_client_co_pwritev,
    .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
    .bdrv_close                 = nbd_close,
    .bdrv_co_flush_to_os        = nbd_client_co_flush,
    .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
    .bdrv_refresh_limits        = nbd_refresh_limits,
    .bdrv_co_truncate           = nbd_co_truncate,
    .bdrv_co_getlength          = nbd_co_getlength,
    .bdrv_refresh_filename      = nbd_refresh_filename,
    .bdrv_co_block_status       = nbd_client_co_block_status,
    .bdrv_dirname               = nbd_dirname,
    .strong_runtime_opts        = nbd_strong_runtime_opts,
    .bdrv_cancel_in_flight      = nbd_cancel_in_flight,

    .bdrv_attach_aio_context    = nbd_attach_aio_context,
    .bdrv_detach_aio_context    = nbd_detach_aio_context,
};

static BlockDriver bdrv_nbd_unix = {
    .format_name                = "nbd",
    .protocol_name              = "nbd+unix",
    .instance_size              = sizeof(BDRVNBDState),
    .bdrv_parse_filename        = nbd_parse_filename,
    .bdrv_co_create_opts        = bdrv_co_create_opts_simple,
    .create_opts                = &bdrv_create_opts_simple,
    .bdrv_file_open             = nbd_open,
    .bdrv_reopen_prepare        = nbd_client_reopen_prepare,
    .bdrv_co_preadv             = nbd_client_co_preadv,
    .bdrv_co_pwritev            = nbd_client_co_pwritev,
    .bdrv_co_pwrite_zeroes      = nbd_client_co_pwrite_zeroes,
    .bdrv_close                 = nbd_close,
    .bdrv_co_flush_to_os        = nbd_client_co_flush,
    .bdrv_co_pdiscard           = nbd_client_co_pdiscard,
    .bdrv_refresh_limits        = nbd_refresh_limits,
    .bdrv_co_truncate           = nbd_co_truncate,
    .bdrv_co_getlength          = nbd_co_getlength,
    .bdrv_refresh_filename      = nbd_refresh_filename,
    .bdrv_co_block_status       = nbd_client_co_block_status,
    .bdrv_dirname               = nbd_dirname,
    .strong_runtime_opts        = nbd_strong_runtime_opts,
    .bdrv_cancel_in_flight      = nbd_cancel_in_flight,

    .bdrv_attach_aio_context    = nbd_attach_aio_context,
    .bdrv_detach_aio_context    = nbd_detach_aio_context,
};

static void bdrv_nbd_init(void)
{
    bdrv_register(&bdrv_nbd);
    bdrv_register(&bdrv_nbd_tcp);
    bdrv_register(&bdrv_nbd_unix);
}

block_init(bdrv_nbd_init);