qemu/nbd/server.c
<<
>>
Prefs
   1/*
   2 *  Copyright Red Hat
   3 *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
   4 *
   5 *  Network Block Device Server Side
   6 *
   7 *  This program is free software; you can redistribute it and/or modify
   8 *  it under the terms of the GNU General Public License as published by
   9 *  the Free Software Foundation; under version 2 of the License.
  10 *
  11 *  This program is distributed in the hope that it will be useful,
  12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14 *  GNU General Public License for more details.
  15 *
  16 *  You should have received a copy of the GNU General Public License
  17 *  along with this program; if not, see <http://www.gnu.org/licenses/>.
  18 */
  19
  20#include "qemu/osdep.h"
  21
  22#include "block/block_int.h"
  23#include "block/export.h"
  24#include "block/dirty-bitmap.h"
  25#include "qapi/error.h"
  26#include "qemu/queue.h"
  27#include "trace.h"
  28#include "nbd-internal.h"
  29#include "qemu/units.h"
  30#include "qemu/memalign.h"
  31
  32#define NBD_META_ID_BASE_ALLOCATION 0
  33#define NBD_META_ID_ALLOCATION_DEPTH 1
  34/* Dirty bitmaps use 'NBD_META_ID_DIRTY_BITMAP + i', so keep this id last. */
  35#define NBD_META_ID_DIRTY_BITMAP 2
  36
  37/*
  38 * NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. An empirical
  39 * constant. If an increase is needed, note that the NBD protocol
  40 * recommends no larger than 32 mb, so that the client won't consider
  41 * the reply as a denial of service attack.
  42 */
  43#define NBD_MAX_BLOCK_STATUS_EXTENTS (1 * MiB / 8)
  44
  45static int system_errno_to_nbd_errno(int err)
  46{
  47    switch (err) {
  48    case 0:
  49        return NBD_SUCCESS;
  50    case EPERM:
  51    case EROFS:
  52        return NBD_EPERM;
  53    case EIO:
  54        return NBD_EIO;
  55    case ENOMEM:
  56        return NBD_ENOMEM;
  57#ifdef EDQUOT
  58    case EDQUOT:
  59#endif
  60    case EFBIG:
  61    case ENOSPC:
  62        return NBD_ENOSPC;
  63    case EOVERFLOW:
  64        return NBD_EOVERFLOW;
  65    case ENOTSUP:
  66#if ENOTSUP != EOPNOTSUPP
  67    case EOPNOTSUPP:
  68#endif
  69        return NBD_ENOTSUP;
  70    case ESHUTDOWN:
  71        return NBD_ESHUTDOWN;
  72    case EINVAL:
  73    default:
  74        return NBD_EINVAL;
  75    }
  76}
  77
  78/* Definitions for opaque data types */
  79
  80typedef struct NBDRequestData NBDRequestData;
  81
  82struct NBDRequestData {
  83    NBDClient *client;
  84    uint8_t *data;
  85    bool complete;
  86};
  87
  88struct NBDExport {
  89    BlockExport common;
  90
  91    char *name;
  92    char *description;
  93    uint64_t size;
  94    uint16_t nbdflags;
  95    QTAILQ_HEAD(, NBDClient) clients;
  96    QTAILQ_ENTRY(NBDExport) next;
  97
  98    BlockBackend *eject_notifier_blk;
  99    Notifier eject_notifier;
 100
 101    bool allocation_depth;
 102    BdrvDirtyBitmap **export_bitmaps;
 103    size_t nr_export_bitmaps;
 104};
 105
 106static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
 107
 108/*
 109 * NBDMetaContexts represents a list of meta contexts in use,
 110 * as selected by NBD_OPT_SET_META_CONTEXT. Also used for
 111 * NBD_OPT_LIST_META_CONTEXT.
 112 */
 113struct NBDMetaContexts {
 114    const NBDExport *exp; /* associated export */
 115    size_t count; /* number of negotiated contexts */
 116    bool base_allocation; /* export base:allocation context (block status) */
 117    bool allocation_depth; /* export qemu:allocation-depth */
 118    bool *bitmaps; /*
 119                    * export qemu:dirty-bitmap:<export bitmap name>,
 120                    * sized by exp->nr_export_bitmaps
 121                    */
 122};
 123
 124struct NBDClient {
 125    int refcount; /* atomic */
 126    void (*close_fn)(NBDClient *client, bool negotiated);
 127    void *owner;
 128
 129    QemuMutex lock;
 130
 131    NBDExport *exp;
 132    QCryptoTLSCreds *tlscreds;
 133    char *tlsauthz;
 134    uint32_t handshake_max_secs;
 135    QIOChannelSocket *sioc; /* The underlying data channel */
 136    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
 137
 138    Coroutine *recv_coroutine; /* protected by lock */
 139
 140    CoMutex send_lock;
 141    Coroutine *send_coroutine;
 142
 143    bool read_yielding; /* protected by lock */
 144    bool quiescing; /* protected by lock */
 145
 146    QTAILQ_ENTRY(NBDClient) next;
 147    int nb_requests; /* protected by lock */
 148    bool closing; /* protected by lock */
 149
 150    uint32_t check_align; /* If non-zero, check for aligned client requests */
 151
 152    NBDMode mode;
 153    NBDMetaContexts contexts; /* Negotiated meta contexts */
 154
 155    uint32_t opt; /* Current option being negotiated */
 156    uint32_t optlen; /* remaining length of data in ioc for the option being
 157                        negotiated now */
 158};
 159
 160static void nbd_client_receive_next_request(NBDClient *client);
 161
 162/* Basic flow for negotiation
 163
 164   Server         Client
 165   Negotiate
 166
 167   or
 168
 169   Server         Client
 170   Negotiate #1
 171                  Option
 172   Negotiate #2
 173
 174   ----
 175
 176   followed by
 177
 178   Server         Client
 179                  Request
 180   Response
 181                  Request
 182   Response
 183                  ...
 184   ...
 185                  Request (type == 2)
 186
 187*/
 188
 189static inline void set_be_option_rep(NBDOptionReply *rep, uint32_t option,
 190                                     uint32_t type, uint32_t length)
 191{
 192    stq_be_p(&rep->magic, NBD_REP_MAGIC);
 193    stl_be_p(&rep->option, option);
 194    stl_be_p(&rep->type, type);
 195    stl_be_p(&rep->length, length);
 196}
 197
 198/* Send a reply header, including length, but no payload.
 199 * Return -errno on error, 0 on success. */
 200static coroutine_fn int
 201nbd_negotiate_send_rep_len(NBDClient *client, uint32_t type,
 202                           uint32_t len, Error **errp)
 203{
 204    NBDOptionReply rep;
 205
 206    trace_nbd_negotiate_send_rep_len(client->opt, nbd_opt_lookup(client->opt),
 207                                     type, nbd_rep_lookup(type), len);
 208
 209    assert(len < NBD_MAX_BUFFER_SIZE);
 210
 211    set_be_option_rep(&rep, client->opt, type, len);
 212    return nbd_write(client->ioc, &rep, sizeof(rep), errp);
 213}
 214
 215/* Send a reply header with default 0 length.
 216 * Return -errno on error, 0 on success. */
 217static coroutine_fn int
 218nbd_negotiate_send_rep(NBDClient *client, uint32_t type, Error **errp)
 219{
 220    return nbd_negotiate_send_rep_len(client, type, 0, errp);
 221}
 222
 223/* Send an error reply.
 224 * Return -errno on error, 0 on success. */
 225static coroutine_fn int G_GNUC_PRINTF(4, 0)
 226nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
 227                            Error **errp, const char *fmt, va_list va)
 228{
 229    ERRP_GUARD();
 230    g_autofree char *msg = NULL;
 231    int ret;
 232    size_t len;
 233
 234    msg = g_strdup_vprintf(fmt, va);
 235    len = strlen(msg);
 236    assert(len < NBD_MAX_STRING_SIZE);
 237    trace_nbd_negotiate_send_rep_err(msg);
 238    ret = nbd_negotiate_send_rep_len(client, type, len, errp);
 239    if (ret < 0) {
 240        return ret;
 241    }
 242    if (nbd_write(client->ioc, msg, len, errp) < 0) {
 243        error_prepend(errp, "write failed (error message): ");
 244        return -EIO;
 245    }
 246
 247    return 0;
 248}
 249
 250/*
 251 * Return a malloc'd copy of @name suitable for use in an error reply.
 252 */
 253static char *
 254nbd_sanitize_name(const char *name)
 255{
 256    if (strnlen(name, 80) < 80) {
 257        return g_strdup(name);
 258    }
 259    /* XXX Should we also try to sanitize any control characters? */
 260    return g_strdup_printf("%.80s...", name);
 261}
 262
 263/* Send an error reply.
 264 * Return -errno on error, 0 on success. */
 265static coroutine_fn int G_GNUC_PRINTF(4, 5)
 266nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type,
 267                           Error **errp, const char *fmt, ...)
 268{
 269    va_list va;
 270    int ret;
 271
 272    va_start(va, fmt);
 273    ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
 274    va_end(va);
 275    return ret;
 276}
 277
 278/* Drop remainder of the current option, and send a reply with the
 279 * given error type and message. Return -errno on read or write
 280 * failure; or 0 if connection is still live. */
 281static coroutine_fn int G_GNUC_PRINTF(4, 0)
 282nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp,
 283              const char *fmt, va_list va)
 284{
 285    int ret = nbd_drop(client->ioc, client->optlen, errp);
 286
 287    client->optlen = 0;
 288    if (!ret) {
 289        ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
 290    }
 291    return ret;
 292}
 293
 294static coroutine_fn int G_GNUC_PRINTF(4, 5)
 295nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp,
 296             const char *fmt, ...)
 297{
 298    int ret;
 299    va_list va;
 300
 301    va_start(va, fmt);
 302    ret = nbd_opt_vdrop(client, type, errp, fmt, va);
 303    va_end(va);
 304
 305    return ret;
 306}
 307
 308static coroutine_fn int G_GNUC_PRINTF(3, 4)
 309nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...)
 310{
 311    int ret;
 312    va_list va;
 313
 314    va_start(va, fmt);
 315    ret = nbd_opt_vdrop(client, NBD_REP_ERR_INVALID, errp, fmt, va);
 316    va_end(va);
 317
 318    return ret;
 319}
 320
 321/* Read size bytes from the unparsed payload of the current option.
 322 * If @check_nul, require that no NUL bytes appear in buffer.
 323 * Return -errno on I/O error, 0 if option was completely handled by
 324 * sending a reply about inconsistent lengths, or 1 on success. */
 325static coroutine_fn int
 326nbd_opt_read(NBDClient *client, void *buffer, size_t size,
 327             bool check_nul, Error **errp)
 328{
 329    if (size > client->optlen) {
 330        return nbd_opt_invalid(client, errp,
 331                               "Inconsistent lengths in option %s",
 332                               nbd_opt_lookup(client->opt));
 333    }
 334    client->optlen -= size;
 335    if (qio_channel_read_all(client->ioc, buffer, size, errp) < 0) {
 336        return -EIO;
 337    }
 338
 339    if (check_nul && strnlen(buffer, size) != size) {
 340        return nbd_opt_invalid(client, errp,
 341                               "Unexpected embedded NUL in option %s",
 342                               nbd_opt_lookup(client->opt));
 343    }
 344    return 1;
 345}
 346
 347/* Drop size bytes from the unparsed payload of the current option.
 348 * Return -errno on I/O error, 0 if option was completely handled by
 349 * sending a reply about inconsistent lengths, or 1 on success. */
 350static coroutine_fn int
 351nbd_opt_skip(NBDClient *client, size_t size, Error **errp)
 352{
 353    if (size > client->optlen) {
 354        return nbd_opt_invalid(client, errp,
 355                               "Inconsistent lengths in option %s",
 356                               nbd_opt_lookup(client->opt));
 357    }
 358    client->optlen -= size;
 359    return nbd_drop(client->ioc, size, errp) < 0 ? -EIO : 1;
 360}
 361
 362/* nbd_opt_read_name
 363 *
 364 * Read a string with the format:
 365 *   uint32_t len     (<= NBD_MAX_STRING_SIZE)
 366 *   len bytes string (not 0-terminated)
 367 *
 368 * On success, @name will be allocated.
 369 * If @length is non-null, it will be set to the actual string length.
 370 *
 371 * Return -errno on I/O error, 0 if option was completely handled by
 372 * sending a reply about inconsistent lengths, or 1 on success.
 373 */
 374static coroutine_fn int
 375nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
 376                  Error **errp)
 377{
 378    int ret;
 379    uint32_t len;
 380    g_autofree char *local_name = NULL;
 381
 382    *name = NULL;
 383    ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
 384    if (ret <= 0) {
 385        return ret;
 386    }
 387    len = cpu_to_be32(len);
 388
 389    if (len > NBD_MAX_STRING_SIZE) {
 390        return nbd_opt_invalid(client, errp,
 391                               "Invalid name length: %" PRIu32, len);
 392    }
 393
 394    local_name = g_malloc(len + 1);
 395    ret = nbd_opt_read(client, local_name, len, true, errp);
 396    if (ret <= 0) {
 397        return ret;
 398    }
 399    local_name[len] = '\0';
 400
 401    if (length) {
 402        *length = len;
 403    }
 404    *name = g_steal_pointer(&local_name);
 405
 406    return 1;
 407}
 408
 409/* Send a single NBD_REP_SERVER reply to NBD_OPT_LIST, including payload.
 410 * Return -errno on error, 0 on success. */
 411static coroutine_fn int
 412nbd_negotiate_send_rep_list(NBDClient *client, NBDExport *exp, Error **errp)
 413{
 414    ERRP_GUARD();
 415    size_t name_len, desc_len;
 416    uint32_t len;
 417    const char *name = exp->name ? exp->name : "";
 418    const char *desc = exp->description ? exp->description : "";
 419    QIOChannel *ioc = client->ioc;
 420    int ret;
 421
 422    trace_nbd_negotiate_send_rep_list(name, desc);
 423    name_len = strlen(name);
 424    desc_len = strlen(desc);
 425    assert(name_len <= NBD_MAX_STRING_SIZE && desc_len <= NBD_MAX_STRING_SIZE);
 426    len = name_len + desc_len + sizeof(len);
 427    ret = nbd_negotiate_send_rep_len(client, NBD_REP_SERVER, len, errp);
 428    if (ret < 0) {
 429        return ret;
 430    }
 431
 432    len = cpu_to_be32(name_len);
 433    if (nbd_write(ioc, &len, sizeof(len), errp) < 0) {
 434        error_prepend(errp, "write failed (name length): ");
 435        return -EINVAL;
 436    }
 437
 438    if (nbd_write(ioc, name, name_len, errp) < 0) {
 439        error_prepend(errp, "write failed (name buffer): ");
 440        return -EINVAL;
 441    }
 442
 443    if (nbd_write(ioc, desc, desc_len, errp) < 0) {
 444        error_prepend(errp, "write failed (description buffer): ");
 445        return -EINVAL;
 446    }
 447
 448    return 0;
 449}
 450
 451/* Process the NBD_OPT_LIST command, with a potential series of replies.
 452 * Return -errno on error, 0 on success. */
 453static coroutine_fn int
 454nbd_negotiate_handle_list(NBDClient *client, Error **errp)
 455{
 456    NBDExport *exp;
 457    assert(client->opt == NBD_OPT_LIST);
 458
 459    /* For each export, send a NBD_REP_SERVER reply. */
 460    QTAILQ_FOREACH(exp, &exports, next) {
 461        if (nbd_negotiate_send_rep_list(client, exp, errp)) {
 462            return -EINVAL;
 463        }
 464    }
 465    /* Finish with a NBD_REP_ACK. */
 466    return nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
 467}
 468
 469static coroutine_fn void
 470nbd_check_meta_export(NBDClient *client, NBDExport *exp)
 471{
 472    if (exp != client->contexts.exp) {
 473        client->contexts.count = 0;
 474    }
 475}
 476
 477/* Send a reply to NBD_OPT_EXPORT_NAME.
 478 * Return -errno on error, 0 on success. */
 479static coroutine_fn int
 480nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
 481                                 Error **errp)
 482{
 483    ERRP_GUARD();
 484    g_autofree char *name = NULL;
 485    char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
 486    size_t len;
 487    int ret;
 488    uint16_t myflags;
 489
 490    /* Client sends:
 491        [20 ..  xx]   export name (length bytes)
 492       Server replies:
 493        [ 0 ..   7]   size
 494        [ 8 ..   9]   export flags
 495        [10 .. 133]   reserved     (0) [unless no_zeroes]
 496     */
 497    trace_nbd_negotiate_handle_export_name();
 498    if (client->mode >= NBD_MODE_EXTENDED) {
 499        error_setg(errp, "Extended headers already negotiated");
 500        return -EINVAL;
 501    }
 502    if (client->optlen > NBD_MAX_STRING_SIZE) {
 503        error_setg(errp, "Bad length received");
 504        return -EINVAL;
 505    }
 506    name = g_malloc(client->optlen + 1);
 507    if (nbd_read(client->ioc, name, client->optlen, "export name", errp) < 0) {
 508        return -EIO;
 509    }
 510    name[client->optlen] = '\0';
 511    client->optlen = 0;
 512
 513    trace_nbd_negotiate_handle_export_name_request(name);
 514
 515    client->exp = nbd_export_find(name);
 516    if (!client->exp) {
 517        error_setg(errp, "export not found");
 518        return -EINVAL;
 519    }
 520    nbd_check_meta_export(client, client->exp);
 521
 522    myflags = client->exp->nbdflags;
 523    if (client->mode >= NBD_MODE_STRUCTURED) {
 524        myflags |= NBD_FLAG_SEND_DF;
 525    }
 526    if (client->mode >= NBD_MODE_EXTENDED && client->contexts.count) {
 527        myflags |= NBD_FLAG_BLOCK_STAT_PAYLOAD;
 528    }
 529    trace_nbd_negotiate_new_style_size_flags(client->exp->size, myflags);
 530    stq_be_p(buf, client->exp->size);
 531    stw_be_p(buf + 8, myflags);
 532    len = no_zeroes ? 10 : sizeof(buf);
 533    ret = nbd_write(client->ioc, buf, len, errp);
 534    if (ret < 0) {
 535        error_prepend(errp, "write failed: ");
 536        return ret;
 537    }
 538
 539    QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
 540    blk_exp_ref(&client->exp->common);
 541
 542    return 0;
 543}
 544
 545/* Send a single NBD_REP_INFO, with a buffer @buf of @length bytes.
 546 * The buffer does NOT include the info type prefix.
 547 * Return -errno on error, 0 if ready to send more. */
 548static coroutine_fn int
 549nbd_negotiate_send_info(NBDClient *client, uint16_t info, uint32_t length,
 550                        void *buf, Error **errp)
 551{
 552    int rc;
 553
 554    trace_nbd_negotiate_send_info(info, nbd_info_lookup(info), length);
 555    rc = nbd_negotiate_send_rep_len(client, NBD_REP_INFO,
 556                                    sizeof(info) + length, errp);
 557    if (rc < 0) {
 558        return rc;
 559    }
 560    info = cpu_to_be16(info);
 561    if (nbd_write(client->ioc, &info, sizeof(info), errp) < 0) {
 562        return -EIO;
 563    }
 564    if (nbd_write(client->ioc, buf, length, errp) < 0) {
 565        return -EIO;
 566    }
 567    return 0;
 568}
 569
 570/* nbd_reject_length: Handle any unexpected payload.
 571 * @fatal requests that we quit talking to the client, even if we are able
 572 * to successfully send an error reply.
 573 * Return:
 574 * -errno  transmission error occurred or @fatal was requested, errp is set
 575 * 0       error message successfully sent to client, errp is not set
 576 */
 577static coroutine_fn int
 578nbd_reject_length(NBDClient *client, bool fatal, Error **errp)
 579{
 580    int ret;
 581
 582    assert(client->optlen);
 583    ret = nbd_opt_invalid(client, errp, "option '%s' has unexpected length",
 584                          nbd_opt_lookup(client->opt));
 585    if (fatal && !ret) {
 586        error_setg(errp, "option '%s' has unexpected length",
 587                   nbd_opt_lookup(client->opt));
 588        return -EINVAL;
 589    }
 590    return ret;
 591}
 592
 593/* Handle NBD_OPT_INFO and NBD_OPT_GO.
 594 * Return -errno on error, 0 if ready for next option, and 1 to move
 595 * into transmission phase.  */
 596static coroutine_fn int
 597nbd_negotiate_handle_info(NBDClient *client, Error **errp)
 598{
 599    int rc;
 600    g_autofree char *name = NULL;
 601    NBDExport *exp;
 602    uint16_t requests;
 603    uint16_t request;
 604    uint32_t namelen = 0;
 605    bool sendname = false;
 606    bool blocksize = false;
 607    uint32_t sizes[3];
 608    char buf[sizeof(uint64_t) + sizeof(uint16_t)];
 609    uint32_t check_align = 0;
 610    uint16_t myflags;
 611
 612    /* Client sends:
 613        4 bytes: L, name length (can be 0)
 614        L bytes: export name
 615        2 bytes: N, number of requests (can be 0)
 616        N * 2 bytes: N requests
 617    */
 618    rc = nbd_opt_read_name(client, &name, &namelen, errp);
 619    if (rc <= 0) {
 620        return rc;
 621    }
 622    trace_nbd_negotiate_handle_export_name_request(name);
 623
 624    rc = nbd_opt_read(client, &requests, sizeof(requests), false, errp);
 625    if (rc <= 0) {
 626        return rc;
 627    }
 628    requests = be16_to_cpu(requests);
 629    trace_nbd_negotiate_handle_info_requests(requests);
 630    while (requests--) {
 631        rc = nbd_opt_read(client, &request, sizeof(request), false, errp);
 632        if (rc <= 0) {
 633            return rc;
 634        }
 635        request = be16_to_cpu(request);
 636        trace_nbd_negotiate_handle_info_request(request,
 637                                                nbd_info_lookup(request));
 638        /* We care about NBD_INFO_NAME and NBD_INFO_BLOCK_SIZE;
 639         * everything else is either a request we don't know or
 640         * something we send regardless of request */
 641        switch (request) {
 642        case NBD_INFO_NAME:
 643            sendname = true;
 644            break;
 645        case NBD_INFO_BLOCK_SIZE:
 646            blocksize = true;
 647            break;
 648        }
 649    }
 650    if (client->optlen) {
 651        return nbd_reject_length(client, false, errp);
 652    }
 653
 654    exp = nbd_export_find(name);
 655    if (!exp) {
 656        g_autofree char *sane_name = nbd_sanitize_name(name);
 657
 658        return nbd_negotiate_send_rep_err(client, NBD_REP_ERR_UNKNOWN,
 659                                          errp, "export '%s' not present",
 660                                          sane_name);
 661    }
 662    if (client->opt == NBD_OPT_GO) {
 663        nbd_check_meta_export(client, exp);
 664    }
 665
 666    /* Don't bother sending NBD_INFO_NAME unless client requested it */
 667    if (sendname) {
 668        rc = nbd_negotiate_send_info(client, NBD_INFO_NAME, namelen, name,
 669                                     errp);
 670        if (rc < 0) {
 671            return rc;
 672        }
 673    }
 674
 675    /* Send NBD_INFO_DESCRIPTION only if available, regardless of
 676     * client request */
 677    if (exp->description) {
 678        size_t len = strlen(exp->description);
 679
 680        assert(len <= NBD_MAX_STRING_SIZE);
 681        rc = nbd_negotiate_send_info(client, NBD_INFO_DESCRIPTION,
 682                                     len, exp->description, errp);
 683        if (rc < 0) {
 684            return rc;
 685        }
 686    }
 687
 688    /* Send NBD_INFO_BLOCK_SIZE always, but tweak the minimum size
 689     * according to whether the client requested it, and according to
 690     * whether this is OPT_INFO or OPT_GO. */
 691    /* minimum - 1 for back-compat, or actual if client will obey it. */
 692    if (client->opt == NBD_OPT_INFO || blocksize) {
 693        check_align = sizes[0] = blk_get_request_alignment(exp->common.blk);
 694    } else {
 695        sizes[0] = 1;
 696    }
 697    assert(sizes[0] <= NBD_MAX_BUFFER_SIZE);
 698    /* preferred - Hard-code to 4096 for now.
 699     * TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
 700    sizes[1] = MAX(4096, sizes[0]);
 701    /* maximum - At most 32M, but smaller as appropriate. */
 702    sizes[2] = MIN(blk_get_max_transfer(exp->common.blk), NBD_MAX_BUFFER_SIZE);
 703    trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
 704    sizes[0] = cpu_to_be32(sizes[0]);
 705    sizes[1] = cpu_to_be32(sizes[1]);
 706    sizes[2] = cpu_to_be32(sizes[2]);
 707    rc = nbd_negotiate_send_info(client, NBD_INFO_BLOCK_SIZE,
 708                                 sizeof(sizes), sizes, errp);
 709    if (rc < 0) {
 710        return rc;
 711    }
 712
 713    /* Send NBD_INFO_EXPORT always */
 714    myflags = exp->nbdflags;
 715    if (client->mode >= NBD_MODE_STRUCTURED) {
 716        myflags |= NBD_FLAG_SEND_DF;
 717    }
 718    if (client->mode >= NBD_MODE_EXTENDED &&
 719        (client->contexts.count || client->opt == NBD_OPT_INFO)) {
 720        myflags |= NBD_FLAG_BLOCK_STAT_PAYLOAD;
 721    }
 722    trace_nbd_negotiate_new_style_size_flags(exp->size, myflags);
 723    stq_be_p(buf, exp->size);
 724    stw_be_p(buf + 8, myflags);
 725    rc = nbd_negotiate_send_info(client, NBD_INFO_EXPORT,
 726                                 sizeof(buf), buf, errp);
 727    if (rc < 0) {
 728        return rc;
 729    }
 730
 731    /*
 732     * If the client is just asking for NBD_OPT_INFO, but forgot to
 733     * request block sizes in a situation that would impact
 734     * performance, then return an error. But for NBD_OPT_GO, we
 735     * tolerate all clients, regardless of alignments.
 736     */
 737    if (client->opt == NBD_OPT_INFO && !blocksize &&
 738        blk_get_request_alignment(exp->common.blk) > 1) {
 739        return nbd_negotiate_send_rep_err(client,
 740                                          NBD_REP_ERR_BLOCK_SIZE_REQD,
 741                                          errp,
 742                                          "request NBD_INFO_BLOCK_SIZE to "
 743                                          "use this export");
 744    }
 745
 746    /* Final reply */
 747    rc = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
 748    if (rc < 0) {
 749        return rc;
 750    }
 751
 752    if (client->opt == NBD_OPT_GO) {
 753        client->exp = exp;
 754        client->check_align = check_align;
 755        QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
 756        blk_exp_ref(&client->exp->common);
 757        rc = 1;
 758    }
 759    return rc;
 760}
 761
 762/* Callback to learn when QIO TLS upgrade is complete */
 763struct NBDTLSServerHandshakeData {
 764    bool complete;
 765    Error *error;
 766    Coroutine *co;
 767};
 768
 769static void
 770nbd_server_tls_handshake(QIOTask *task, void *opaque)
 771{
 772    struct NBDTLSServerHandshakeData *data = opaque;
 773
 774    qio_task_propagate_error(task, &data->error);
 775    data->complete = true;
 776    if (!qemu_coroutine_entered(data->co)) {
 777        aio_co_wake(data->co);
 778    }
 779}
 780
 781/* Handle NBD_OPT_STARTTLS. Return NULL to drop connection, or else the
 782 * new channel for all further (now-encrypted) communication. */
 783static coroutine_fn QIOChannel *
 784nbd_negotiate_handle_starttls(NBDClient *client, Error **errp)
 785{
 786    QIOChannel *ioc;
 787    QIOChannelTLS *tioc;
 788    struct NBDTLSServerHandshakeData data = { 0 };
 789
 790    assert(client->opt == NBD_OPT_STARTTLS);
 791
 792    trace_nbd_negotiate_handle_starttls();
 793    ioc = client->ioc;
 794
 795    if (nbd_negotiate_send_rep(client, NBD_REP_ACK, errp) < 0) {
 796        return NULL;
 797    }
 798
 799    tioc = qio_channel_tls_new_server(ioc,
 800                                      client->tlscreds,
 801                                      client->tlsauthz,
 802                                      errp);
 803    if (!tioc) {
 804        return NULL;
 805    }
 806
 807    qio_channel_set_name(QIO_CHANNEL(tioc), "nbd-server-tls");
 808    trace_nbd_negotiate_handle_starttls_handshake();
 809    data.co = qemu_coroutine_self();
 810    qio_channel_tls_handshake(tioc,
 811                              nbd_server_tls_handshake,
 812                              &data,
 813                              NULL,
 814                              NULL);
 815
 816    if (!data.complete) {
 817        qemu_coroutine_yield();
 818        assert(data.complete);
 819    }
 820
 821    if (data.error) {
 822        object_unref(OBJECT(tioc));
 823        error_propagate(errp, data.error);
 824        return NULL;
 825    }
 826
 827    return QIO_CHANNEL(tioc);
 828}
 829
 830/* nbd_negotiate_send_meta_context
 831 *
 832 * Send one chunk of reply to NBD_OPT_{LIST,SET}_META_CONTEXT
 833 *
 834 * For NBD_OPT_LIST_META_CONTEXT @context_id is ignored, 0 is used instead.
 835 */
 836static coroutine_fn int
 837nbd_negotiate_send_meta_context(NBDClient *client, const char *context,
 838                                uint32_t context_id, Error **errp)
 839{
 840    NBDOptionReplyMetaContext opt;
 841    struct iovec iov[] = {
 842        {.iov_base = &opt, .iov_len = sizeof(opt)},
 843        {.iov_base = (void *)context, .iov_len = strlen(context)}
 844    };
 845
 846    assert(iov[1].iov_len <= NBD_MAX_STRING_SIZE);
 847    if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
 848        context_id = 0;
 849    }
 850
 851    trace_nbd_negotiate_meta_query_reply(context, context_id);
 852    set_be_option_rep(&opt.h, client->opt, NBD_REP_META_CONTEXT,
 853                      sizeof(opt) - sizeof(opt.h) + iov[1].iov_len);
 854    stl_be_p(&opt.context_id, context_id);
 855
 856    return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
 857}
 858
 859/*
 860 * Return true if @query matches @pattern, or if @query is empty when
 861 * the @client is performing _LIST_.
 862 */
 863static coroutine_fn bool
 864nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
 865                          const char *query)
 866{
 867    if (!*query) {
 868        trace_nbd_negotiate_meta_query_parse("empty");
 869        return client->opt == NBD_OPT_LIST_META_CONTEXT;
 870    }
 871    if (strcmp(query, pattern) == 0) {
 872        trace_nbd_negotiate_meta_query_parse(pattern);
 873        return true;
 874    }
 875    trace_nbd_negotiate_meta_query_skip("pattern not matched");
 876    return false;
 877}
 878
 879/*
 880 * Return true and adjust @str in place if it begins with @prefix.
 881 */
 882static coroutine_fn bool
 883nbd_strshift(const char **str, const char *prefix)
 884{
 885    size_t len = strlen(prefix);
 886
 887    if (strncmp(*str, prefix, len) == 0) {
 888        *str += len;
 889        return true;
 890    }
 891    return false;
 892}
 893
 894/* nbd_meta_base_query
 895 *
 896 * Handle queries to 'base' namespace. For now, only the base:allocation
 897 * context is available.  Return true if @query has been handled.
 898 */
 899static coroutine_fn bool
 900nbd_meta_base_query(NBDClient *client, NBDMetaContexts *meta,
 901                    const char *query)
 902{
 903    if (!nbd_strshift(&query, "base:")) {
 904        return false;
 905    }
 906    trace_nbd_negotiate_meta_query_parse("base:");
 907
 908    if (nbd_meta_empty_or_pattern(client, "allocation", query)) {
 909        meta->base_allocation = true;
 910    }
 911    return true;
 912}
 913
 914/* nbd_meta_qemu_query
 915 *
 916 * Handle queries to 'qemu' namespace. For now, only the qemu:dirty-bitmap:
 917 * and qemu:allocation-depth contexts are available.  Return true if @query
 918 * has been handled.
 919 */
 920static coroutine_fn bool
 921nbd_meta_qemu_query(NBDClient *client, NBDMetaContexts *meta,
 922                    const char *query)
 923{
 924    size_t i;
 925
 926    if (!nbd_strshift(&query, "qemu:")) {
 927        return false;
 928    }
 929    trace_nbd_negotiate_meta_query_parse("qemu:");
 930
 931    if (!*query) {
 932        if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
 933            meta->allocation_depth = meta->exp->allocation_depth;
 934            if (meta->exp->nr_export_bitmaps) {
 935                memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
 936            }
 937        }
 938        trace_nbd_negotiate_meta_query_parse("empty");
 939        return true;
 940    }
 941
 942    if (strcmp(query, "allocation-depth") == 0) {
 943        trace_nbd_negotiate_meta_query_parse("allocation-depth");
 944        meta->allocation_depth = meta->exp->allocation_depth;
 945        return true;
 946    }
 947
 948    if (nbd_strshift(&query, "dirty-bitmap:")) {
 949        trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
 950        if (!*query) {
 951            if (client->opt == NBD_OPT_LIST_META_CONTEXT &&
 952                meta->exp->nr_export_bitmaps) {
 953                memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
 954            }
 955            trace_nbd_negotiate_meta_query_parse("empty");
 956            return true;
 957        }
 958
 959        for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
 960            const char *bm_name;
 961
 962            bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
 963            if (strcmp(bm_name, query) == 0) {
 964                meta->bitmaps[i] = true;
 965                trace_nbd_negotiate_meta_query_parse(query);
 966                return true;
 967            }
 968        }
 969        trace_nbd_negotiate_meta_query_skip("no dirty-bitmap match");
 970        return true;
 971    }
 972
 973    trace_nbd_negotiate_meta_query_skip("unknown qemu context");
 974    return true;
 975}
 976
 977/* nbd_negotiate_meta_query
 978 *
 979 * Parse namespace name and call corresponding function to parse body of the
 980 * query.
 981 *
 982 * The only supported namespaces are 'base' and 'qemu'.
 983 *
 984 * Return -errno on I/O error, 0 if option was completely handled by
 985 * sending a reply about inconsistent lengths, or 1 on success. */
 986static coroutine_fn int
 987nbd_negotiate_meta_query(NBDClient *client,
 988                         NBDMetaContexts *meta, Error **errp)
 989{
 990    int ret;
 991    g_autofree char *query = NULL;
 992    uint32_t len;
 993
 994    ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
 995    if (ret <= 0) {
 996        return ret;
 997    }
 998    len = cpu_to_be32(len);
 999
1000    if (len > NBD_MAX_STRING_SIZE) {
1001        trace_nbd_negotiate_meta_query_skip("length too long");
1002        return nbd_opt_skip(client, len, errp);
1003    }
1004
1005    query = g_malloc(len + 1);
1006    ret = nbd_opt_read(client, query, len, true, errp);
1007    if (ret <= 0) {
1008        return ret;
1009    }
1010    query[len] = '\0';
1011
1012    if (nbd_meta_base_query(client, meta, query)) {
1013        return 1;
1014    }
1015    if (nbd_meta_qemu_query(client, meta, query)) {
1016        return 1;
1017    }
1018
1019    trace_nbd_negotiate_meta_query_skip("unknown namespace");
1020    return 1;
1021}
1022
1023/* nbd_negotiate_meta_queries
1024 * Handle NBD_OPT_LIST_META_CONTEXT and NBD_OPT_SET_META_CONTEXT
1025 *
1026 * Return -errno on I/O error, or 0 if option was completely handled. */
1027static coroutine_fn int
1028nbd_negotiate_meta_queries(NBDClient *client, Error **errp)
1029{
1030    int ret;
1031    g_autofree char *export_name = NULL;
1032    /* Mark unused to work around https://bugs.llvm.org/show_bug.cgi?id=3888 */
1033    g_autofree G_GNUC_UNUSED bool *bitmaps = NULL;
1034    NBDMetaContexts local_meta = {0};
1035    NBDMetaContexts *meta;
1036    uint32_t nb_queries;
1037    size_t i;
1038    size_t count = 0;
1039
1040    if (client->opt == NBD_OPT_SET_META_CONTEXT &&
1041        client->mode < NBD_MODE_STRUCTURED) {
1042        return nbd_opt_invalid(client, errp,
1043                               "request option '%s' when structured reply "
1044                               "is not negotiated",
1045                               nbd_opt_lookup(client->opt));
1046    }
1047
1048    if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
1049        /* Only change the caller's meta on SET. */
1050        meta = &local_meta;
1051    } else {
1052        meta = &client->contexts;
1053    }
1054
1055    g_free(meta->bitmaps);
1056    memset(meta, 0, sizeof(*meta));
1057
1058    ret = nbd_opt_read_name(client, &export_name, NULL, errp);
1059    if (ret <= 0) {
1060        return ret;
1061    }
1062
1063    meta->exp = nbd_export_find(export_name);
1064    if (meta->exp == NULL) {
1065        g_autofree char *sane_name = nbd_sanitize_name(export_name);
1066
1067        return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
1068                            "export '%s' not present", sane_name);
1069    }
1070    meta->bitmaps = g_new0(bool, meta->exp->nr_export_bitmaps);
1071    if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
1072        bitmaps = meta->bitmaps;
1073    }
1074
1075    ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), false, errp);
1076    if (ret <= 0) {
1077        return ret;
1078    }
1079    nb_queries = cpu_to_be32(nb_queries);
1080    trace_nbd_negotiate_meta_context(nbd_opt_lookup(client->opt),
1081                                     export_name, nb_queries);
1082
1083    if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
1084        /* enable all known contexts */
1085        meta->base_allocation = true;
1086        meta->allocation_depth = meta->exp->allocation_depth;
1087        if (meta->exp->nr_export_bitmaps) {
1088            memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
1089        }
1090    } else {
1091        for (i = 0; i < nb_queries; ++i) {
1092            ret = nbd_negotiate_meta_query(client, meta, errp);
1093            if (ret <= 0) {
1094                return ret;
1095            }
1096        }
1097    }
1098
1099    if (meta->base_allocation) {
1100        ret = nbd_negotiate_send_meta_context(client, "base:allocation",
1101                                              NBD_META_ID_BASE_ALLOCATION,
1102                                              errp);
1103        if (ret < 0) {
1104            return ret;
1105        }
1106        count++;
1107    }
1108
1109    if (meta->allocation_depth) {
1110        ret = nbd_negotiate_send_meta_context(client, "qemu:allocation-depth",
1111                                              NBD_META_ID_ALLOCATION_DEPTH,
1112                                              errp);
1113        if (ret < 0) {
1114            return ret;
1115        }
1116        count++;
1117    }
1118
1119    for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
1120        const char *bm_name;
1121        g_autofree char *context = NULL;
1122
1123        if (!meta->bitmaps[i]) {
1124            continue;
1125        }
1126
1127        bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
1128        context = g_strdup_printf("qemu:dirty-bitmap:%s", bm_name);
1129
1130        ret = nbd_negotiate_send_meta_context(client, context,
1131                                              NBD_META_ID_DIRTY_BITMAP + i,
1132                                              errp);
1133        if (ret < 0) {
1134            return ret;
1135        }
1136        count++;
1137    }
1138
1139    ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1140    if (ret == 0) {
1141        meta->count = count;
1142    }
1143
1144    return ret;
1145}
1146
1147/* nbd_negotiate_options
1148 * Process all NBD_OPT_* client option commands, during fixed newstyle
1149 * negotiation.
1150 * Return:
1151 * -errno  on error, errp is set
1152 * 0       on successful negotiation, errp is not set
1153 * 1       if client sent NBD_OPT_ABORT (i.e. on valid disconnect) or never
1154 *         wrote anything (i.e. port probe); errp is not set
1155 */
1156static coroutine_fn int
1157nbd_negotiate_options(NBDClient *client, Error **errp)
1158{
1159    uint32_t flags;
1160    bool fixedNewstyle = false;
1161    bool no_zeroes = false;
1162
1163    /* Client sends:
1164        [ 0 ..   3]   client flags
1165
1166       Then we loop until NBD_OPT_EXPORT_NAME or NBD_OPT_GO:
1167        [ 0 ..   7]   NBD_OPTS_MAGIC
1168        [ 8 ..  11]   NBD option
1169        [12 ..  15]   Data length
1170        ...           Rest of request
1171
1172        [ 0 ..   7]   NBD_OPTS_MAGIC
1173        [ 8 ..  11]   Second NBD option
1174        [12 ..  15]   Data length
1175        ...           Rest of request
1176    */
1177
1178    /*
1179     * Intentionally ignore errors on this first read - we do not want
1180     * to be noisy about a mere port probe, but only for clients that
1181     * start talking the protocol and then quit abruptly.
1182     */
1183    if (nbd_read32(client->ioc, &flags, "flags", NULL) < 0) {
1184        return 1;
1185    }
1186    client->mode = NBD_MODE_EXPORT_NAME;
1187    trace_nbd_negotiate_options_flags(flags);
1188    if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
1189        fixedNewstyle = true;
1190        flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE;
1191        client->mode = NBD_MODE_SIMPLE;
1192    }
1193    if (flags & NBD_FLAG_C_NO_ZEROES) {
1194        no_zeroes = true;
1195        flags &= ~NBD_FLAG_C_NO_ZEROES;
1196    }
1197    if (flags != 0) {
1198        error_setg(errp, "Unknown client flags 0x%" PRIx32 " received", flags);
1199        return -EINVAL;
1200    }
1201
1202    while (1) {
1203        int ret;
1204        uint32_t option, length;
1205        uint64_t magic;
1206
1207        if (nbd_read64(client->ioc, &magic, "opts magic", errp) < 0) {
1208            return -EINVAL;
1209        }
1210        trace_nbd_negotiate_options_check_magic(magic);
1211        if (magic != NBD_OPTS_MAGIC) {
1212            error_setg(errp, "Bad magic received");
1213            return -EINVAL;
1214        }
1215
1216        if (nbd_read32(client->ioc, &option, "option", errp) < 0) {
1217            return -EINVAL;
1218        }
1219        client->opt = option;
1220
1221        if (nbd_read32(client->ioc, &length, "option length", errp) < 0) {
1222            return -EINVAL;
1223        }
1224        assert(!client->optlen);
1225        client->optlen = length;
1226
1227        if (length > NBD_MAX_BUFFER_SIZE) {
1228            error_setg(errp, "len (%" PRIu32 ") is larger than max len (%u)",
1229                       length, NBD_MAX_BUFFER_SIZE);
1230            return -EINVAL;
1231        }
1232
1233        trace_nbd_negotiate_options_check_option(option,
1234                                                 nbd_opt_lookup(option));
1235        if (client->tlscreds &&
1236            client->ioc == (QIOChannel *)client->sioc) {
1237            QIOChannel *tioc;
1238            if (!fixedNewstyle) {
1239                error_setg(errp, "Unsupported option 0x%" PRIx32, option);
1240                return -EINVAL;
1241            }
1242            switch (option) {
1243            case NBD_OPT_STARTTLS:
1244                if (length) {
1245                    /* Unconditionally drop the connection if the client
1246                     * can't start a TLS negotiation correctly */
1247                    return nbd_reject_length(client, true, errp);
1248                }
1249                tioc = nbd_negotiate_handle_starttls(client, errp);
1250                if (!tioc) {
1251                    return -EIO;
1252                }
1253                ret = 0;
1254                object_unref(OBJECT(client->ioc));
1255                client->ioc = tioc;
1256                break;
1257
1258            case NBD_OPT_EXPORT_NAME:
1259                /* No way to return an error to client, so drop connection */
1260                error_setg(errp, "Option 0x%x not permitted before TLS",
1261                           option);
1262                return -EINVAL;
1263
1264            default:
1265                /* Let the client keep trying, unless they asked to
1266                 * quit. Always try to give an error back to the
1267                 * client; but when replying to OPT_ABORT, be aware
1268                 * that the client may hang up before receiving the
1269                 * error, in which case we are fine ignoring the
1270                 * resulting EPIPE. */
1271                ret = nbd_opt_drop(client, NBD_REP_ERR_TLS_REQD,
1272                                   option == NBD_OPT_ABORT ? NULL : errp,
1273                                   "Option 0x%" PRIx32
1274                                   " not permitted before TLS", option);
1275                if (option == NBD_OPT_ABORT) {
1276                    return 1;
1277                }
1278                break;
1279            }
1280        } else if (fixedNewstyle) {
1281            switch (option) {
1282            case NBD_OPT_LIST:
1283                if (length) {
1284                    ret = nbd_reject_length(client, false, errp);
1285                } else {
1286                    ret = nbd_negotiate_handle_list(client, errp);
1287                }
1288                break;
1289
1290            case NBD_OPT_ABORT:
1291                /* NBD spec says we must try to reply before
1292                 * disconnecting, but that we must also tolerate
1293                 * guests that don't wait for our reply. */
1294                nbd_negotiate_send_rep(client, NBD_REP_ACK, NULL);
1295                return 1;
1296
1297            case NBD_OPT_EXPORT_NAME:
1298                return nbd_negotiate_handle_export_name(client, no_zeroes,
1299                                                        errp);
1300
1301            case NBD_OPT_INFO:
1302            case NBD_OPT_GO:
1303                ret = nbd_negotiate_handle_info(client, errp);
1304                if (ret == 1) {
1305                    assert(option == NBD_OPT_GO);
1306                    return 0;
1307                }
1308                break;
1309
1310            case NBD_OPT_STARTTLS:
1311                if (length) {
1312                    ret = nbd_reject_length(client, false, errp);
1313                } else if (client->tlscreds) {
1314                    ret = nbd_negotiate_send_rep_err(client,
1315                                                     NBD_REP_ERR_INVALID, errp,
1316                                                     "TLS already enabled");
1317                } else {
1318                    ret = nbd_negotiate_send_rep_err(client,
1319                                                     NBD_REP_ERR_POLICY, errp,
1320                                                     "TLS not configured");
1321                }
1322                break;
1323
1324            case NBD_OPT_STRUCTURED_REPLY:
1325                if (length) {
1326                    ret = nbd_reject_length(client, false, errp);
1327                } else if (client->mode >= NBD_MODE_EXTENDED) {
1328                    ret = nbd_negotiate_send_rep_err(
1329                        client, NBD_REP_ERR_EXT_HEADER_REQD, errp,
1330                        "extended headers already negotiated");
1331                } else if (client->mode >= NBD_MODE_STRUCTURED) {
1332                    ret = nbd_negotiate_send_rep_err(
1333                        client, NBD_REP_ERR_INVALID, errp,
1334                        "structured reply already negotiated");
1335                } else {
1336                    ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1337                    client->mode = NBD_MODE_STRUCTURED;
1338                }
1339                break;
1340
1341            case NBD_OPT_LIST_META_CONTEXT:
1342            case NBD_OPT_SET_META_CONTEXT:
1343                ret = nbd_negotiate_meta_queries(client, errp);
1344                break;
1345
1346            case NBD_OPT_EXTENDED_HEADERS:
1347                if (length) {
1348                    ret = nbd_reject_length(client, false, errp);
1349                } else if (client->mode >= NBD_MODE_EXTENDED) {
1350                    ret = nbd_negotiate_send_rep_err(
1351                        client, NBD_REP_ERR_INVALID, errp,
1352                        "extended headers already negotiated");
1353                } else {
1354                    ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1355                    client->mode = NBD_MODE_EXTENDED;
1356                }
1357                break;
1358
1359            default:
1360                ret = nbd_opt_drop(client, NBD_REP_ERR_UNSUP, errp,
1361                                   "Unsupported option %" PRIu32 " (%s)",
1362                                   option, nbd_opt_lookup(option));
1363                break;
1364            }
1365        } else {
1366            /*
1367             * If broken new-style we should drop the connection
1368             * for anything except NBD_OPT_EXPORT_NAME
1369             */
1370            switch (option) {
1371            case NBD_OPT_EXPORT_NAME:
1372                return nbd_negotiate_handle_export_name(client, no_zeroes,
1373                                                        errp);
1374
1375            default:
1376                error_setg(errp, "Unsupported option %" PRIu32 " (%s)",
1377                           option, nbd_opt_lookup(option));
1378                return -EINVAL;
1379            }
1380        }
1381        if (ret < 0) {
1382            return ret;
1383        }
1384    }
1385}
1386
1387/* nbd_negotiate
1388 * Return:
1389 * -errno  on error, errp is set
1390 * 0       on successful negotiation, errp is not set
1391 * 1       if client sent NBD_OPT_ABORT (i.e. on valid disconnect) or never
1392 *         wrote anything (i.e. port probe); errp is not set
1393 */
1394static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
1395{
1396    ERRP_GUARD();
1397    char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
1398    int ret;
1399
1400    /* Old style negotiation header, no room for options
1401        [ 0 ..   7]   passwd       ("NBDMAGIC")
1402        [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
1403        [16 ..  23]   size
1404        [24 ..  27]   export flags (zero-extended)
1405        [28 .. 151]   reserved     (0)
1406
1407       New style negotiation header, client can send options
1408        [ 0 ..   7]   passwd       ("NBDMAGIC")
1409        [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
1410        [16 ..  17]   server flags (0)
1411        ....options sent, ending in NBD_OPT_EXPORT_NAME or NBD_OPT_GO....
1412     */
1413
1414    qio_channel_set_blocking(client->ioc, false, NULL);
1415    qio_channel_set_follow_coroutine_ctx(client->ioc, true);
1416
1417    trace_nbd_negotiate_begin();
1418    memcpy(buf, "NBDMAGIC", 8);
1419
1420    stq_be_p(buf + 8, NBD_OPTS_MAGIC);
1421    stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
1422
1423    /*
1424     * Be silent about failure to write our greeting: there is nothing
1425     * wrong with a client testing if our port is alive.
1426     */
1427    if (nbd_write(client->ioc, buf, 18, NULL) < 0) {
1428        return 1;
1429    }
1430    ret = nbd_negotiate_options(client, errp);
1431    if (ret != 0) {
1432        if (ret < 0) {
1433            error_prepend(errp, "option negotiation failed: ");
1434        }
1435        return ret;
1436    }
1437
1438    assert(!client->optlen);
1439    trace_nbd_negotiate_success();
1440
1441    return 0;
1442}
1443
1444/* nbd_read_eof
1445 * Tries to read @size bytes from @ioc. This is a local implementation of
1446 * qio_channel_readv_all_eof. We have it here because we need it to be
1447 * interruptible and to know when the coroutine is yielding.
1448 * Returns 1 on success
1449 *         0 on eof, when no data was read (errp is not set)
1450 *         negative errno on failure (errp is set)
1451 */
1452static inline int coroutine_fn
1453nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
1454{
1455    bool partial = false;
1456
1457    assert(size);
1458    while (size > 0) {
1459        struct iovec iov = { .iov_base = buffer, .iov_len = size };
1460        ssize_t len;
1461
1462        len = qio_channel_readv(client->ioc, &iov, 1, errp);
1463        if (len == QIO_CHANNEL_ERR_BLOCK) {
1464            WITH_QEMU_LOCK_GUARD(&client->lock) {
1465                client->read_yielding = true;
1466
1467                /* Prompt main loop thread to re-run nbd_drained_poll() */
1468                aio_wait_kick();
1469            }
1470            qio_channel_yield(client->ioc, G_IO_IN);
1471            WITH_QEMU_LOCK_GUARD(&client->lock) {
1472                client->read_yielding = false;
1473                if (client->quiescing) {
1474                    return -EAGAIN;
1475                }
1476            }
1477            continue;
1478        } else if (len < 0) {
1479            return -EIO;
1480        } else if (len == 0) {
1481            if (partial) {
1482                error_setg(errp,
1483                           "Unexpected end-of-file before all bytes were read");
1484                return -EIO;
1485            } else {
1486                return 0;
1487            }
1488        }
1489
1490        partial = true;
1491        size -= len;
1492        buffer = (uint8_t *) buffer + len;
1493    }
1494    return 1;
1495}
1496
1497static int coroutine_fn nbd_receive_request(NBDClient *client, NBDRequest *request,
1498                                            Error **errp)
1499{
1500    uint8_t buf[NBD_EXTENDED_REQUEST_SIZE];
1501    uint32_t magic, expect;
1502    int ret;
1503    size_t size = client->mode >= NBD_MODE_EXTENDED ?
1504        NBD_EXTENDED_REQUEST_SIZE : NBD_REQUEST_SIZE;
1505
1506    ret = nbd_read_eof(client, buf, size, errp);
1507    if (ret < 0) {
1508        return ret;
1509    }
1510    if (ret == 0) {
1511        return -EIO;
1512    }
1513
1514    /*
1515     * Compact request
1516     *  [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
1517     *  [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, ...)
1518     *  [ 6 ..  7]   type    (NBD_CMD_READ, ...)
1519     *  [ 8 .. 15]   cookie
1520     *  [16 .. 23]   from
1521     *  [24 .. 27]   len
1522     * Extended request
1523     *  [ 0 ..  3]   magic   (NBD_EXTENDED_REQUEST_MAGIC)
1524     *  [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, NBD_CMD_FLAG_PAYLOAD_LEN, ...)
1525     *  [ 6 ..  7]   type    (NBD_CMD_READ, ...)
1526     *  [ 8 .. 15]   cookie
1527     *  [16 .. 23]   from
1528     *  [24 .. 31]   len
1529     */
1530
1531    magic = ldl_be_p(buf);
1532    request->flags  = lduw_be_p(buf + 4);
1533    request->type   = lduw_be_p(buf + 6);
1534    request->cookie = ldq_be_p(buf + 8);
1535    request->from   = ldq_be_p(buf + 16);
1536    if (client->mode >= NBD_MODE_EXTENDED) {
1537        request->len = ldq_be_p(buf + 24);
1538        expect = NBD_EXTENDED_REQUEST_MAGIC;
1539    } else {
1540        request->len = (uint32_t)ldl_be_p(buf + 24); /* widen 32 to 64 bits */
1541        expect = NBD_REQUEST_MAGIC;
1542    }
1543
1544    trace_nbd_receive_request(magic, request->flags, request->type,
1545                              request->from, request->len);
1546
1547    if (magic != expect) {
1548        error_setg(errp, "invalid magic (got 0x%" PRIx32 ", expected 0x%"
1549                   PRIx32 ")", magic, expect);
1550        return -EINVAL;
1551    }
1552    return 0;
1553}
1554
/*
 * Upper bound on client->nb_requests: nbd_request_get() asserts that the
 * counter is below this value before incrementing it.
 */
#define MAX_NBD_REQUESTS 16
1556
/*
 * Take a reference on @client by atomically incrementing its refcount.
 *
 * Runs in export AioContext and main loop thread
 */
void nbd_client_get(NBDClient *client)
{
    qatomic_inc(&client->refcount);
}
1562
1563void nbd_client_put(NBDClient *client)
1564{
1565    assert(qemu_in_main_thread());
1566
1567    if (qatomic_fetch_dec(&client->refcount) == 1) {
1568        /* The last reference should be dropped by client->close,
1569         * which is called by client_close.
1570         */
1571        assert(client->closing);
1572
1573        object_unref(OBJECT(client->sioc));
1574        object_unref(OBJECT(client->ioc));
1575        if (client->tlscreds) {
1576            object_unref(OBJECT(client->tlscreds));
1577        }
1578        g_free(client->tlsauthz);
1579        if (client->exp) {
1580            QTAILQ_REMOVE(&client->exp->clients, client, next);
1581            blk_exp_unref(&client->exp->common);
1582        }
1583        g_free(client->contexts.bitmaps);
1584        qemu_mutex_destroy(&client->lock);
1585        g_free(client);
1586    }
1587}
1588
1589/*
1590 * Tries to release the reference to @client, but only if other references
1591 * remain. This is an optimization for the common case where we want to avoid
1592 * the expense of scheduling nbd_client_put() in the main loop thread.
1593 *
1594 * Returns true upon success or false if the reference was not released because
1595 * it is the last reference.
1596 */
1597static bool nbd_client_put_nonzero(NBDClient *client)
1598{
1599    int old = qatomic_read(&client->refcount);
1600    int expected;
1601
1602    do {
1603        if (old == 1) {
1604            return false;
1605        }
1606
1607        expected = old;
1608        old = qatomic_cmpxchg(&client->refcount, expected, expected - 1);
1609    } while (old != expected);
1610
1611    return true;
1612}
1613
1614static void client_close(NBDClient *client, bool negotiated)
1615{
1616    assert(qemu_in_main_thread());
1617
1618    WITH_QEMU_LOCK_GUARD(&client->lock) {
1619        if (client->closing) {
1620            return;
1621        }
1622
1623        client->closing = true;
1624    }
1625
1626    /* Force requests to finish.  They will drop their own references,
1627     * then we'll close the socket and free the NBDClient.
1628     */
1629    qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
1630                         NULL);
1631
1632    /* Also tell the client, so that they release their reference.  */
1633    if (client->close_fn) {
1634        client->close_fn(client, negotiated);
1635    }
1636}
1637
1638/* Runs in export AioContext with client->lock held */
1639static NBDRequestData *nbd_request_get(NBDClient *client)
1640{
1641    NBDRequestData *req;
1642
1643    assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
1644    client->nb_requests++;
1645
1646    req = g_new0(NBDRequestData, 1);
1647    req->client = client;
1648    return req;
1649}
1650
1651/* Runs in export AioContext with client->lock held */
1652static void nbd_request_put(NBDRequestData *req)
1653{
1654    NBDClient *client = req->client;
1655
1656    if (req->data) {
1657        qemu_vfree(req->data);
1658    }
1659    g_free(req);
1660
1661    client->nb_requests--;
1662
1663    if (client->quiescing && client->nb_requests == 0) {
1664        aio_wait_kick();
1665    }
1666
1667    nbd_client_receive_next_request(client);
1668}
1669
1670static void blk_aio_attached(AioContext *ctx, void *opaque)
1671{
1672    NBDExport *exp = opaque;
1673    NBDClient *client;
1674
1675    assert(qemu_in_main_thread());
1676
1677    trace_nbd_blk_aio_attached(exp->name, ctx);
1678
1679    exp->common.ctx = ctx;
1680
1681    QTAILQ_FOREACH(client, &exp->clients, next) {
1682        WITH_QEMU_LOCK_GUARD(&client->lock) {
1683            assert(client->nb_requests == 0);
1684            assert(client->recv_coroutine == NULL);
1685            assert(client->send_coroutine == NULL);
1686        }
1687    }
1688}
1689
1690static void blk_aio_detach(void *opaque)
1691{
1692    NBDExport *exp = opaque;
1693
1694    assert(qemu_in_main_thread());
1695
1696    trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
1697
1698    exp->common.ctx = NULL;
1699}
1700
1701static void nbd_drained_begin(void *opaque)
1702{
1703    NBDExport *exp = opaque;
1704    NBDClient *client;
1705
1706    assert(qemu_in_main_thread());
1707
1708    QTAILQ_FOREACH(client, &exp->clients, next) {
1709        WITH_QEMU_LOCK_GUARD(&client->lock) {
1710            client->quiescing = true;
1711        }
1712    }
1713}
1714
1715static void nbd_drained_end(void *opaque)
1716{
1717    NBDExport *exp = opaque;
1718    NBDClient *client;
1719
1720    assert(qemu_in_main_thread());
1721
1722    QTAILQ_FOREACH(client, &exp->clients, next) {
1723        WITH_QEMU_LOCK_GUARD(&client->lock) {
1724            client->quiescing = false;
1725            nbd_client_receive_next_request(client);
1726        }
1727    }
1728}
1729
1730/* Runs in export AioContext */
1731static void nbd_wake_read_bh(void *opaque)
1732{
1733    NBDClient *client = opaque;
1734    qio_channel_wake_read(client->ioc);
1735}
1736
1737static bool nbd_drained_poll(void *opaque)
1738{
1739    NBDExport *exp = opaque;
1740    NBDClient *client;
1741
1742    assert(qemu_in_main_thread());
1743
1744    QTAILQ_FOREACH(client, &exp->clients, next) {
1745        WITH_QEMU_LOCK_GUARD(&client->lock) {
1746            if (client->nb_requests != 0) {
1747                /*
1748                 * If there's a coroutine waiting for a request on nbd_read_eof()
1749                 * enter it here so we don't depend on the client to wake it up.
1750                 *
1751                 * Schedule a BH in the export AioContext to avoid missing the
1752                 * wake up due to the race between qio_channel_wake_read() and
1753                 * qio_channel_yield().
1754                 */
1755                if (client->recv_coroutine != NULL && client->read_yielding) {
1756                    aio_bh_schedule_oneshot(nbd_export_aio_context(client->exp),
1757                                            nbd_wake_read_bh, client);
1758                }
1759
1760                return true;
1761            }
1762        }
1763    }
1764
1765    return false;
1766}
1767
1768static void nbd_eject_notifier(Notifier *n, void *data)
1769{
1770    NBDExport *exp = container_of(n, NBDExport, eject_notifier);
1771
1772    assert(qemu_in_main_thread());
1773
1774    blk_exp_request_shutdown(&exp->common);
1775}
1776
1777void nbd_export_set_on_eject_blk(BlockExport *exp, BlockBackend *blk)
1778{
1779    NBDExport *nbd_exp = container_of(exp, NBDExport, common);
1780    assert(exp->drv == &blk_exp_nbd);
1781    assert(nbd_exp->eject_notifier_blk == NULL);
1782
1783    blk_ref(blk);
1784    nbd_exp->eject_notifier_blk = blk;
1785    nbd_exp->eject_notifier.notify = nbd_eject_notifier;
1786    blk_add_remove_bs_notifier(blk, &nbd_exp->eject_notifier);
1787}
1788
1789static const BlockDevOps nbd_block_ops = {
1790    .drained_begin = nbd_drained_begin,
1791    .drained_end = nbd_drained_end,
1792    .drained_poll = nbd_drained_poll,
1793};
1794
1795static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
1796                             Error **errp)
1797{
1798    NBDExport *exp = container_of(blk_exp, NBDExport, common);
1799    BlockExportOptionsNbd *arg = &exp_args->u.nbd;
1800    const char *name = arg->name ?: exp_args->node_name;
1801    BlockBackend *blk = blk_exp->blk;
1802    int64_t size;
1803    uint64_t perm, shared_perm;
1804    bool readonly = !exp_args->writable;
1805    BlockDirtyBitmapOrStrList *bitmaps;
1806    size_t i;
1807    int ret;
1808
1809    GLOBAL_STATE_CODE();
1810    assert(exp_args->type == BLOCK_EXPORT_TYPE_NBD);
1811
1812    if (!nbd_server_is_running()) {
1813        error_setg(errp, "NBD server not running");
1814        return -EINVAL;
1815    }
1816
1817    if (strlen(name) > NBD_MAX_STRING_SIZE) {
1818        error_setg(errp, "export name '%s' too long", name);
1819        return -EINVAL;
1820    }
1821
1822    if (arg->description && strlen(arg->description) > NBD_MAX_STRING_SIZE) {
1823        error_setg(errp, "description '%s' too long", arg->description);
1824        return -EINVAL;
1825    }
1826
1827    if (nbd_export_find(name)) {
1828        error_setg(errp, "NBD server already has export named '%s'", name);
1829        return -EEXIST;
1830    }
1831
1832    size = blk_getlength(blk);
1833    if (size < 0) {
1834        error_setg_errno(errp, -size,
1835                         "Failed to determine the NBD export's length");
1836        return size;
1837    }
1838
1839    /* Don't allow resize while the NBD server is running, otherwise we don't
1840     * care what happens with the node. */
1841    blk_get_perm(blk, &perm, &shared_perm);
1842    ret = blk_set_perm(blk, perm, shared_perm & ~BLK_PERM_RESIZE, errp);
1843    if (ret < 0) {
1844        return ret;
1845    }
1846
1847    QTAILQ_INIT(&exp->clients);
1848    exp->name = g_strdup(name);
1849    exp->description = g_strdup(arg->description);
1850    exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
1851                     NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
1852
1853    if (nbd_server_max_connections() != 1) {
1854        exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
1855    }
1856    if (readonly) {
1857        exp->nbdflags |= NBD_FLAG_READ_ONLY;
1858    } else {
1859        exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
1860                          NBD_FLAG_SEND_FAST_ZERO);
1861    }
1862    exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
1863
1864    bdrv_graph_rdlock_main_loop();
1865
1866    for (bitmaps = arg->bitmaps; bitmaps; bitmaps = bitmaps->next) {
1867        exp->nr_export_bitmaps++;
1868    }
1869    exp->export_bitmaps = g_new0(BdrvDirtyBitmap *, exp->nr_export_bitmaps);
1870    for (i = 0, bitmaps = arg->bitmaps; bitmaps;
1871         i++, bitmaps = bitmaps->next)
1872    {
1873        const char *bitmap;
1874        BlockDriverState *bs = blk_bs(blk);
1875        BdrvDirtyBitmap *bm = NULL;
1876
1877        switch (bitmaps->value->type) {
1878        case QTYPE_QSTRING:
1879            bitmap = bitmaps->value->u.local;
1880            while (bs) {
1881                bm = bdrv_find_dirty_bitmap(bs, bitmap);
1882                if (bm != NULL) {
1883                    break;
1884                }
1885
1886                bs = bdrv_filter_or_cow_bs(bs);
1887            }
1888
1889            if (bm == NULL) {
1890                ret = -ENOENT;
1891                error_setg(errp, "Bitmap '%s' is not found",
1892                           bitmaps->value->u.local);
1893                goto fail;
1894            }
1895
1896            if (readonly && bdrv_is_writable(bs) &&
1897                bdrv_dirty_bitmap_enabled(bm)) {
1898                ret = -EINVAL;
1899                error_setg(errp, "Enabled bitmap '%s' incompatible with "
1900                           "readonly export", bitmap);
1901                goto fail;
1902            }
1903            break;
1904        case QTYPE_QDICT:
1905            bitmap = bitmaps->value->u.external.name;
1906            bm = block_dirty_bitmap_lookup(bitmaps->value->u.external.node,
1907                                           bitmap, NULL, errp);
1908            if (!bm) {
1909                ret = -ENOENT;
1910                goto fail;
1911            }
1912            break;
1913        default:
1914            abort();
1915        }
1916
1917        assert(bm);
1918
1919        if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
1920            ret = -EINVAL;
1921            goto fail;
1922        }
1923
1924        exp->export_bitmaps[i] = bm;
1925        assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
1926    }
1927
1928    /* Mark bitmaps busy in a separate loop, to simplify roll-back concerns. */
1929    for (i = 0; i < exp->nr_export_bitmaps; i++) {
1930        bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], true);
1931    }
1932
1933    exp->allocation_depth = arg->allocation_depth;
1934
1935    /*
1936     * We need to inhibit request queuing in the block layer to ensure we can
1937     * be properly quiesced when entering a drained section, as our coroutines
1938     * servicing pending requests might enter blk_pread().
1939     */
1940    blk_set_disable_request_queuing(blk, true);
1941
1942    blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
1943
1944    blk_set_dev_ops(blk, &nbd_block_ops, exp);
1945
1946    QTAILQ_INSERT_TAIL(&exports, exp, next);
1947
1948    bdrv_graph_rdunlock_main_loop();
1949
1950    return 0;
1951
1952fail:
1953    bdrv_graph_rdunlock_main_loop();
1954    g_free(exp->export_bitmaps);
1955    g_free(exp->name);
1956    g_free(exp->description);
1957    return ret;
1958}
1959
1960NBDExport *nbd_export_find(const char *name)
1961{
1962    NBDExport *exp;
1963    QTAILQ_FOREACH(exp, &exports, next) {
1964        if (strcmp(name, exp->name) == 0) {
1965            return exp;
1966        }
1967    }
1968
1969    return NULL;
1970}
1971
1972AioContext *
1973nbd_export_aio_context(NBDExport *exp)
1974{
1975    return exp->common.ctx;
1976}
1977
1978static void nbd_export_request_shutdown(BlockExport *blk_exp)
1979{
1980    NBDExport *exp = container_of(blk_exp, NBDExport, common);
1981    NBDClient *client, *next;
1982
1983    blk_exp_ref(&exp->common);
1984    /*
1985     * TODO: Should we expand QMP BlockExportRemoveMode enum to allow a
1986     * close mode that stops advertising the export to new clients but
1987     * still permits existing clients to run to completion? Because of
1988     * that possibility, nbd_export_close() can be called more than
1989     * once on an export.
1990     */
1991    QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
1992        client_close(client, true);
1993    }
1994    if (exp->name) {
1995        g_free(exp->name);
1996        exp->name = NULL;
1997        QTAILQ_REMOVE(&exports, exp, next);
1998    }
1999    blk_exp_unref(&exp->common);
2000}
2001
2002static void nbd_export_delete(BlockExport *blk_exp)
2003{
2004    size_t i;
2005    NBDExport *exp = container_of(blk_exp, NBDExport, common);
2006
2007    assert(exp->name == NULL);
2008    assert(QTAILQ_EMPTY(&exp->clients));
2009
2010    g_free(exp->description);
2011    exp->description = NULL;
2012
2013    if (exp->eject_notifier_blk) {
2014        notifier_remove(&exp->eject_notifier);
2015        blk_unref(exp->eject_notifier_blk);
2016    }
2017    blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached,
2018                                    blk_aio_detach, exp);
2019    blk_set_disable_request_queuing(exp->common.blk, false);
2020
2021    for (i = 0; i < exp->nr_export_bitmaps; i++) {
2022        bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], false);
2023    }
2024}
2025
2026const BlockExportDriver blk_exp_nbd = {
2027    .type               = BLOCK_EXPORT_TYPE_NBD,
2028    .instance_size      = sizeof(NBDExport),
2029    .supports_inactive  = true,
2030    .create             = nbd_export_create,
2031    .delete             = nbd_export_delete,
2032    .request_shutdown   = nbd_export_request_shutdown,
2033};
2034
2035static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov,
2036                                        unsigned niov, Error **errp)
2037{
2038    int ret;
2039
2040    g_assert(qemu_in_coroutine());
2041    qemu_co_mutex_lock(&client->send_lock);
2042    client->send_coroutine = qemu_coroutine_self();
2043
2044    ret = qio_channel_writev_all(client->ioc, iov, niov, errp) < 0 ? -EIO : 0;
2045
2046    client->send_coroutine = NULL;
2047    qemu_co_mutex_unlock(&client->send_lock);
2048
2049    return ret;
2050}
2051
2052static inline void set_be_simple_reply(NBDSimpleReply *reply, uint64_t error,
2053                                       uint64_t cookie)
2054{
2055    stl_be_p(&reply->magic, NBD_SIMPLE_REPLY_MAGIC);
2056    stl_be_p(&reply->error, error);
2057    stq_be_p(&reply->cookie, cookie);
2058}
2059
2060static int coroutine_fn nbd_co_send_simple_reply(NBDClient *client,
2061                                                 NBDRequest *request,
2062                                                 uint32_t error,
2063                                                 void *data,
2064                                                 uint64_t len,
2065                                                 Error **errp)
2066{
2067    NBDSimpleReply reply;
2068    int nbd_err = system_errno_to_nbd_errno(error);
2069    struct iovec iov[] = {
2070        {.iov_base = &reply, .iov_len = sizeof(reply)},
2071        {.iov_base = data, .iov_len = len}
2072    };
2073
2074    assert(!len || !nbd_err);
2075    assert(len <= NBD_MAX_BUFFER_SIZE);
2076    assert(client->mode < NBD_MODE_STRUCTURED ||
2077           (client->mode == NBD_MODE_STRUCTURED &&
2078            request->type != NBD_CMD_READ));
2079    trace_nbd_co_send_simple_reply(request->cookie, nbd_err,
2080                                   nbd_err_lookup(nbd_err), len);
2081    set_be_simple_reply(&reply, nbd_err, request->cookie);
2082
2083    return nbd_co_send_iov(client, iov, 2, errp);
2084}
2085
2086/*
2087 * Prepare the header of a reply chunk for network transmission.
2088 *
2089 * On input, @iov is partially initialized: iov[0].iov_base must point
2090 * to an uninitialized NBDReply, while the remaining @niov elements
2091 * (if any) must be ready for transmission.  This function then
2092 * populates iov[0] for transmission.
2093 */
2094static inline void set_be_chunk(NBDClient *client, struct iovec *iov,
2095                                size_t niov, uint16_t flags, uint16_t type,
2096                                NBDRequest *request)
2097{
2098    size_t i, length = 0;
2099
2100    for (i = 1; i < niov; i++) {
2101        length += iov[i].iov_len;
2102    }
2103    assert(length <= NBD_MAX_BUFFER_SIZE + sizeof(NBDStructuredReadData));
2104
2105    if (client->mode >= NBD_MODE_EXTENDED) {
2106        NBDExtendedReplyChunk *chunk = iov->iov_base;
2107
2108        iov[0].iov_len = sizeof(*chunk);
2109        stl_be_p(&chunk->magic, NBD_EXTENDED_REPLY_MAGIC);
2110        stw_be_p(&chunk->flags, flags);
2111        stw_be_p(&chunk->type, type);
2112        stq_be_p(&chunk->cookie, request->cookie);
2113        stq_be_p(&chunk->offset, request->from);
2114        stq_be_p(&chunk->length, length);
2115    } else {
2116        NBDStructuredReplyChunk *chunk = iov->iov_base;
2117
2118        iov[0].iov_len = sizeof(*chunk);
2119        stl_be_p(&chunk->magic, NBD_STRUCTURED_REPLY_MAGIC);
2120        stw_be_p(&chunk->flags, flags);
2121        stw_be_p(&chunk->type, type);
2122        stq_be_p(&chunk->cookie, request->cookie);
2123        stl_be_p(&chunk->length, length);
2124    }
2125}
2126
2127static int coroutine_fn nbd_co_send_chunk_done(NBDClient *client,
2128                                               NBDRequest *request,
2129                                               Error **errp)
2130{
2131    NBDReply hdr;
2132    struct iovec iov[] = {
2133        {.iov_base = &hdr},
2134    };
2135
2136    trace_nbd_co_send_chunk_done(request->cookie);
2137    set_be_chunk(client, iov, 1, NBD_REPLY_FLAG_DONE,
2138                 NBD_REPLY_TYPE_NONE, request);
2139    return nbd_co_send_iov(client, iov, 1, errp);
2140}
2141
2142static int coroutine_fn nbd_co_send_chunk_read(NBDClient *client,
2143                                               NBDRequest *request,
2144                                               uint64_t offset,
2145                                               void *data,
2146                                               uint64_t size,
2147                                               bool final,
2148                                               Error **errp)
2149{
2150    NBDReply hdr;
2151    NBDStructuredReadData chunk;
2152    struct iovec iov[] = {
2153        {.iov_base = &hdr},
2154        {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2155        {.iov_base = data, .iov_len = size}
2156    };
2157
2158    assert(size && size <= NBD_MAX_BUFFER_SIZE);
2159    trace_nbd_co_send_chunk_read(request->cookie, offset, data, size);
2160    set_be_chunk(client, iov, 3, final ? NBD_REPLY_FLAG_DONE : 0,
2161                 NBD_REPLY_TYPE_OFFSET_DATA, request);
2162    stq_be_p(&chunk.offset, offset);
2163
2164    return nbd_co_send_iov(client, iov, 3, errp);
2165}
2166
2167static int coroutine_fn nbd_co_send_chunk_error(NBDClient *client,
2168                                                NBDRequest *request,
2169                                                uint32_t error,
2170                                                const char *msg,
2171                                                Error **errp)
2172{
2173    NBDReply hdr;
2174    NBDStructuredError chunk;
2175    int nbd_err = system_errno_to_nbd_errno(error);
2176    struct iovec iov[] = {
2177        {.iov_base = &hdr},
2178        {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2179        {.iov_base = (char *)msg, .iov_len = msg ? strlen(msg) : 0},
2180    };
2181
2182    assert(nbd_err);
2183    trace_nbd_co_send_chunk_error(request->cookie, nbd_err,
2184                                  nbd_err_lookup(nbd_err), msg ? msg : "");
2185    set_be_chunk(client, iov, 3, NBD_REPLY_FLAG_DONE,
2186                 NBD_REPLY_TYPE_ERROR, request);
2187    stl_be_p(&chunk.error, nbd_err);
2188    stw_be_p(&chunk.message_length, iov[2].iov_len);
2189
2190    return nbd_co_send_iov(client, iov, 3, errp);
2191}
2192
2193/* Do a sparse read and send the structured reply to the client.
2194 * Returns -errno if sending fails. blk_co_block_status_above() failure is
2195 * reported to the client, at which point this function succeeds.
2196 */
2197static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
2198                                                NBDRequest *request,
2199                                                uint64_t offset,
2200                                                uint8_t *data,
2201                                                uint64_t size,
2202                                                Error **errp)
2203{
2204    int ret = 0;
2205    NBDExport *exp = client->exp;
2206    size_t progress = 0;
2207
2208    assert(size <= NBD_MAX_BUFFER_SIZE);
2209    while (progress < size) {
2210        int64_t pnum;
2211        int status = blk_co_block_status_above(exp->common.blk, NULL,
2212                                               offset + progress,
2213                                               size - progress, &pnum, NULL,
2214                                               NULL);
2215        bool final;
2216
2217        if (status < 0) {
2218            char *msg = g_strdup_printf("unable to check for holes: %s",
2219                                        strerror(-status));
2220
2221            ret = nbd_co_send_chunk_error(client, request, -status, msg, errp);
2222            g_free(msg);
2223            return ret;
2224        }
2225        assert(pnum && pnum <= size - progress);
2226        final = progress + pnum == size;
2227        if (status & BDRV_BLOCK_ZERO) {
2228            NBDReply hdr;
2229            NBDStructuredReadHole chunk;
2230            struct iovec iov[] = {
2231                {.iov_base = &hdr},
2232                {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2233            };
2234
2235            trace_nbd_co_send_chunk_read_hole(request->cookie,
2236                                              offset + progress, pnum);
2237            set_be_chunk(client, iov, 2,
2238                         final ? NBD_REPLY_FLAG_DONE : 0,
2239                         NBD_REPLY_TYPE_OFFSET_HOLE, request);
2240            stq_be_p(&chunk.offset, offset + progress);
2241            stl_be_p(&chunk.length, pnum);
2242            ret = nbd_co_send_iov(client, iov, 2, errp);
2243        } else {
2244            ret = blk_co_pread(exp->common.blk, offset + progress, pnum,
2245                               data + progress, 0);
2246            if (ret < 0) {
2247                error_setg_errno(errp, -ret, "reading from file failed");
2248                break;
2249            }
2250            ret = nbd_co_send_chunk_read(client, request, offset + progress,
2251                                         data + progress, pnum, final, errp);
2252        }
2253
2254        if (ret < 0) {
2255            break;
2256        }
2257        progress += pnum;
2258    }
2259    return ret;
2260}
2261
2262typedef struct NBDExtentArray {
2263    NBDExtent64 *extents;
2264    unsigned int nb_alloc;
2265    unsigned int count;
2266    uint64_t total_length;
2267    bool extended;
2268    bool can_add;
2269    bool converted_to_be;
2270} NBDExtentArray;
2271
2272static NBDExtentArray *nbd_extent_array_new(unsigned int nb_alloc,
2273                                            NBDMode mode)
2274{
2275    NBDExtentArray *ea = g_new0(NBDExtentArray, 1);
2276
2277    assert(mode >= NBD_MODE_STRUCTURED);
2278    ea->nb_alloc = nb_alloc;
2279    ea->extents = g_new(NBDExtent64, nb_alloc);
2280    ea->extended = mode >= NBD_MODE_EXTENDED;
2281    ea->can_add = true;
2282
2283    return ea;
2284}
2285
2286static void nbd_extent_array_free(NBDExtentArray *ea)
2287{
2288    g_free(ea->extents);
2289    g_free(ea);
2290}
2291G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free)
2292
2293/* Further modifications of the array after conversion are abandoned */
2294static void nbd_extent_array_convert_to_be(NBDExtentArray *ea)
2295{
2296    int i;
2297
2298    assert(!ea->converted_to_be);
2299    assert(ea->extended);
2300    ea->can_add = false;
2301    ea->converted_to_be = true;
2302
2303    for (i = 0; i < ea->count; i++) {
2304        ea->extents[i].length = cpu_to_be64(ea->extents[i].length);
2305        ea->extents[i].flags = cpu_to_be64(ea->extents[i].flags);
2306    }
2307}
2308
2309/* Further modifications of the array after conversion are abandoned */
2310static NBDExtent32 *nbd_extent_array_convert_to_narrow(NBDExtentArray *ea)
2311{
2312    int i;
2313    NBDExtent32 *extents = g_new(NBDExtent32, ea->count);
2314
2315    assert(!ea->converted_to_be);
2316    assert(!ea->extended);
2317    ea->can_add = false;
2318    ea->converted_to_be = true;
2319
2320    for (i = 0; i < ea->count; i++) {
2321        assert((ea->extents[i].length | ea->extents[i].flags) <= UINT32_MAX);
2322        extents[i].length = cpu_to_be32(ea->extents[i].length);
2323        extents[i].flags = cpu_to_be32(ea->extents[i].flags);
2324    }
2325
2326    return extents;
2327}
2328
2329/*
2330 * Add extent to NBDExtentArray. If extent can't be added (no available space),
2331 * return -1.
2332 * For safety, when returning -1 for the first time, .can_add is set to false,
2333 * and further calls to nbd_extent_array_add() will crash.
2334 * (this avoids the situation where a caller ignores failure to add one extent,
2335 * where adding another extent that would squash into the last array entry
2336 * would result in an incorrect range reported to the client)
2337 */
2338static int nbd_extent_array_add(NBDExtentArray *ea,
2339                                uint64_t length, uint32_t flags)
2340{
2341    assert(ea->can_add);
2342
2343    if (!length) {
2344        return 0;
2345    }
2346    if (!ea->extended) {
2347        assert(length <= UINT32_MAX);
2348    }
2349
2350    /* Extend previous extent if flags are the same */
2351    if (ea->count > 0 && flags == ea->extents[ea->count - 1].flags) {
2352        uint64_t sum = length + ea->extents[ea->count - 1].length;
2353
2354        /*
2355         * sum cannot overflow: the block layer bounds image size at
2356         * 2^63, and ea->extents[].length comes from the block layer.
2357         */
2358        assert(sum >= length);
2359        if (sum <= UINT32_MAX || ea->extended) {
2360            ea->extents[ea->count - 1].length = sum;
2361            ea->total_length += length;
2362            return 0;
2363        }
2364    }
2365
2366    if (ea->count >= ea->nb_alloc) {
2367        ea->can_add = false;
2368        return -1;
2369    }
2370
2371    ea->total_length += length;
2372    ea->extents[ea->count] = (NBDExtent64) {.length = length, .flags = flags};
2373    ea->count++;
2374
2375    return 0;
2376}
2377
2378static int coroutine_fn blockstatus_to_extents(BlockBackend *blk,
2379                                               uint64_t offset, uint64_t bytes,
2380                                               NBDExtentArray *ea)
2381{
2382    while (bytes) {
2383        uint32_t flags;
2384        int64_t num;
2385        int ret = blk_co_block_status_above(blk, NULL, offset, bytes, &num,
2386                                            NULL, NULL);
2387
2388        if (ret < 0) {
2389            return ret;
2390        }
2391
2392        flags = (ret & BDRV_BLOCK_DATA ? 0 : NBD_STATE_HOLE) |
2393                (ret & BDRV_BLOCK_ZERO ? NBD_STATE_ZERO : 0);
2394
2395        if (nbd_extent_array_add(ea, num, flags) < 0) {
2396            return 0;
2397        }
2398
2399        offset += num;
2400        bytes -= num;
2401    }
2402
2403    return 0;
2404}
2405
2406static int coroutine_fn blockalloc_to_extents(BlockBackend *blk,
2407                                              uint64_t offset, uint64_t bytes,
2408                                              NBDExtentArray *ea)
2409{
2410    while (bytes) {
2411        int64_t num;
2412        int ret = blk_co_is_allocated_above(blk, NULL, false, offset, bytes,
2413                                            &num);
2414
2415        if (ret < 0) {
2416            return ret;
2417        }
2418
2419        if (nbd_extent_array_add(ea, num, ret) < 0) {
2420            return 0;
2421        }
2422
2423        offset += num;
2424        bytes -= num;
2425    }
2426
2427    return 0;
2428}
2429
2430/*
2431 * nbd_co_send_extents
2432 *
2433 * @ea is converted to BE by the function
2434 * @last controls whether NBD_REPLY_FLAG_DONE is sent.
2435 */
2436static int coroutine_fn
2437nbd_co_send_extents(NBDClient *client, NBDRequest *request, NBDExtentArray *ea,
2438                    bool last, uint32_t context_id, Error **errp)
2439{
2440    NBDReply hdr;
2441    NBDStructuredMeta meta;
2442    NBDExtendedMeta meta_ext;
2443    g_autofree NBDExtent32 *extents = NULL;
2444    uint16_t type;
2445    struct iovec iov[] = { {.iov_base = &hdr}, {0}, {0} };
2446
2447    if (client->mode >= NBD_MODE_EXTENDED) {
2448        type = NBD_REPLY_TYPE_BLOCK_STATUS_EXT;
2449
2450        iov[1].iov_base = &meta_ext;
2451        iov[1].iov_len = sizeof(meta_ext);
2452        stl_be_p(&meta_ext.context_id, context_id);
2453        stl_be_p(&meta_ext.count, ea->count);
2454
2455        nbd_extent_array_convert_to_be(ea);
2456        iov[2].iov_base = ea->extents;
2457        iov[2].iov_len = ea->count * sizeof(ea->extents[0]);
2458    } else {
2459        type = NBD_REPLY_TYPE_BLOCK_STATUS;
2460
2461        iov[1].iov_base = &meta;
2462        iov[1].iov_len = sizeof(meta);
2463        stl_be_p(&meta.context_id, context_id);
2464
2465        extents = nbd_extent_array_convert_to_narrow(ea);
2466        iov[2].iov_base = extents;
2467        iov[2].iov_len = ea->count * sizeof(extents[0]);
2468    }
2469
2470    trace_nbd_co_send_extents(request->cookie, ea->count, context_id,
2471                              ea->total_length, last);
2472    set_be_chunk(client, iov, 3, last ? NBD_REPLY_FLAG_DONE : 0, type,
2473                 request);
2474
2475    return nbd_co_send_iov(client, iov, 3, errp);
2476}
2477
2478/* Get block status from the exported device and send it to the client */
2479static int
2480coroutine_fn nbd_co_send_block_status(NBDClient *client, NBDRequest *request,
2481                                      BlockBackend *blk, uint64_t offset,
2482                                      uint64_t length, bool dont_fragment,
2483                                      bool last, uint32_t context_id,
2484                                      Error **errp)
2485{
2486    int ret;
2487    unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2488    g_autoptr(NBDExtentArray) ea =
2489        nbd_extent_array_new(nb_extents, client->mode);
2490
2491    if (context_id == NBD_META_ID_BASE_ALLOCATION) {
2492        ret = blockstatus_to_extents(blk, offset, length, ea);
2493    } else {
2494        ret = blockalloc_to_extents(blk, offset, length, ea);
2495    }
2496    if (ret < 0) {
2497        return nbd_co_send_chunk_error(client, request, -ret,
2498                                       "can't get block status", errp);
2499    }
2500
2501    return nbd_co_send_extents(client, request, ea, last, context_id, errp);
2502}
2503
2504/* Populate @ea from a dirty bitmap. */
2505static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
2506                              uint64_t offset, uint64_t length,
2507                              NBDExtentArray *es)
2508{
2509    int64_t start, dirty_start, dirty_count;
2510    int64_t end = offset + length;
2511    bool full = false;
2512    int64_t bound = es->extended ? INT64_MAX : INT32_MAX;
2513
2514    bdrv_dirty_bitmap_lock(bitmap);
2515
2516    for (start = offset;
2517         bdrv_dirty_bitmap_next_dirty_area(bitmap, start, end, bound,
2518                                           &dirty_start, &dirty_count);
2519         start = dirty_start + dirty_count)
2520    {
2521        if ((nbd_extent_array_add(es, dirty_start - start, 0) < 0) ||
2522            (nbd_extent_array_add(es, dirty_count, NBD_STATE_DIRTY) < 0))
2523        {
2524            full = true;
2525            break;
2526        }
2527    }
2528
2529    if (!full) {
2530        /* last non dirty extent, nothing to do if array is now full */
2531        (void) nbd_extent_array_add(es, end - start, 0);
2532    }
2533
2534    bdrv_dirty_bitmap_unlock(bitmap);
2535}
2536
2537static int coroutine_fn nbd_co_send_bitmap(NBDClient *client,
2538                                           NBDRequest *request,
2539                                           BdrvDirtyBitmap *bitmap,
2540                                           uint64_t offset,
2541                                           uint64_t length, bool dont_fragment,
2542                                           bool last, uint32_t context_id,
2543                                           Error **errp)
2544{
2545    unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2546    g_autoptr(NBDExtentArray) ea =
2547        nbd_extent_array_new(nb_extents, client->mode);
2548
2549    bitmap_to_extents(bitmap, offset, length, ea);
2550
2551    return nbd_co_send_extents(client, request, ea, last, context_id, errp);
2552}
2553
2554/*
2555 * nbd_co_block_status_payload_read
2556 * Called when a client wants a subset of negotiated contexts via a
2557 * BLOCK_STATUS payload.  Check the payload for valid length and
2558 * contents.  On success, return 0 with request updated to effective
2559 * length.  If request was invalid but all payload consumed, return 0
2560 * with request->len and request->contexts->count set to 0 (which will
2561 * trigger an appropriate NBD_EINVAL response later on).  Return
2562 * negative errno if the payload was not fully consumed.
2563 */
2564static int
2565nbd_co_block_status_payload_read(NBDClient *client, NBDRequest *request,
2566                                 Error **errp)
2567{
2568    uint64_t payload_len = request->len;
2569    g_autofree char *buf = NULL;
2570    size_t count, i, nr_bitmaps;
2571    uint32_t id;
2572
2573    if (payload_len > NBD_MAX_BUFFER_SIZE) {
2574        error_setg(errp, "len (%" PRIu64 ") is larger than max len (%u)",
2575                   request->len, NBD_MAX_BUFFER_SIZE);
2576        return -EINVAL;
2577    }
2578
2579    assert(client->contexts.exp == client->exp);
2580    nr_bitmaps = client->exp->nr_export_bitmaps;
2581    request->contexts = g_new0(NBDMetaContexts, 1);
2582    request->contexts->exp = client->exp;
2583
2584    if (payload_len % sizeof(uint32_t) ||
2585        payload_len < sizeof(NBDBlockStatusPayload) ||
2586        payload_len > (sizeof(NBDBlockStatusPayload) +
2587                       sizeof(id) * client->contexts.count)) {
2588        goto skip;
2589    }
2590
2591    buf = g_malloc(payload_len);
2592    if (nbd_read(client->ioc, buf, payload_len,
2593                 "CMD_BLOCK_STATUS data", errp) < 0) {
2594        return -EIO;
2595    }
2596    trace_nbd_co_receive_request_payload_received(request->cookie,
2597                                                  payload_len);
2598    request->contexts->bitmaps = g_new0(bool, nr_bitmaps);
2599    count = (payload_len - sizeof(NBDBlockStatusPayload)) / sizeof(id);
2600    payload_len = 0;
2601
2602    for (i = 0; i < count; i++) {
2603        id = ldl_be_p(buf + sizeof(NBDBlockStatusPayload) + sizeof(id) * i);
2604        if (id == NBD_META_ID_BASE_ALLOCATION) {
2605            if (!client->contexts.base_allocation ||
2606                request->contexts->base_allocation) {
2607                goto skip;
2608            }
2609            request->contexts->base_allocation = true;
2610        } else if (id == NBD_META_ID_ALLOCATION_DEPTH) {
2611            if (!client->contexts.allocation_depth ||
2612                request->contexts->allocation_depth) {
2613                goto skip;
2614            }
2615            request->contexts->allocation_depth = true;
2616        } else {
2617            unsigned idx = id - NBD_META_ID_DIRTY_BITMAP;
2618
2619            if (idx >= nr_bitmaps || !client->contexts.bitmaps[idx] ||
2620                request->contexts->bitmaps[idx]) {
2621                goto skip;
2622            }
2623            request->contexts->bitmaps[idx] = true;
2624        }
2625    }
2626
2627    request->len = ldq_be_p(buf);
2628    request->contexts->count = count;
2629    return 0;
2630
2631 skip:
2632    trace_nbd_co_receive_block_status_payload_compliance(request->from,
2633                                                         request->len);
2634    request->len = request->contexts->count = 0;
2635    return nbd_drop(client->ioc, payload_len, errp);
2636}
2637
2638/* nbd_co_receive_request
2639 * Collect a client request. Return 0 if request looks valid, -EIO to drop
2640 * connection right away, -EAGAIN to indicate we were interrupted and the
2641 * channel should be quiesced, and any other negative value to report an error
2642 * to the client (although the caller may still need to disconnect after
2643 * reporting the error).
2644 */
2645static int coroutine_fn nbd_co_receive_request(NBDRequestData *req,
2646                                               NBDRequest *request,
2647                                               Error **errp)
2648{
2649    NBDClient *client = req->client;
2650    bool extended_with_payload;
2651    bool check_length = false;
2652    bool check_rofs = false;
2653    bool allocate_buffer = false;
2654    bool payload_okay = false;
2655    uint64_t payload_len = 0;
2656    int valid_flags = NBD_CMD_FLAG_FUA;
2657    int ret;
2658
2659    g_assert(qemu_in_coroutine());
2660    ret = nbd_receive_request(client, request, errp);
2661    if (ret < 0) {
2662        return ret;
2663    }
2664
2665    trace_nbd_co_receive_request_decode_type(request->cookie, request->type,
2666                                             nbd_cmd_lookup(request->type));
2667    extended_with_payload = client->mode >= NBD_MODE_EXTENDED &&
2668        request->flags & NBD_CMD_FLAG_PAYLOAD_LEN;
2669    if (extended_with_payload) {
2670        payload_len = request->len;
2671        check_length = true;
2672    }
2673
2674    switch (request->type) {
2675    case NBD_CMD_DISC:
2676        /* Special case: we're going to disconnect without a reply,
2677         * whether or not flags, from, or len are bogus */
2678        req->complete = true;
2679        return -EIO;
2680
2681    case NBD_CMD_READ:
2682        if (client->mode >= NBD_MODE_STRUCTURED) {
2683            valid_flags |= NBD_CMD_FLAG_DF;
2684        }
2685        check_length = true;
2686        allocate_buffer = true;
2687        break;
2688
2689    case NBD_CMD_WRITE:
2690        if (client->mode >= NBD_MODE_EXTENDED) {
2691            if (!extended_with_payload) {
2692                /* The client is noncompliant. Trace it, but proceed. */
2693                trace_nbd_co_receive_ext_payload_compliance(request->from,
2694                                                            request->len);
2695            }
2696            valid_flags |= NBD_CMD_FLAG_PAYLOAD_LEN;
2697        }
2698        payload_okay = true;
2699        payload_len = request->len;
2700        check_length = true;
2701        allocate_buffer = true;
2702        check_rofs = true;
2703        break;
2704
2705    case NBD_CMD_FLUSH:
2706        break;
2707
2708    case NBD_CMD_TRIM:
2709        check_rofs = true;
2710        break;
2711
2712    case NBD_CMD_CACHE:
2713        check_length = true;
2714        break;
2715
2716    case NBD_CMD_WRITE_ZEROES:
2717        valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO;
2718        check_rofs = true;
2719        break;
2720
2721    case NBD_CMD_BLOCK_STATUS:
2722        if (extended_with_payload) {
2723            ret = nbd_co_block_status_payload_read(client, request, errp);
2724            if (ret < 0) {
2725                return ret;
2726            }
2727            /* payload now consumed */
2728            check_length = false;
2729            payload_len = 0;
2730            valid_flags |= NBD_CMD_FLAG_PAYLOAD_LEN;
2731        } else {
2732            request->contexts = &client->contexts;
2733        }
2734        valid_flags |= NBD_CMD_FLAG_REQ_ONE;
2735        break;
2736
2737    default:
2738        /* Unrecognized, will fail later */
2739        ;
2740    }
2741
2742    /* Payload and buffer handling. */
2743    if (!payload_len) {
2744        req->complete = true;
2745    }
2746    if (check_length && request->len > NBD_MAX_BUFFER_SIZE) {
2747        /* READ, WRITE, CACHE */
2748        error_setg(errp, "len (%" PRIu64 ") is larger than max len (%u)",
2749                   request->len, NBD_MAX_BUFFER_SIZE);
2750        return -EINVAL;
2751    }
2752    if (payload_len && !payload_okay) {
2753        /*
2754         * For now, we don't support payloads on other commands; but
2755         * we can keep the connection alive by ignoring the payload.
2756         * We will fail the command later with NBD_EINVAL for the use
2757         * of an unsupported flag (and not for access beyond bounds).
2758         */
2759        assert(request->type != NBD_CMD_WRITE);
2760        request->len = 0;
2761    }
2762    if (allocate_buffer) {
2763        /* READ, WRITE */
2764        req->data = blk_try_blockalign(client->exp->common.blk,
2765                                       request->len);
2766        if (req->data == NULL) {
2767            error_setg(errp, "No memory");
2768            return -ENOMEM;
2769        }
2770    }
2771    if (payload_len) {
2772        if (payload_okay) {
2773            /* WRITE */
2774            assert(req->data);
2775            ret = nbd_read(client->ioc, req->data, payload_len,
2776                           "CMD_WRITE data", errp);
2777        } else {
2778            ret = nbd_drop(client->ioc, payload_len, errp);
2779        }
2780        if (ret < 0) {
2781            return -EIO;
2782        }
2783        req->complete = true;
2784        trace_nbd_co_receive_request_payload_received(request->cookie,
2785                                                      payload_len);
2786    }
2787
2788    /* Sanity checks. */
2789    if (client->exp->nbdflags & NBD_FLAG_READ_ONLY && check_rofs) {
2790        /* WRITE, TRIM, WRITE_ZEROES */
2791        error_setg(errp, "Export is read-only");
2792        return -EROFS;
2793    }
2794    if (request->from > client->exp->size ||
2795        request->len > client->exp->size - request->from) {
2796        error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu64
2797                   ", Size: %" PRIu64, request->from, request->len,
2798                   client->exp->size);
2799        return (request->type == NBD_CMD_WRITE ||
2800                request->type == NBD_CMD_WRITE_ZEROES) ? -ENOSPC : -EINVAL;
2801    }
2802    if (client->check_align && !QEMU_IS_ALIGNED(request->from | request->len,
2803                                                client->check_align)) {
2804        /*
2805         * The block layer gracefully handles unaligned requests, but
2806         * it's still worth tracing client non-compliance
2807         */
2808        trace_nbd_co_receive_align_compliance(nbd_cmd_lookup(request->type),
2809                                              request->from,
2810                                              request->len,
2811                                              client->check_align);
2812    }
2813    if (request->flags & ~valid_flags) {
2814        error_setg(errp, "unsupported flags for command %s (got 0x%x)",
2815                   nbd_cmd_lookup(request->type), request->flags);
2816        return -EINVAL;
2817    }
2818
2819    return 0;
2820}
2821
2822/* Send simple reply without a payload, or a structured error
2823 * @error_msg is ignored if @ret >= 0
2824 * Returns 0 if connection is still live, -errno on failure to talk to client
2825 */
2826static coroutine_fn int nbd_send_generic_reply(NBDClient *client,
2827                                               NBDRequest *request,
2828                                               int ret,
2829                                               const char *error_msg,
2830                                               Error **errp)
2831{
2832    if (client->mode >= NBD_MODE_STRUCTURED && ret < 0) {
2833        return nbd_co_send_chunk_error(client, request, -ret, error_msg, errp);
2834    } else if (client->mode >= NBD_MODE_EXTENDED) {
2835        return nbd_co_send_chunk_done(client, request, errp);
2836    } else {
2837        return nbd_co_send_simple_reply(client, request, ret < 0 ? -ret : 0,
2838                                        NULL, 0, errp);
2839    }
2840}
2841
2842/* Handle NBD_CMD_READ request.
2843 * Return -errno if sending fails. Other errors are reported directly to the
2844 * client as an error reply. */
2845static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
2846                                        uint8_t *data, Error **errp)
2847{
2848    int ret;
2849    NBDExport *exp = client->exp;
2850
2851    assert(request->type == NBD_CMD_READ);
2852    assert(request->len <= NBD_MAX_BUFFER_SIZE);
2853
2854    /* XXX: NBD Protocol only documents use of FUA with WRITE */
2855    if (request->flags & NBD_CMD_FLAG_FUA) {
2856        ret = blk_co_flush(exp->common.blk);
2857        if (ret < 0) {
2858            return nbd_send_generic_reply(client, request, ret,
2859                                          "flush failed", errp);
2860        }
2861    }
2862
2863    if (client->mode >= NBD_MODE_STRUCTURED &&
2864        !(request->flags & NBD_CMD_FLAG_DF) && request->len)
2865    {
2866        return nbd_co_send_sparse_read(client, request, request->from,
2867                                       data, request->len, errp);
2868    }
2869
2870    ret = blk_co_pread(exp->common.blk, request->from, request->len, data, 0);
2871    if (ret < 0) {
2872        return nbd_send_generic_reply(client, request, ret,
2873                                      "reading from file failed", errp);
2874    }
2875
2876    if (client->mode >= NBD_MODE_STRUCTURED) {
2877        if (request->len) {
2878            return nbd_co_send_chunk_read(client, request, request->from, data,
2879                                          request->len, true, errp);
2880        } else {
2881            return nbd_co_send_chunk_done(client, request, errp);
2882        }
2883    } else {
2884        return nbd_co_send_simple_reply(client, request, 0,
2885                                        data, request->len, errp);
2886    }
2887}
2888
2889/*
2890 * nbd_do_cmd_cache
2891 *
2892 * Handle NBD_CMD_CACHE request.
2893 * Return -errno if sending fails. Other errors are reported directly to the
2894 * client as an error reply.
2895 */
2896static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request,
2897                                         Error **errp)
2898{
2899    int ret;
2900    NBDExport *exp = client->exp;
2901
2902    assert(request->type == NBD_CMD_CACHE);
2903    assert(request->len <= NBD_MAX_BUFFER_SIZE);
2904
2905    ret = blk_co_preadv(exp->common.blk, request->from, request->len,
2906                        NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
2907
2908    return nbd_send_generic_reply(client, request, ret,
2909                                  "caching data failed", errp);
2910}
2911
2912/* Handle NBD request.
2913 * Return -errno if sending fails. Other errors are reported directly to the
2914 * client as an error reply. */
2915static coroutine_fn int nbd_handle_request(NBDClient *client,
2916                                           NBDRequest *request,
2917                                           uint8_t *data, Error **errp)
2918{
2919    int ret;
2920    int flags;
2921    NBDExport *exp = client->exp;
2922    char *msg;
2923    size_t i;
2924    bool inactive;
2925
2926    WITH_GRAPH_RDLOCK_GUARD() {
2927        inactive = bdrv_is_inactive(blk_bs(exp->common.blk));
2928        if (inactive) {
2929            switch (request->type) {
2930            case NBD_CMD_READ:
2931                /* These commands are allowed on inactive nodes */
2932                break;
2933            default:
2934                /* Return an error for the rest */
2935                return nbd_send_generic_reply(client, request, -EPERM,
2936                                              "export is inactive", errp);
2937            }
2938        }
2939    }
2940
2941    switch (request->type) {
2942    case NBD_CMD_CACHE:
2943        return nbd_do_cmd_cache(client, request, errp);
2944
2945    case NBD_CMD_READ:
2946        return nbd_do_cmd_read(client, request, data, errp);
2947
2948    case NBD_CMD_WRITE:
2949        flags = 0;
2950        if (request->flags & NBD_CMD_FLAG_FUA) {
2951            flags |= BDRV_REQ_FUA;
2952        }
2953        assert(request->len <= NBD_MAX_BUFFER_SIZE);
2954        ret = blk_co_pwrite(exp->common.blk, request->from, request->len, data,
2955                            flags);
2956        return nbd_send_generic_reply(client, request, ret,
2957                                      "writing to file failed", errp);
2958
2959    case NBD_CMD_WRITE_ZEROES:
2960        flags = 0;
2961        if (request->flags & NBD_CMD_FLAG_FUA) {
2962            flags |= BDRV_REQ_FUA;
2963        }
2964        if (!(request->flags & NBD_CMD_FLAG_NO_HOLE)) {
2965            flags |= BDRV_REQ_MAY_UNMAP;
2966        }
2967        if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
2968            flags |= BDRV_REQ_NO_FALLBACK;
2969        }
2970        ret = blk_co_pwrite_zeroes(exp->common.blk, request->from, request->len,
2971                                   flags);
2972        return nbd_send_generic_reply(client, request, ret,
2973                                      "writing to file failed", errp);
2974
2975    case NBD_CMD_DISC:
2976        /* unreachable, thanks to special case in nbd_co_receive_request() */
2977        abort();
2978
2979    case NBD_CMD_FLUSH:
2980        ret = blk_co_flush(exp->common.blk);
2981        return nbd_send_generic_reply(client, request, ret,
2982                                      "flush failed", errp);
2983
2984    case NBD_CMD_TRIM:
2985        ret = blk_co_pdiscard(exp->common.blk, request->from, request->len);
2986        if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) {
2987            ret = blk_co_flush(exp->common.blk);
2988        }
2989        return nbd_send_generic_reply(client, request, ret,
2990                                      "discard failed", errp);
2991
2992    case NBD_CMD_BLOCK_STATUS:
2993        assert(request->contexts);
2994        assert(client->mode >= NBD_MODE_EXTENDED ||
2995               request->len <= UINT32_MAX);
2996        if (request->contexts->count) {
2997            bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
2998            int contexts_remaining = request->contexts->count;
2999
3000            if (!request->len) {
3001                return nbd_send_generic_reply(client, request, -EINVAL,
3002                                              "need non-zero length", errp);
3003            }
3004            if (request->contexts->base_allocation) {
3005                ret = nbd_co_send_block_status(client, request,
3006                                               exp->common.blk,
3007                                               request->from,
3008                                               request->len, dont_fragment,
3009                                               !--contexts_remaining,
3010                                               NBD_META_ID_BASE_ALLOCATION,
3011                                               errp);
3012                if (ret < 0) {
3013                    return ret;
3014                }
3015            }
3016
3017            if (request->contexts->allocation_depth) {
3018                ret = nbd_co_send_block_status(client, request,
3019                                               exp->common.blk,
3020                                               request->from, request->len,
3021                                               dont_fragment,
3022                                               !--contexts_remaining,
3023                                               NBD_META_ID_ALLOCATION_DEPTH,
3024                                               errp);
3025                if (ret < 0) {
3026                    return ret;
3027                }
3028            }
3029
3030            assert(request->contexts->exp == client->exp);
3031            for (i = 0; i < client->exp->nr_export_bitmaps; i++) {
3032                if (!request->contexts->bitmaps[i]) {
3033                    continue;
3034                }
3035                ret = nbd_co_send_bitmap(client, request,
3036                                         client->exp->export_bitmaps[i],
3037                                         request->from, request->len,
3038                                         dont_fragment, !--contexts_remaining,
3039                                         NBD_META_ID_DIRTY_BITMAP + i, errp);
3040                if (ret < 0) {
3041                    return ret;
3042                }
3043            }
3044
3045            assert(!contexts_remaining);
3046
3047            return 0;
3048        } else if (client->contexts.count) {
3049            return nbd_send_generic_reply(client, request, -EINVAL,
3050                                          "CMD_BLOCK_STATUS payload not valid",
3051                                          errp);
3052        } else {
3053            return nbd_send_generic_reply(client, request, -EINVAL,
3054                                          "CMD_BLOCK_STATUS not negotiated",
3055                                          errp);
3056        }
3057
3058    default:
3059        msg = g_strdup_printf("invalid request type (%" PRIu32 ") received",
3060                              request->type);
3061        ret = nbd_send_generic_reply(client, request, -EINVAL, msg,
3062                                     errp);
3063        g_free(msg);
3064        return ret;
3065    }
3066}
3067
3068/* Owns a reference to the NBDClient passed as opaque.  */
3069static coroutine_fn void nbd_trip(void *opaque)
3070{
3071    NBDRequestData *req = opaque;
3072    NBDClient *client = req->client;
3073    NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
3074    int ret;
3075    Error *local_err = NULL;
3076
3077    /*
3078     * Note that nbd_client_put() and client_close() must be called from the
3079     * main loop thread. Use aio_co_reschedule_self() to switch AioContext
3080     * before calling these functions.
3081     */
3082
3083    trace_nbd_trip();
3084
3085    qemu_mutex_lock(&client->lock);
3086
3087    if (client->closing) {
3088        goto done;
3089    }
3090
3091    if (client->quiescing) {
3092        /*
3093         * We're switching between AIO contexts. Don't attempt to receive a new
3094         * request and kick the main context which may be waiting for us.
3095         */
3096        client->recv_coroutine = NULL;
3097        aio_wait_kick();
3098        goto done;
3099    }
3100
3101    /*
3102     * nbd_co_receive_request() returns -EAGAIN when nbd_drained_begin() has
3103     * set client->quiescing but by the time we get back nbd_drained_end() may
3104     * have already cleared client->quiescing. In that case we try again
3105     * because nothing else will spawn an nbd_trip() coroutine until we set
3106     * client->recv_coroutine = NULL further down.
3107     */
3108    do {
3109        assert(client->recv_coroutine == qemu_coroutine_self());
3110        qemu_mutex_unlock(&client->lock);
3111        ret = nbd_co_receive_request(req, &request, &local_err);
3112        qemu_mutex_lock(&client->lock);
3113    } while (ret == -EAGAIN && !client->quiescing);
3114
3115    client->recv_coroutine = NULL;
3116
3117    if (client->closing) {
3118        /*
3119         * The client may be closed when we are blocked in
3120         * nbd_co_receive_request()
3121         */
3122        goto done;
3123    }
3124
3125    if (ret == -EAGAIN) {
3126        goto done;
3127    }
3128
3129    nbd_client_receive_next_request(client);
3130
3131    if (ret == -EIO) {
3132        goto disconnect;
3133    }
3134
3135    qemu_mutex_unlock(&client->lock);
3136    qio_channel_set_cork(client->ioc, true);
3137
3138    if (ret < 0) {
3139        /* It wasn't -EIO, so, according to nbd_co_receive_request()
3140         * semantics, we should return the error to the client. */
3141        Error *export_err = local_err;
3142
3143        local_err = NULL;
3144        ret = nbd_send_generic_reply(client, &request, -EINVAL,
3145                                     error_get_pretty(export_err), &local_err);
3146        error_free(export_err);
3147    } else {
3148        ret = nbd_handle_request(client, &request, req->data, &local_err);
3149    }
3150    if (request.contexts && request.contexts != &client->contexts) {
3151        assert(request.type == NBD_CMD_BLOCK_STATUS);
3152        g_free(request.contexts->bitmaps);
3153        g_free(request.contexts);
3154    }
3155
3156    qio_channel_set_cork(client->ioc, false);
3157    qemu_mutex_lock(&client->lock);
3158
3159    if (ret < 0) {
3160        error_prepend(&local_err, "Failed to send reply: ");
3161        goto disconnect;
3162    }
3163
3164    /*
3165     * We must disconnect after NBD_CMD_WRITE or BLOCK_STATUS with
3166     * payload if we did not read the payload.
3167     */
3168    if (!req->complete) {
3169        error_setg(&local_err, "Request handling failed in intermediate state");
3170        goto disconnect;
3171    }
3172
3173done:
3174    nbd_request_put(req);
3175
3176    qemu_mutex_unlock(&client->lock);
3177
3178    if (!nbd_client_put_nonzero(client)) {
3179        aio_co_reschedule_self(qemu_get_aio_context());
3180        nbd_client_put(client);
3181    }
3182    return;
3183
3184disconnect:
3185    if (local_err) {
3186        error_reportf_err(local_err, "Disconnect client, due to: ");
3187    }
3188
3189    nbd_request_put(req);
3190    qemu_mutex_unlock(&client->lock);
3191
3192    aio_co_reschedule_self(qemu_get_aio_context());
3193    client_close(client, true);
3194    nbd_client_put(client);
3195}
3196
3197/*
3198 * Runs in export AioContext and main loop thread. Caller must hold
3199 * client->lock.
3200 */
3201static void nbd_client_receive_next_request(NBDClient *client)
3202{
3203    NBDRequestData *req;
3204
3205    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
3206        !client->quiescing) {
3207        nbd_client_get(client);
3208        req = nbd_request_get(client);
3209        client->recv_coroutine = qemu_coroutine_create(nbd_trip, req);
3210        aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
3211    }
3212}
3213
3214static void nbd_handshake_timer_cb(void *opaque)
3215{
3216    QIOChannel *ioc = opaque;
3217
3218    trace_nbd_handshake_timer_cb();
3219    qio_channel_shutdown(ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
3220}
3221
3222static coroutine_fn void nbd_co_client_start(void *opaque)
3223{
3224    NBDClient *client = opaque;
3225    Error *local_err = NULL;
3226    QEMUTimer *handshake_timer = NULL;
3227
3228    qemu_co_mutex_init(&client->send_lock);
3229
3230    /*
3231     * Create a timer to bound the time spent in negotiation. If the
3232     * timer expires, it is likely nbd_negotiate will fail because the
3233     * socket was shutdown.
3234     */
3235    if (client->handshake_max_secs > 0) {
3236        handshake_timer = aio_timer_new(qemu_get_aio_context(),
3237                                        QEMU_CLOCK_REALTIME,
3238                                        SCALE_NS,
3239                                        nbd_handshake_timer_cb,
3240                                        client->sioc);
3241        timer_mod(handshake_timer,
3242                  qemu_clock_get_ns(QEMU_CLOCK_REALTIME) +
3243                  client->handshake_max_secs * NANOSECONDS_PER_SECOND);
3244    }
3245
3246    if (nbd_negotiate(client, &local_err)) {
3247        if (local_err) {
3248            error_report_err(local_err);
3249        }
3250        timer_free(handshake_timer);
3251        client_close(client, false);
3252        return;
3253    }
3254
3255    timer_free(handshake_timer);
3256    WITH_QEMU_LOCK_GUARD(&client->lock) {
3257        nbd_client_receive_next_request(client);
3258    }
3259}
3260
3261/*
3262 * Create a new client listener using the given channel @sioc and @owner.
3263 * Begin servicing it in a coroutine.  When the connection closes, call
3264 * @close_fn with an indication of whether the client completed negotiation
3265 * within @handshake_max_secs seconds (0 for unbounded).
3266 */
3267void nbd_client_new(QIOChannelSocket *sioc,
3268                    uint32_t handshake_max_secs,
3269                    QCryptoTLSCreds *tlscreds,
3270                    const char *tlsauthz,
3271                    void (*close_fn)(NBDClient *, bool),
3272                    void *owner)
3273{
3274    NBDClient *client;
3275    Coroutine *co;
3276
3277    client = g_new0(NBDClient, 1);
3278    qemu_mutex_init(&client->lock);
3279    client->refcount = 1;
3280    client->tlscreds = tlscreds;
3281    if (tlscreds) {
3282        object_ref(OBJECT(client->tlscreds));
3283    }
3284    client->tlsauthz = g_strdup(tlsauthz);
3285    client->handshake_max_secs = handshake_max_secs;
3286    client->sioc = sioc;
3287    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
3288    object_ref(OBJECT(client->sioc));
3289    client->ioc = QIO_CHANNEL(sioc);
3290    object_ref(OBJECT(client->ioc));
3291    client->close_fn = close_fn;
3292    client->owner = owner;
3293
3294    nbd_set_socket_send_buffer(sioc);
3295
3296    co = qemu_coroutine_create(nbd_co_client_start, client);
3297    qemu_coroutine_enter(co);
3298}
3299
/*
 * nbd_client_owner
 *
 * Return the owner pointer stored in @client (the @owner argument that
 * nbd_client_new() recorded when the client was created).
 */
void *
nbd_client_owner(NBDClient *client)
{
    return client->owner;
}
3305