qemu/nbd/server.c
<<
>>
Prefs
   1/*
   2 *  Copyright Red Hat
   3 *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
   4 *
   5 *  Network Block Device Server Side
   6 *
   7 *  This program is free software; you can redistribute it and/or modify
   8 *  it under the terms of the GNU General Public License as published by
   9 *  the Free Software Foundation; under version 2 of the License.
  10 *
  11 *  This program is distributed in the hope that it will be useful,
  12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14 *  GNU General Public License for more details.
  15 *
  16 *  You should have received a copy of the GNU General Public License
  17 *  along with this program; if not, see <http://www.gnu.org/licenses/>.
  18 */
  19
  20#include "qemu/osdep.h"
  21
  22#include "block/block_int.h"
  23#include "block/export.h"
  24#include "block/dirty-bitmap.h"
  25#include "qapi/error.h"
  26#include "qemu/queue.h"
  27#include "trace.h"
  28#include "nbd-internal.h"
  29#include "qemu/units.h"
  30#include "qemu/memalign.h"
  31
  32#define NBD_META_ID_BASE_ALLOCATION 0
  33#define NBD_META_ID_ALLOCATION_DEPTH 1
  34/* Dirty bitmaps use 'NBD_META_ID_DIRTY_BITMAP + i', so keep this id last. */
  35#define NBD_META_ID_DIRTY_BITMAP 2
  36
/*
 * NBD_MAX_BLOCK_STATUS_EXTENTS: 1 MiB of extents data. This is an
 * empirical constant. If it ever needs to grow, note that the NBD
 * protocol recommends replies no larger than 32 MiB, so that the client
 * does not treat the reply as a denial-of-service attack.
 */
  43#define NBD_MAX_BLOCK_STATUS_EXTENTS (1 * MiB / 8)
  44
  45static int system_errno_to_nbd_errno(int err)
  46{
  47    switch (err) {
  48    case 0:
  49        return NBD_SUCCESS;
  50    case EPERM:
  51    case EROFS:
  52        return NBD_EPERM;
  53    case EIO:
  54        return NBD_EIO;
  55    case ENOMEM:
  56        return NBD_ENOMEM;
  57#ifdef EDQUOT
  58    case EDQUOT:
  59#endif
  60    case EFBIG:
  61    case ENOSPC:
  62        return NBD_ENOSPC;
  63    case EOVERFLOW:
  64        return NBD_EOVERFLOW;
  65    case ENOTSUP:
  66#if ENOTSUP != EOPNOTSUPP
  67    case EOPNOTSUPP:
  68#endif
  69        return NBD_ENOTSUP;
  70    case ESHUTDOWN:
  71        return NBD_ESHUTDOWN;
  72    case EINVAL:
  73    default:
  74        return NBD_EINVAL;
  75    }
  76}
  77
  78/* Definitions for opaque data types */
  79
  80typedef struct NBDRequestData NBDRequestData;
  81
  82struct NBDRequestData {
  83    NBDClient *client;
  84    uint8_t *data;
  85    bool complete;
  86};
  87
  88struct NBDExport {
  89    BlockExport common;
  90
  91    char *name;
  92    char *description;
  93    uint64_t size;
  94    uint16_t nbdflags;
  95    QTAILQ_HEAD(, NBDClient) clients;
  96    QTAILQ_ENTRY(NBDExport) next;
  97
  98    BlockBackend *eject_notifier_blk;
  99    Notifier eject_notifier;
 100
 101    bool allocation_depth;
 102    BdrvDirtyBitmap **export_bitmaps;
 103    size_t nr_export_bitmaps;
 104};
 105
 106static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
 107
 108/* NBDExportMetaContexts represents a list of contexts to be exported,
 109 * as selected by NBD_OPT_SET_META_CONTEXT. Also used for
 110 * NBD_OPT_LIST_META_CONTEXT. */
 111typedef struct NBDExportMetaContexts {
 112    NBDExport *exp;
 113    size_t count; /* number of negotiated contexts */
 114    bool base_allocation; /* export base:allocation context (block status) */
 115    bool allocation_depth; /* export qemu:allocation-depth */
 116    bool *bitmaps; /*
 117                    * export qemu:dirty-bitmap:<export bitmap name>,
 118                    * sized by exp->nr_export_bitmaps
 119                    */
 120} NBDExportMetaContexts;
 121
 122struct NBDClient {
 123    int refcount;
 124    void (*close_fn)(NBDClient *client, bool negotiated);
 125
 126    NBDExport *exp;
 127    QCryptoTLSCreds *tlscreds;
 128    char *tlsauthz;
 129    QIOChannelSocket *sioc; /* The underlying data channel */
 130    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
 131
 132    Coroutine *recv_coroutine;
 133
 134    CoMutex send_lock;
 135    Coroutine *send_coroutine;
 136
 137    bool read_yielding;
 138    bool quiescing;
 139
 140    QTAILQ_ENTRY(NBDClient) next;
 141    int nb_requests;
 142    bool closing;
 143
 144    uint32_t check_align; /* If non-zero, check for aligned client requests */
 145
 146    bool structured_reply;
 147    NBDExportMetaContexts export_meta;
 148
 149    uint32_t opt; /* Current option being negotiated */
 150    uint32_t optlen; /* remaining length of data in ioc for the option being
 151                        negotiated now */
 152};
 153
 154static void nbd_client_receive_next_request(NBDClient *client);
 155
 156/* Basic flow for negotiation
 157
 158   Server         Client
 159   Negotiate
 160
 161   or
 162
 163   Server         Client
 164   Negotiate #1
 165                  Option
 166   Negotiate #2
 167
 168   ----
 169
 170   followed by
 171
 172   Server         Client
 173                  Request
 174   Response
 175                  Request
 176   Response
 177                  ...
 178   ...
 179                  Request (type == 2)
 180
 181*/
 182
 183static inline void set_be_option_rep(NBDOptionReply *rep, uint32_t option,
 184                                     uint32_t type, uint32_t length)
 185{
 186    stq_be_p(&rep->magic, NBD_REP_MAGIC);
 187    stl_be_p(&rep->option, option);
 188    stl_be_p(&rep->type, type);
 189    stl_be_p(&rep->length, length);
 190}
 191
 192/* Send a reply header, including length, but no payload.
 193 * Return -errno on error, 0 on success. */
 194static int nbd_negotiate_send_rep_len(NBDClient *client, uint32_t type,
 195                                      uint32_t len, Error **errp)
 196{
 197    NBDOptionReply rep;
 198
 199    trace_nbd_negotiate_send_rep_len(client->opt, nbd_opt_lookup(client->opt),
 200                                     type, nbd_rep_lookup(type), len);
 201
 202    assert(len < NBD_MAX_BUFFER_SIZE);
 203
 204    set_be_option_rep(&rep, client->opt, type, len);
 205    return nbd_write(client->ioc, &rep, sizeof(rep), errp);
 206}
 207
 208/* Send a reply header with default 0 length.
 209 * Return -errno on error, 0 on success. */
 210static int nbd_negotiate_send_rep(NBDClient *client, uint32_t type,
 211                                  Error **errp)
 212{
 213    return nbd_negotiate_send_rep_len(client, type, 0, errp);
 214}
 215
 216/* Send an error reply.
 217 * Return -errno on error, 0 on success. */
 218static int G_GNUC_PRINTF(4, 0)
 219nbd_negotiate_send_rep_verr(NBDClient *client, uint32_t type,
 220                            Error **errp, const char *fmt, va_list va)
 221{
 222    ERRP_GUARD();
 223    g_autofree char *msg = NULL;
 224    int ret;
 225    size_t len;
 226
 227    msg = g_strdup_vprintf(fmt, va);
 228    len = strlen(msg);
 229    assert(len < NBD_MAX_STRING_SIZE);
 230    trace_nbd_negotiate_send_rep_err(msg);
 231    ret = nbd_negotiate_send_rep_len(client, type, len, errp);
 232    if (ret < 0) {
 233        return ret;
 234    }
 235    if (nbd_write(client->ioc, msg, len, errp) < 0) {
 236        error_prepend(errp, "write failed (error message): ");
 237        return -EIO;
 238    }
 239
 240    return 0;
 241}
 242
 243/*
 244 * Return a malloc'd copy of @name suitable for use in an error reply.
 245 */
 246static char *
 247nbd_sanitize_name(const char *name)
 248{
 249    if (strnlen(name, 80) < 80) {
 250        return g_strdup(name);
 251    }
 252    /* XXX Should we also try to sanitize any control characters? */
 253    return g_strdup_printf("%.80s...", name);
 254}
 255
 256/* Send an error reply.
 257 * Return -errno on error, 0 on success. */
 258static int G_GNUC_PRINTF(4, 5)
 259nbd_negotiate_send_rep_err(NBDClient *client, uint32_t type,
 260                           Error **errp, const char *fmt, ...)
 261{
 262    va_list va;
 263    int ret;
 264
 265    va_start(va, fmt);
 266    ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
 267    va_end(va);
 268    return ret;
 269}
 270
 271/* Drop remainder of the current option, and send a reply with the
 272 * given error type and message. Return -errno on read or write
 273 * failure; or 0 if connection is still live. */
 274static int G_GNUC_PRINTF(4, 0)
 275nbd_opt_vdrop(NBDClient *client, uint32_t type, Error **errp,
 276              const char *fmt, va_list va)
 277{
 278    int ret = nbd_drop(client->ioc, client->optlen, errp);
 279
 280    client->optlen = 0;
 281    if (!ret) {
 282        ret = nbd_negotiate_send_rep_verr(client, type, errp, fmt, va);
 283    }
 284    return ret;
 285}
 286
 287static int G_GNUC_PRINTF(4, 5)
 288nbd_opt_drop(NBDClient *client, uint32_t type, Error **errp,
 289             const char *fmt, ...)
 290{
 291    int ret;
 292    va_list va;
 293
 294    va_start(va, fmt);
 295    ret = nbd_opt_vdrop(client, type, errp, fmt, va);
 296    va_end(va);
 297
 298    return ret;
 299}
 300
 301static int G_GNUC_PRINTF(3, 4)
 302nbd_opt_invalid(NBDClient *client, Error **errp, const char *fmt, ...)
 303{
 304    int ret;
 305    va_list va;
 306
 307    va_start(va, fmt);
 308    ret = nbd_opt_vdrop(client, NBD_REP_ERR_INVALID, errp, fmt, va);
 309    va_end(va);
 310
 311    return ret;
 312}
 313
 314/* Read size bytes from the unparsed payload of the current option.
 315 * If @check_nul, require that no NUL bytes appear in buffer.
 316 * Return -errno on I/O error, 0 if option was completely handled by
 317 * sending a reply about inconsistent lengths, or 1 on success. */
 318static int nbd_opt_read(NBDClient *client, void *buffer, size_t size,
 319                        bool check_nul, Error **errp)
 320{
 321    if (size > client->optlen) {
 322        return nbd_opt_invalid(client, errp,
 323                               "Inconsistent lengths in option %s",
 324                               nbd_opt_lookup(client->opt));
 325    }
 326    client->optlen -= size;
 327    if (qio_channel_read_all(client->ioc, buffer, size, errp) < 0) {
 328        return -EIO;
 329    }
 330
 331    if (check_nul && strnlen(buffer, size) != size) {
 332        return nbd_opt_invalid(client, errp,
 333                               "Unexpected embedded NUL in option %s",
 334                               nbd_opt_lookup(client->opt));
 335    }
 336    return 1;
 337}
 338
 339/* Drop size bytes from the unparsed payload of the current option.
 340 * Return -errno on I/O error, 0 if option was completely handled by
 341 * sending a reply about inconsistent lengths, or 1 on success. */
 342static int nbd_opt_skip(NBDClient *client, size_t size, Error **errp)
 343{
 344    if (size > client->optlen) {
 345        return nbd_opt_invalid(client, errp,
 346                               "Inconsistent lengths in option %s",
 347                               nbd_opt_lookup(client->opt));
 348    }
 349    client->optlen -= size;
 350    return nbd_drop(client->ioc, size, errp) < 0 ? -EIO : 1;
 351}
 352
 353/* nbd_opt_read_name
 354 *
 355 * Read a string with the format:
 356 *   uint32_t len     (<= NBD_MAX_STRING_SIZE)
 357 *   len bytes string (not 0-terminated)
 358 *
 359 * On success, @name will be allocated.
 360 * If @length is non-null, it will be set to the actual string length.
 361 *
 362 * Return -errno on I/O error, 0 if option was completely handled by
 363 * sending a reply about inconsistent lengths, or 1 on success.
 364 */
 365static int nbd_opt_read_name(NBDClient *client, char **name, uint32_t *length,
 366                             Error **errp)
 367{
 368    int ret;
 369    uint32_t len;
 370    g_autofree char *local_name = NULL;
 371
 372    *name = NULL;
 373    ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
 374    if (ret <= 0) {
 375        return ret;
 376    }
 377    len = cpu_to_be32(len);
 378
 379    if (len > NBD_MAX_STRING_SIZE) {
 380        return nbd_opt_invalid(client, errp,
 381                               "Invalid name length: %" PRIu32, len);
 382    }
 383
 384    local_name = g_malloc(len + 1);
 385    ret = nbd_opt_read(client, local_name, len, true, errp);
 386    if (ret <= 0) {
 387        return ret;
 388    }
 389    local_name[len] = '\0';
 390
 391    if (length) {
 392        *length = len;
 393    }
 394    *name = g_steal_pointer(&local_name);
 395
 396    return 1;
 397}
 398
 399/* Send a single NBD_REP_SERVER reply to NBD_OPT_LIST, including payload.
 400 * Return -errno on error, 0 on success. */
 401static int nbd_negotiate_send_rep_list(NBDClient *client, NBDExport *exp,
 402                                       Error **errp)
 403{
 404    ERRP_GUARD();
 405    size_t name_len, desc_len;
 406    uint32_t len;
 407    const char *name = exp->name ? exp->name : "";
 408    const char *desc = exp->description ? exp->description : "";
 409    QIOChannel *ioc = client->ioc;
 410    int ret;
 411
 412    trace_nbd_negotiate_send_rep_list(name, desc);
 413    name_len = strlen(name);
 414    desc_len = strlen(desc);
 415    assert(name_len <= NBD_MAX_STRING_SIZE && desc_len <= NBD_MAX_STRING_SIZE);
 416    len = name_len + desc_len + sizeof(len);
 417    ret = nbd_negotiate_send_rep_len(client, NBD_REP_SERVER, len, errp);
 418    if (ret < 0) {
 419        return ret;
 420    }
 421
 422    len = cpu_to_be32(name_len);
 423    if (nbd_write(ioc, &len, sizeof(len), errp) < 0) {
 424        error_prepend(errp, "write failed (name length): ");
 425        return -EINVAL;
 426    }
 427
 428    if (nbd_write(ioc, name, name_len, errp) < 0) {
 429        error_prepend(errp, "write failed (name buffer): ");
 430        return -EINVAL;
 431    }
 432
 433    if (nbd_write(ioc, desc, desc_len, errp) < 0) {
 434        error_prepend(errp, "write failed (description buffer): ");
 435        return -EINVAL;
 436    }
 437
 438    return 0;
 439}
 440
 441/* Process the NBD_OPT_LIST command, with a potential series of replies.
 442 * Return -errno on error, 0 on success. */
 443static int nbd_negotiate_handle_list(NBDClient *client, Error **errp)
 444{
 445    NBDExport *exp;
 446    assert(client->opt == NBD_OPT_LIST);
 447
 448    /* For each export, send a NBD_REP_SERVER reply. */
 449    QTAILQ_FOREACH(exp, &exports, next) {
 450        if (nbd_negotiate_send_rep_list(client, exp, errp)) {
 451            return -EINVAL;
 452        }
 453    }
 454    /* Finish with a NBD_REP_ACK. */
 455    return nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
 456}
 457
 458static void nbd_check_meta_export(NBDClient *client)
 459{
 460    if (client->exp != client->export_meta.exp) {
 461        client->export_meta.count = 0;
 462    }
 463}
 464
 465/* Send a reply to NBD_OPT_EXPORT_NAME.
 466 * Return -errno on error, 0 on success. */
 467static int nbd_negotiate_handle_export_name(NBDClient *client, bool no_zeroes,
 468                                            Error **errp)
 469{
 470    ERRP_GUARD();
 471    g_autofree char *name = NULL;
 472    char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
 473    size_t len;
 474    int ret;
 475    uint16_t myflags;
 476
 477    /* Client sends:
 478        [20 ..  xx]   export name (length bytes)
 479       Server replies:
 480        [ 0 ..   7]   size
 481        [ 8 ..   9]   export flags
 482        [10 .. 133]   reserved     (0) [unless no_zeroes]
 483     */
 484    trace_nbd_negotiate_handle_export_name();
 485    if (client->optlen > NBD_MAX_STRING_SIZE) {
 486        error_setg(errp, "Bad length received");
 487        return -EINVAL;
 488    }
 489    name = g_malloc(client->optlen + 1);
 490    if (nbd_read(client->ioc, name, client->optlen, "export name", errp) < 0) {
 491        return -EIO;
 492    }
 493    name[client->optlen] = '\0';
 494    client->optlen = 0;
 495
 496    trace_nbd_negotiate_handle_export_name_request(name);
 497
 498    client->exp = nbd_export_find(name);
 499    if (!client->exp) {
 500        error_setg(errp, "export not found");
 501        return -EINVAL;
 502    }
 503
 504    myflags = client->exp->nbdflags;
 505    if (client->structured_reply) {
 506        myflags |= NBD_FLAG_SEND_DF;
 507    }
 508    trace_nbd_negotiate_new_style_size_flags(client->exp->size, myflags);
 509    stq_be_p(buf, client->exp->size);
 510    stw_be_p(buf + 8, myflags);
 511    len = no_zeroes ? 10 : sizeof(buf);
 512    ret = nbd_write(client->ioc, buf, len, errp);
 513    if (ret < 0) {
 514        error_prepend(errp, "write failed: ");
 515        return ret;
 516    }
 517
 518    QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
 519    blk_exp_ref(&client->exp->common);
 520    nbd_check_meta_export(client);
 521
 522    return 0;
 523}
 524
 525/* Send a single NBD_REP_INFO, with a buffer @buf of @length bytes.
 526 * The buffer does NOT include the info type prefix.
 527 * Return -errno on error, 0 if ready to send more. */
 528static int nbd_negotiate_send_info(NBDClient *client,
 529                                   uint16_t info, uint32_t length, void *buf,
 530                                   Error **errp)
 531{
 532    int rc;
 533
 534    trace_nbd_negotiate_send_info(info, nbd_info_lookup(info), length);
 535    rc = nbd_negotiate_send_rep_len(client, NBD_REP_INFO,
 536                                    sizeof(info) + length, errp);
 537    if (rc < 0) {
 538        return rc;
 539    }
 540    info = cpu_to_be16(info);
 541    if (nbd_write(client->ioc, &info, sizeof(info), errp) < 0) {
 542        return -EIO;
 543    }
 544    if (nbd_write(client->ioc, buf, length, errp) < 0) {
 545        return -EIO;
 546    }
 547    return 0;
 548}
 549
 550/* nbd_reject_length: Handle any unexpected payload.
 551 * @fatal requests that we quit talking to the client, even if we are able
 552 * to successfully send an error reply.
 553 * Return:
 554 * -errno  transmission error occurred or @fatal was requested, errp is set
 555 * 0       error message successfully sent to client, errp is not set
 556 */
 557static int nbd_reject_length(NBDClient *client, bool fatal, Error **errp)
 558{
 559    int ret;
 560
 561    assert(client->optlen);
 562    ret = nbd_opt_invalid(client, errp, "option '%s' has unexpected length",
 563                          nbd_opt_lookup(client->opt));
 564    if (fatal && !ret) {
 565        error_setg(errp, "option '%s' has unexpected length",
 566                   nbd_opt_lookup(client->opt));
 567        return -EINVAL;
 568    }
 569    return ret;
 570}
 571
 572/* Handle NBD_OPT_INFO and NBD_OPT_GO.
 573 * Return -errno on error, 0 if ready for next option, and 1 to move
 574 * into transmission phase.  */
 575static int nbd_negotiate_handle_info(NBDClient *client, Error **errp)
 576{
 577    int rc;
 578    g_autofree char *name = NULL;
 579    NBDExport *exp;
 580    uint16_t requests;
 581    uint16_t request;
 582    uint32_t namelen = 0;
 583    bool sendname = false;
 584    bool blocksize = false;
 585    uint32_t sizes[3];
 586    char buf[sizeof(uint64_t) + sizeof(uint16_t)];
 587    uint32_t check_align = 0;
 588    uint16_t myflags;
 589
 590    /* Client sends:
 591        4 bytes: L, name length (can be 0)
 592        L bytes: export name
 593        2 bytes: N, number of requests (can be 0)
 594        N * 2 bytes: N requests
 595    */
 596    rc = nbd_opt_read_name(client, &name, &namelen, errp);
 597    if (rc <= 0) {
 598        return rc;
 599    }
 600    trace_nbd_negotiate_handle_export_name_request(name);
 601
 602    rc = nbd_opt_read(client, &requests, sizeof(requests), false, errp);
 603    if (rc <= 0) {
 604        return rc;
 605    }
 606    requests = be16_to_cpu(requests);
 607    trace_nbd_negotiate_handle_info_requests(requests);
 608    while (requests--) {
 609        rc = nbd_opt_read(client, &request, sizeof(request), false, errp);
 610        if (rc <= 0) {
 611            return rc;
 612        }
 613        request = be16_to_cpu(request);
 614        trace_nbd_negotiate_handle_info_request(request,
 615                                                nbd_info_lookup(request));
 616        /* We care about NBD_INFO_NAME and NBD_INFO_BLOCK_SIZE;
 617         * everything else is either a request we don't know or
 618         * something we send regardless of request */
 619        switch (request) {
 620        case NBD_INFO_NAME:
 621            sendname = true;
 622            break;
 623        case NBD_INFO_BLOCK_SIZE:
 624            blocksize = true;
 625            break;
 626        }
 627    }
 628    if (client->optlen) {
 629        return nbd_reject_length(client, false, errp);
 630    }
 631
 632    exp = nbd_export_find(name);
 633    if (!exp) {
 634        g_autofree char *sane_name = nbd_sanitize_name(name);
 635
 636        return nbd_negotiate_send_rep_err(client, NBD_REP_ERR_UNKNOWN,
 637                                          errp, "export '%s' not present",
 638                                          sane_name);
 639    }
 640
 641    /* Don't bother sending NBD_INFO_NAME unless client requested it */
 642    if (sendname) {
 643        rc = nbd_negotiate_send_info(client, NBD_INFO_NAME, namelen, name,
 644                                     errp);
 645        if (rc < 0) {
 646            return rc;
 647        }
 648    }
 649
 650    /* Send NBD_INFO_DESCRIPTION only if available, regardless of
 651     * client request */
 652    if (exp->description) {
 653        size_t len = strlen(exp->description);
 654
 655        assert(len <= NBD_MAX_STRING_SIZE);
 656        rc = nbd_negotiate_send_info(client, NBD_INFO_DESCRIPTION,
 657                                     len, exp->description, errp);
 658        if (rc < 0) {
 659            return rc;
 660        }
 661    }
 662
 663    /* Send NBD_INFO_BLOCK_SIZE always, but tweak the minimum size
 664     * according to whether the client requested it, and according to
 665     * whether this is OPT_INFO or OPT_GO. */
 666    /* minimum - 1 for back-compat, or actual if client will obey it. */
 667    if (client->opt == NBD_OPT_INFO || blocksize) {
 668        check_align = sizes[0] = blk_get_request_alignment(exp->common.blk);
 669    } else {
 670        sizes[0] = 1;
 671    }
 672    assert(sizes[0] <= NBD_MAX_BUFFER_SIZE);
 673    /* preferred - Hard-code to 4096 for now.
 674     * TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
 675    sizes[1] = MAX(4096, sizes[0]);
 676    /* maximum - At most 32M, but smaller as appropriate. */
 677    sizes[2] = MIN(blk_get_max_transfer(exp->common.blk), NBD_MAX_BUFFER_SIZE);
 678    trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
 679    sizes[0] = cpu_to_be32(sizes[0]);
 680    sizes[1] = cpu_to_be32(sizes[1]);
 681    sizes[2] = cpu_to_be32(sizes[2]);
 682    rc = nbd_negotiate_send_info(client, NBD_INFO_BLOCK_SIZE,
 683                                 sizeof(sizes), sizes, errp);
 684    if (rc < 0) {
 685        return rc;
 686    }
 687
 688    /* Send NBD_INFO_EXPORT always */
 689    myflags = exp->nbdflags;
 690    if (client->structured_reply) {
 691        myflags |= NBD_FLAG_SEND_DF;
 692    }
 693    trace_nbd_negotiate_new_style_size_flags(exp->size, myflags);
 694    stq_be_p(buf, exp->size);
 695    stw_be_p(buf + 8, myflags);
 696    rc = nbd_negotiate_send_info(client, NBD_INFO_EXPORT,
 697                                 sizeof(buf), buf, errp);
 698    if (rc < 0) {
 699        return rc;
 700    }
 701
 702    /*
 703     * If the client is just asking for NBD_OPT_INFO, but forgot to
 704     * request block sizes in a situation that would impact
 705     * performance, then return an error. But for NBD_OPT_GO, we
 706     * tolerate all clients, regardless of alignments.
 707     */
 708    if (client->opt == NBD_OPT_INFO && !blocksize &&
 709        blk_get_request_alignment(exp->common.blk) > 1) {
 710        return nbd_negotiate_send_rep_err(client,
 711                                          NBD_REP_ERR_BLOCK_SIZE_REQD,
 712                                          errp,
 713                                          "request NBD_INFO_BLOCK_SIZE to "
 714                                          "use this export");
 715    }
 716
 717    /* Final reply */
 718    rc = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
 719    if (rc < 0) {
 720        return rc;
 721    }
 722
 723    if (client->opt == NBD_OPT_GO) {
 724        client->exp = exp;
 725        client->check_align = check_align;
 726        QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
 727        blk_exp_ref(&client->exp->common);
 728        nbd_check_meta_export(client);
 729        rc = 1;
 730    }
 731    return rc;
 732}
 733
 734
 735/* Handle NBD_OPT_STARTTLS. Return NULL to drop connection, or else the
 736 * new channel for all further (now-encrypted) communication. */
 737static QIOChannel *nbd_negotiate_handle_starttls(NBDClient *client,
 738                                                 Error **errp)
 739{
 740    QIOChannel *ioc;
 741    QIOChannelTLS *tioc;
 742    struct NBDTLSHandshakeData data = { 0 };
 743
 744    assert(client->opt == NBD_OPT_STARTTLS);
 745
 746    trace_nbd_negotiate_handle_starttls();
 747    ioc = client->ioc;
 748
 749    if (nbd_negotiate_send_rep(client, NBD_REP_ACK, errp) < 0) {
 750        return NULL;
 751    }
 752
 753    tioc = qio_channel_tls_new_server(ioc,
 754                                      client->tlscreds,
 755                                      client->tlsauthz,
 756                                      errp);
 757    if (!tioc) {
 758        return NULL;
 759    }
 760
 761    qio_channel_set_name(QIO_CHANNEL(tioc), "nbd-server-tls");
 762    trace_nbd_negotiate_handle_starttls_handshake();
 763    data.loop = g_main_loop_new(g_main_context_default(), FALSE);
 764    qio_channel_tls_handshake(tioc,
 765                              nbd_tls_handshake,
 766                              &data,
 767                              NULL,
 768                              NULL);
 769
 770    if (!data.complete) {
 771        g_main_loop_run(data.loop);
 772    }
 773    g_main_loop_unref(data.loop);
 774    if (data.error) {
 775        object_unref(OBJECT(tioc));
 776        error_propagate(errp, data.error);
 777        return NULL;
 778    }
 779
 780    return QIO_CHANNEL(tioc);
 781}
 782
 783/* nbd_negotiate_send_meta_context
 784 *
 785 * Send one chunk of reply to NBD_OPT_{LIST,SET}_META_CONTEXT
 786 *
 787 * For NBD_OPT_LIST_META_CONTEXT @context_id is ignored, 0 is used instead.
 788 */
 789static int nbd_negotiate_send_meta_context(NBDClient *client,
 790                                           const char *context,
 791                                           uint32_t context_id,
 792                                           Error **errp)
 793{
 794    NBDOptionReplyMetaContext opt;
 795    struct iovec iov[] = {
 796        {.iov_base = &opt, .iov_len = sizeof(opt)},
 797        {.iov_base = (void *)context, .iov_len = strlen(context)}
 798    };
 799
 800    assert(iov[1].iov_len <= NBD_MAX_STRING_SIZE);
 801    if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
 802        context_id = 0;
 803    }
 804
 805    trace_nbd_negotiate_meta_query_reply(context, context_id);
 806    set_be_option_rep(&opt.h, client->opt, NBD_REP_META_CONTEXT,
 807                      sizeof(opt) - sizeof(opt.h) + iov[1].iov_len);
 808    stl_be_p(&opt.context_id, context_id);
 809
 810    return qio_channel_writev_all(client->ioc, iov, 2, errp) < 0 ? -EIO : 0;
 811}
 812
 813/*
 814 * Return true if @query matches @pattern, or if @query is empty when
 815 * the @client is performing _LIST_.
 816 */
 817static bool nbd_meta_empty_or_pattern(NBDClient *client, const char *pattern,
 818                                      const char *query)
 819{
 820    if (!*query) {
 821        trace_nbd_negotiate_meta_query_parse("empty");
 822        return client->opt == NBD_OPT_LIST_META_CONTEXT;
 823    }
 824    if (strcmp(query, pattern) == 0) {
 825        trace_nbd_negotiate_meta_query_parse(pattern);
 826        return true;
 827    }
 828    trace_nbd_negotiate_meta_query_skip("pattern not matched");
 829    return false;
 830}
 831
 832/*
 833 * Return true and adjust @str in place if it begins with @prefix.
 834 */
 835static bool nbd_strshift(const char **str, const char *prefix)
 836{
 837    size_t len = strlen(prefix);
 838
 839    if (strncmp(*str, prefix, len) == 0) {
 840        *str += len;
 841        return true;
 842    }
 843    return false;
 844}
 845
 846/* nbd_meta_base_query
 847 *
 848 * Handle queries to 'base' namespace. For now, only the base:allocation
 849 * context is available.  Return true if @query has been handled.
 850 */
 851static bool nbd_meta_base_query(NBDClient *client, NBDExportMetaContexts *meta,
 852                                const char *query)
 853{
 854    if (!nbd_strshift(&query, "base:")) {
 855        return false;
 856    }
 857    trace_nbd_negotiate_meta_query_parse("base:");
 858
 859    if (nbd_meta_empty_or_pattern(client, "allocation", query)) {
 860        meta->base_allocation = true;
 861    }
 862    return true;
 863}
 864
 865/* nbd_meta_qemu_query
 866 *
 867 * Handle queries to 'qemu' namespace. For now, only the qemu:dirty-bitmap:
 868 * and qemu:allocation-depth contexts are available.  Return true if @query
 869 * has been handled.
 870 */
 871static bool nbd_meta_qemu_query(NBDClient *client, NBDExportMetaContexts *meta,
 872                                const char *query)
 873{
 874    size_t i;
 875
 876    if (!nbd_strshift(&query, "qemu:")) {
 877        return false;
 878    }
 879    trace_nbd_negotiate_meta_query_parse("qemu:");
 880
 881    if (!*query) {
 882        if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
 883            meta->allocation_depth = meta->exp->allocation_depth;
 884            if (meta->exp->nr_export_bitmaps) {
 885                memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
 886            }
 887        }
 888        trace_nbd_negotiate_meta_query_parse("empty");
 889        return true;
 890    }
 891
 892    if (strcmp(query, "allocation-depth") == 0) {
 893        trace_nbd_negotiate_meta_query_parse("allocation-depth");
 894        meta->allocation_depth = meta->exp->allocation_depth;
 895        return true;
 896    }
 897
 898    if (nbd_strshift(&query, "dirty-bitmap:")) {
 899        trace_nbd_negotiate_meta_query_parse("dirty-bitmap:");
 900        if (!*query) {
 901            if (client->opt == NBD_OPT_LIST_META_CONTEXT &&
 902                meta->exp->nr_export_bitmaps) {
 903                memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
 904            }
 905            trace_nbd_negotiate_meta_query_parse("empty");
 906            return true;
 907        }
 908
 909        for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
 910            const char *bm_name;
 911
 912            bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
 913            if (strcmp(bm_name, query) == 0) {
 914                meta->bitmaps[i] = true;
 915                trace_nbd_negotiate_meta_query_parse(query);
 916                return true;
 917            }
 918        }
 919        trace_nbd_negotiate_meta_query_skip("no dirty-bitmap match");
 920        return true;
 921    }
 922
 923    trace_nbd_negotiate_meta_query_skip("unknown qemu context");
 924    return true;
 925}
 926
 927/* nbd_negotiate_meta_query
 928 *
 929 * Parse namespace name and call corresponding function to parse body of the
 930 * query.
 931 *
 932 * The only supported namespaces are 'base' and 'qemu'.
 933 *
 934 * Return -errno on I/O error, 0 if option was completely handled by
 935 * sending a reply about inconsistent lengths, or 1 on success. */
 936static int nbd_negotiate_meta_query(NBDClient *client,
 937                                    NBDExportMetaContexts *meta, Error **errp)
 938{
 939    int ret;
 940    g_autofree char *query = NULL;
 941    uint32_t len;
 942
 943    ret = nbd_opt_read(client, &len, sizeof(len), false, errp);
 944    if (ret <= 0) {
 945        return ret;
 946    }
 947    len = cpu_to_be32(len);
 948
 949    if (len > NBD_MAX_STRING_SIZE) {
 950        trace_nbd_negotiate_meta_query_skip("length too long");
 951        return nbd_opt_skip(client, len, errp);
 952    }
 953
 954    query = g_malloc(len + 1);
 955    ret = nbd_opt_read(client, query, len, true, errp);
 956    if (ret <= 0) {
 957        return ret;
 958    }
 959    query[len] = '\0';
 960
 961    if (nbd_meta_base_query(client, meta, query)) {
 962        return 1;
 963    }
 964    if (nbd_meta_qemu_query(client, meta, query)) {
 965        return 1;
 966    }
 967
 968    trace_nbd_negotiate_meta_query_skip("unknown namespace");
 969    return 1;
 970}
 971
 972/* nbd_negotiate_meta_queries
 973 * Handle NBD_OPT_LIST_META_CONTEXT and NBD_OPT_SET_META_CONTEXT
 974 *
 975 * Return -errno on I/O error, or 0 if option was completely handled. */
 976static int nbd_negotiate_meta_queries(NBDClient *client,
 977                                      NBDExportMetaContexts *meta, Error **errp)
 978{
 979    int ret;
 980    g_autofree char *export_name = NULL;
 981    /* Mark unused to work around https://bugs.llvm.org/show_bug.cgi?id=3888 */
 982    g_autofree G_GNUC_UNUSED bool *bitmaps = NULL;
 983    NBDExportMetaContexts local_meta = {0};
 984    uint32_t nb_queries;
 985    size_t i;
 986    size_t count = 0;
 987
 988    if (client->opt == NBD_OPT_SET_META_CONTEXT && !client->structured_reply) {
 989        return nbd_opt_invalid(client, errp,
 990                               "request option '%s' when structured reply "
 991                               "is not negotiated",
 992                               nbd_opt_lookup(client->opt));
 993    }
 994
 995    if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
 996        /* Only change the caller's meta on SET. */
 997        meta = &local_meta;
 998    }
 999
1000    g_free(meta->bitmaps);
1001    memset(meta, 0, sizeof(*meta));
1002
1003    ret = nbd_opt_read_name(client, &export_name, NULL, errp);
1004    if (ret <= 0) {
1005        return ret;
1006    }
1007
1008    meta->exp = nbd_export_find(export_name);
1009    if (meta->exp == NULL) {
1010        g_autofree char *sane_name = nbd_sanitize_name(export_name);
1011
1012        return nbd_opt_drop(client, NBD_REP_ERR_UNKNOWN, errp,
1013                            "export '%s' not present", sane_name);
1014    }
1015    meta->bitmaps = g_new0(bool, meta->exp->nr_export_bitmaps);
1016    if (client->opt == NBD_OPT_LIST_META_CONTEXT) {
1017        bitmaps = meta->bitmaps;
1018    }
1019
1020    ret = nbd_opt_read(client, &nb_queries, sizeof(nb_queries), false, errp);
1021    if (ret <= 0) {
1022        return ret;
1023    }
1024    nb_queries = cpu_to_be32(nb_queries);
1025    trace_nbd_negotiate_meta_context(nbd_opt_lookup(client->opt),
1026                                     export_name, nb_queries);
1027
1028    if (client->opt == NBD_OPT_LIST_META_CONTEXT && !nb_queries) {
1029        /* enable all known contexts */
1030        meta->base_allocation = true;
1031        meta->allocation_depth = meta->exp->allocation_depth;
1032        if (meta->exp->nr_export_bitmaps) {
1033            memset(meta->bitmaps, 1, meta->exp->nr_export_bitmaps);
1034        }
1035    } else {
1036        for (i = 0; i < nb_queries; ++i) {
1037            ret = nbd_negotiate_meta_query(client, meta, errp);
1038            if (ret <= 0) {
1039                return ret;
1040            }
1041        }
1042    }
1043
1044    if (meta->base_allocation) {
1045        ret = nbd_negotiate_send_meta_context(client, "base:allocation",
1046                                              NBD_META_ID_BASE_ALLOCATION,
1047                                              errp);
1048        if (ret < 0) {
1049            return ret;
1050        }
1051        count++;
1052    }
1053
1054    if (meta->allocation_depth) {
1055        ret = nbd_negotiate_send_meta_context(client, "qemu:allocation-depth",
1056                                              NBD_META_ID_ALLOCATION_DEPTH,
1057                                              errp);
1058        if (ret < 0) {
1059            return ret;
1060        }
1061        count++;
1062    }
1063
1064    for (i = 0; i < meta->exp->nr_export_bitmaps; i++) {
1065        const char *bm_name;
1066        g_autofree char *context = NULL;
1067
1068        if (!meta->bitmaps[i]) {
1069            continue;
1070        }
1071
1072        bm_name = bdrv_dirty_bitmap_name(meta->exp->export_bitmaps[i]);
1073        context = g_strdup_printf("qemu:dirty-bitmap:%s", bm_name);
1074
1075        ret = nbd_negotiate_send_meta_context(client, context,
1076                                              NBD_META_ID_DIRTY_BITMAP + i,
1077                                              errp);
1078        if (ret < 0) {
1079            return ret;
1080        }
1081        count++;
1082    }
1083
1084    ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1085    if (ret == 0) {
1086        meta->count = count;
1087    }
1088
1089    return ret;
1090}
1091
1092/* nbd_negotiate_options
1093 * Process all NBD_OPT_* client option commands, during fixed newstyle
1094 * negotiation.
1095 * Return:
1096 * -errno  on error, errp is set
1097 * 0       on successful negotiation, errp is not set
1098 * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1099 *         errp is not set
1100 */
1101static int nbd_negotiate_options(NBDClient *client, Error **errp)
1102{
1103    uint32_t flags;
1104    bool fixedNewstyle = false;
1105    bool no_zeroes = false;
1106
1107    /* Client sends:
1108        [ 0 ..   3]   client flags
1109
1110       Then we loop until NBD_OPT_EXPORT_NAME or NBD_OPT_GO:
1111        [ 0 ..   7]   NBD_OPTS_MAGIC
1112        [ 8 ..  11]   NBD option
1113        [12 ..  15]   Data length
1114        ...           Rest of request
1115
1116        [ 0 ..   7]   NBD_OPTS_MAGIC
1117        [ 8 ..  11]   Second NBD option
1118        [12 ..  15]   Data length
1119        ...           Rest of request
1120    */
1121
1122    if (nbd_read32(client->ioc, &flags, "flags", errp) < 0) {
1123        return -EIO;
1124    }
1125    trace_nbd_negotiate_options_flags(flags);
1126    if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
1127        fixedNewstyle = true;
1128        flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE;
1129    }
1130    if (flags & NBD_FLAG_C_NO_ZEROES) {
1131        no_zeroes = true;
1132        flags &= ~NBD_FLAG_C_NO_ZEROES;
1133    }
1134    if (flags != 0) {
1135        error_setg(errp, "Unknown client flags 0x%" PRIx32 " received", flags);
1136        return -EINVAL;
1137    }
1138
1139    while (1) {
1140        int ret;
1141        uint32_t option, length;
1142        uint64_t magic;
1143
1144        if (nbd_read64(client->ioc, &magic, "opts magic", errp) < 0) {
1145            return -EINVAL;
1146        }
1147        trace_nbd_negotiate_options_check_magic(magic);
1148        if (magic != NBD_OPTS_MAGIC) {
1149            error_setg(errp, "Bad magic received");
1150            return -EINVAL;
1151        }
1152
1153        if (nbd_read32(client->ioc, &option, "option", errp) < 0) {
1154            return -EINVAL;
1155        }
1156        client->opt = option;
1157
1158        if (nbd_read32(client->ioc, &length, "option length", errp) < 0) {
1159            return -EINVAL;
1160        }
1161        assert(!client->optlen);
1162        client->optlen = length;
1163
1164        if (length > NBD_MAX_BUFFER_SIZE) {
1165            error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
1166                       length, NBD_MAX_BUFFER_SIZE);
1167            return -EINVAL;
1168        }
1169
1170        trace_nbd_negotiate_options_check_option(option,
1171                                                 nbd_opt_lookup(option));
1172        if (client->tlscreds &&
1173            client->ioc == (QIOChannel *)client->sioc) {
1174            QIOChannel *tioc;
1175            if (!fixedNewstyle) {
1176                error_setg(errp, "Unsupported option 0x%" PRIx32, option);
1177                return -EINVAL;
1178            }
1179            switch (option) {
1180            case NBD_OPT_STARTTLS:
1181                if (length) {
1182                    /* Unconditionally drop the connection if the client
1183                     * can't start a TLS negotiation correctly */
1184                    return nbd_reject_length(client, true, errp);
1185                }
1186                tioc = nbd_negotiate_handle_starttls(client, errp);
1187                if (!tioc) {
1188                    return -EIO;
1189                }
1190                ret = 0;
1191                object_unref(OBJECT(client->ioc));
1192                client->ioc = tioc;
1193                break;
1194
1195            case NBD_OPT_EXPORT_NAME:
1196                /* No way to return an error to client, so drop connection */
1197                error_setg(errp, "Option 0x%x not permitted before TLS",
1198                           option);
1199                return -EINVAL;
1200
1201            default:
1202                /* Let the client keep trying, unless they asked to
1203                 * quit. Always try to give an error back to the
1204                 * client; but when replying to OPT_ABORT, be aware
1205                 * that the client may hang up before receiving the
1206                 * error, in which case we are fine ignoring the
1207                 * resulting EPIPE. */
1208                ret = nbd_opt_drop(client, NBD_REP_ERR_TLS_REQD,
1209                                   option == NBD_OPT_ABORT ? NULL : errp,
1210                                   "Option 0x%" PRIx32
1211                                   " not permitted before TLS", option);
1212                if (option == NBD_OPT_ABORT) {
1213                    return 1;
1214                }
1215                break;
1216            }
1217        } else if (fixedNewstyle) {
1218            switch (option) {
1219            case NBD_OPT_LIST:
1220                if (length) {
1221                    ret = nbd_reject_length(client, false, errp);
1222                } else {
1223                    ret = nbd_negotiate_handle_list(client, errp);
1224                }
1225                break;
1226
1227            case NBD_OPT_ABORT:
1228                /* NBD spec says we must try to reply before
1229                 * disconnecting, but that we must also tolerate
1230                 * guests that don't wait for our reply. */
1231                nbd_negotiate_send_rep(client, NBD_REP_ACK, NULL);
1232                return 1;
1233
1234            case NBD_OPT_EXPORT_NAME:
1235                return nbd_negotiate_handle_export_name(client, no_zeroes,
1236                                                        errp);
1237
1238            case NBD_OPT_INFO:
1239            case NBD_OPT_GO:
1240                ret = nbd_negotiate_handle_info(client, errp);
1241                if (ret == 1) {
1242                    assert(option == NBD_OPT_GO);
1243                    return 0;
1244                }
1245                break;
1246
1247            case NBD_OPT_STARTTLS:
1248                if (length) {
1249                    ret = nbd_reject_length(client, false, errp);
1250                } else if (client->tlscreds) {
1251                    ret = nbd_negotiate_send_rep_err(client,
1252                                                     NBD_REP_ERR_INVALID, errp,
1253                                                     "TLS already enabled");
1254                } else {
1255                    ret = nbd_negotiate_send_rep_err(client,
1256                                                     NBD_REP_ERR_POLICY, errp,
1257                                                     "TLS not configured");
1258                }
1259                break;
1260
1261            case NBD_OPT_STRUCTURED_REPLY:
1262                if (length) {
1263                    ret = nbd_reject_length(client, false, errp);
1264                } else if (client->structured_reply) {
1265                    ret = nbd_negotiate_send_rep_err(
1266                        client, NBD_REP_ERR_INVALID, errp,
1267                        "structured reply already negotiated");
1268                } else {
1269                    ret = nbd_negotiate_send_rep(client, NBD_REP_ACK, errp);
1270                    client->structured_reply = true;
1271                }
1272                break;
1273
1274            case NBD_OPT_LIST_META_CONTEXT:
1275            case NBD_OPT_SET_META_CONTEXT:
1276                ret = nbd_negotiate_meta_queries(client, &client->export_meta,
1277                                                 errp);
1278                break;
1279
1280            default:
1281                ret = nbd_opt_drop(client, NBD_REP_ERR_UNSUP, errp,
1282                                   "Unsupported option %" PRIu32 " (%s)",
1283                                   option, nbd_opt_lookup(option));
1284                break;
1285            }
1286        } else {
1287            /*
1288             * If broken new-style we should drop the connection
1289             * for anything except NBD_OPT_EXPORT_NAME
1290             */
1291            switch (option) {
1292            case NBD_OPT_EXPORT_NAME:
1293                return nbd_negotiate_handle_export_name(client, no_zeroes,
1294                                                        errp);
1295
1296            default:
1297                error_setg(errp, "Unsupported option %" PRIu32 " (%s)",
1298                           option, nbd_opt_lookup(option));
1299                return -EINVAL;
1300            }
1301        }
1302        if (ret < 0) {
1303            return ret;
1304        }
1305    }
1306}
1307
1308/* nbd_negotiate
1309 * Return:
1310 * -errno  on error, errp is set
1311 * 0       on successful negotiation, errp is not set
1312 * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
1313 *         errp is not set
1314 */
1315static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
1316{
1317    ERRP_GUARD();
1318    char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
1319    int ret;
1320
1321    /* Old style negotiation header, no room for options
1322        [ 0 ..   7]   passwd       ("NBDMAGIC")
1323        [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
1324        [16 ..  23]   size
1325        [24 ..  27]   export flags (zero-extended)
1326        [28 .. 151]   reserved     (0)
1327
1328       New style negotiation header, client can send options
1329        [ 0 ..   7]   passwd       ("NBDMAGIC")
1330        [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
1331        [16 ..  17]   server flags (0)
1332        ....options sent, ending in NBD_OPT_EXPORT_NAME or NBD_OPT_GO....
1333     */
1334
1335    qio_channel_set_blocking(client->ioc, false, NULL);
1336
1337    trace_nbd_negotiate_begin();
1338    memcpy(buf, "NBDMAGIC", 8);
1339
1340    stq_be_p(buf + 8, NBD_OPTS_MAGIC);
1341    stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
1342
1343    if (nbd_write(client->ioc, buf, 18, errp) < 0) {
1344        error_prepend(errp, "write failed: ");
1345        return -EINVAL;
1346    }
1347    ret = nbd_negotiate_options(client, errp);
1348    if (ret != 0) {
1349        if (ret < 0) {
1350            error_prepend(errp, "option negotiation failed: ");
1351        }
1352        return ret;
1353    }
1354
1355    /* Attach the channel to the same AioContext as the export */
1356    if (client->exp && client->exp->common.ctx) {
1357        qio_channel_attach_aio_context(client->ioc, client->exp->common.ctx);
1358    }
1359
1360    assert(!client->optlen);
1361    trace_nbd_negotiate_success();
1362
1363    return 0;
1364}
1365
1366/* nbd_read_eof
1367 * Tries to read @size bytes from @ioc. This is a local implementation of
1368 * qio_channel_readv_all_eof. We have it here because we need it to be
1369 * interruptible and to know when the coroutine is yielding.
1370 * Returns 1 on success
1371 *         0 on eof, when no data was read (errp is not set)
1372 *         negative errno on failure (errp is set)
1373 */
1374static inline int coroutine_fn
1375nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
1376{
1377    bool partial = false;
1378
1379    assert(size);
1380    while (size > 0) {
1381        struct iovec iov = { .iov_base = buffer, .iov_len = size };
1382        ssize_t len;
1383
1384        len = qio_channel_readv(client->ioc, &iov, 1, errp);
1385        if (len == QIO_CHANNEL_ERR_BLOCK) {
1386            client->read_yielding = true;
1387            qio_channel_yield(client->ioc, G_IO_IN);
1388            client->read_yielding = false;
1389            if (client->quiescing) {
1390                return -EAGAIN;
1391            }
1392            continue;
1393        } else if (len < 0) {
1394            return -EIO;
1395        } else if (len == 0) {
1396            if (partial) {
1397                error_setg(errp,
1398                           "Unexpected end-of-file before all bytes were read");
1399                return -EIO;
1400            } else {
1401                return 0;
1402            }
1403        }
1404
1405        partial = true;
1406        size -= len;
1407        buffer = (uint8_t *) buffer + len;
1408    }
1409    return 1;
1410}
1411
1412static int coroutine_fn nbd_receive_request(NBDClient *client, NBDRequest *request,
1413                                            Error **errp)
1414{
1415    uint8_t buf[NBD_REQUEST_SIZE];
1416    uint32_t magic;
1417    int ret;
1418
1419    ret = nbd_read_eof(client, buf, sizeof(buf), errp);
1420    if (ret < 0) {
1421        return ret;
1422    }
1423    if (ret == 0) {
1424        return -EIO;
1425    }
1426
1427    /* Request
1428       [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
1429       [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, ...)
1430       [ 6 ..  7]   type    (NBD_CMD_READ, ...)
1431       [ 8 .. 15]   cookie
1432       [16 .. 23]   from
1433       [24 .. 27]   len
1434     */
1435
1436    magic = ldl_be_p(buf);
1437    request->flags  = lduw_be_p(buf + 4);
1438    request->type   = lduw_be_p(buf + 6);
1439    request->cookie = ldq_be_p(buf + 8);
1440    request->from   = ldq_be_p(buf + 16);
1441    request->len    = ldl_be_p(buf + 24);
1442
1443    trace_nbd_receive_request(magic, request->flags, request->type,
1444                              request->from, request->len);
1445
1446    if (magic != NBD_REQUEST_MAGIC) {
1447        error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic);
1448        return -EINVAL;
1449    }
1450    return 0;
1451}
1452
1453#define MAX_NBD_REQUESTS 16
1454
1455void nbd_client_get(NBDClient *client)
1456{
1457    client->refcount++;
1458}
1459
1460void nbd_client_put(NBDClient *client)
1461{
1462    if (--client->refcount == 0) {
1463        /* The last reference should be dropped by client->close,
1464         * which is called by client_close.
1465         */
1466        assert(client->closing);
1467
1468        qio_channel_detach_aio_context(client->ioc);
1469        object_unref(OBJECT(client->sioc));
1470        object_unref(OBJECT(client->ioc));
1471        if (client->tlscreds) {
1472            object_unref(OBJECT(client->tlscreds));
1473        }
1474        g_free(client->tlsauthz);
1475        if (client->exp) {
1476            QTAILQ_REMOVE(&client->exp->clients, client, next);
1477            blk_exp_unref(&client->exp->common);
1478        }
1479        g_free(client->export_meta.bitmaps);
1480        g_free(client);
1481    }
1482}
1483
1484static void client_close(NBDClient *client, bool negotiated)
1485{
1486    if (client->closing) {
1487        return;
1488    }
1489
1490    client->closing = true;
1491
1492    /* Force requests to finish.  They will drop their own references,
1493     * then we'll close the socket and free the NBDClient.
1494     */
1495    qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
1496                         NULL);
1497
1498    /* Also tell the client, so that they release their reference.  */
1499    if (client->close_fn) {
1500        client->close_fn(client, negotiated);
1501    }
1502}
1503
1504static NBDRequestData *nbd_request_get(NBDClient *client)
1505{
1506    NBDRequestData *req;
1507
1508    assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
1509    client->nb_requests++;
1510
1511    req = g_new0(NBDRequestData, 1);
1512    nbd_client_get(client);
1513    req->client = client;
1514    return req;
1515}
1516
1517static void nbd_request_put(NBDRequestData *req)
1518{
1519    NBDClient *client = req->client;
1520
1521    if (req->data) {
1522        qemu_vfree(req->data);
1523    }
1524    g_free(req);
1525
1526    client->nb_requests--;
1527
1528    if (client->quiescing && client->nb_requests == 0) {
1529        aio_wait_kick();
1530    }
1531
1532    nbd_client_receive_next_request(client);
1533
1534    nbd_client_put(client);
1535}
1536
1537static void blk_aio_attached(AioContext *ctx, void *opaque)
1538{
1539    NBDExport *exp = opaque;
1540    NBDClient *client;
1541
1542    trace_nbd_blk_aio_attached(exp->name, ctx);
1543
1544    exp->common.ctx = ctx;
1545
1546    QTAILQ_FOREACH(client, &exp->clients, next) {
1547        qio_channel_attach_aio_context(client->ioc, ctx);
1548
1549        assert(client->nb_requests == 0);
1550        assert(client->recv_coroutine == NULL);
1551        assert(client->send_coroutine == NULL);
1552    }
1553}
1554
1555static void blk_aio_detach(void *opaque)
1556{
1557    NBDExport *exp = opaque;
1558    NBDClient *client;
1559
1560    trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
1561
1562    QTAILQ_FOREACH(client, &exp->clients, next) {
1563        qio_channel_detach_aio_context(client->ioc);
1564    }
1565
1566    exp->common.ctx = NULL;
1567}
1568
1569static void nbd_drained_begin(void *opaque)
1570{
1571    NBDExport *exp = opaque;
1572    NBDClient *client;
1573
1574    QTAILQ_FOREACH(client, &exp->clients, next) {
1575        client->quiescing = true;
1576    }
1577}
1578
1579static void nbd_drained_end(void *opaque)
1580{
1581    NBDExport *exp = opaque;
1582    NBDClient *client;
1583
1584    QTAILQ_FOREACH(client, &exp->clients, next) {
1585        client->quiescing = false;
1586        nbd_client_receive_next_request(client);
1587    }
1588}
1589
1590static bool nbd_drained_poll(void *opaque)
1591{
1592    NBDExport *exp = opaque;
1593    NBDClient *client;
1594
1595    QTAILQ_FOREACH(client, &exp->clients, next) {
1596        if (client->nb_requests != 0) {
1597            /*
1598             * If there's a coroutine waiting for a request on nbd_read_eof()
1599             * enter it here so we don't depend on the client to wake it up.
1600             */
1601            if (client->recv_coroutine != NULL && client->read_yielding) {
1602                qio_channel_wake_read(client->ioc);
1603            }
1604
1605            return true;
1606        }
1607    }
1608
1609    return false;
1610}
1611
1612static void nbd_eject_notifier(Notifier *n, void *data)
1613{
1614    NBDExport *exp = container_of(n, NBDExport, eject_notifier);
1615
1616    blk_exp_request_shutdown(&exp->common);
1617}
1618
1619void nbd_export_set_on_eject_blk(BlockExport *exp, BlockBackend *blk)
1620{
1621    NBDExport *nbd_exp = container_of(exp, NBDExport, common);
1622    assert(exp->drv == &blk_exp_nbd);
1623    assert(nbd_exp->eject_notifier_blk == NULL);
1624
1625    blk_ref(blk);
1626    nbd_exp->eject_notifier_blk = blk;
1627    nbd_exp->eject_notifier.notify = nbd_eject_notifier;
1628    blk_add_remove_bs_notifier(blk, &nbd_exp->eject_notifier);
1629}
1630
1631static const BlockDevOps nbd_block_ops = {
1632    .drained_begin = nbd_drained_begin,
1633    .drained_end = nbd_drained_end,
1634    .drained_poll = nbd_drained_poll,
1635};
1636
1637static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
1638                             Error **errp)
1639{
1640    NBDExport *exp = container_of(blk_exp, NBDExport, common);
1641    BlockExportOptionsNbd *arg = &exp_args->u.nbd;
1642    const char *name = arg->name ?: exp_args->node_name;
1643    BlockBackend *blk = blk_exp->blk;
1644    int64_t size;
1645    uint64_t perm, shared_perm;
1646    bool readonly = !exp_args->writable;
1647    BlockDirtyBitmapOrStrList *bitmaps;
1648    size_t i;
1649    int ret;
1650
1651    assert(exp_args->type == BLOCK_EXPORT_TYPE_NBD);
1652
1653    if (!nbd_server_is_running()) {
1654        error_setg(errp, "NBD server not running");
1655        return -EINVAL;
1656    }
1657
1658    if (strlen(name) > NBD_MAX_STRING_SIZE) {
1659        error_setg(errp, "export name '%s' too long", name);
1660        return -EINVAL;
1661    }
1662
1663    if (arg->description && strlen(arg->description) > NBD_MAX_STRING_SIZE) {
1664        error_setg(errp, "description '%s' too long", arg->description);
1665        return -EINVAL;
1666    }
1667
1668    if (nbd_export_find(name)) {
1669        error_setg(errp, "NBD server already has export named '%s'", name);
1670        return -EEXIST;
1671    }
1672
1673    size = blk_getlength(blk);
1674    if (size < 0) {
1675        error_setg_errno(errp, -size,
1676                         "Failed to determine the NBD export's length");
1677        return size;
1678    }
1679
1680    /* Don't allow resize while the NBD server is running, otherwise we don't
1681     * care what happens with the node. */
1682    blk_get_perm(blk, &perm, &shared_perm);
1683    ret = blk_set_perm(blk, perm, shared_perm & ~BLK_PERM_RESIZE, errp);
1684    if (ret < 0) {
1685        return ret;
1686    }
1687
1688    QTAILQ_INIT(&exp->clients);
1689    exp->name = g_strdup(name);
1690    exp->description = g_strdup(arg->description);
1691    exp->nbdflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_FLUSH |
1692                     NBD_FLAG_SEND_FUA | NBD_FLAG_SEND_CACHE);
1693
1694    if (nbd_server_max_connections() != 1) {
1695        exp->nbdflags |= NBD_FLAG_CAN_MULTI_CONN;
1696    }
1697    if (readonly) {
1698        exp->nbdflags |= NBD_FLAG_READ_ONLY;
1699    } else {
1700        exp->nbdflags |= (NBD_FLAG_SEND_TRIM | NBD_FLAG_SEND_WRITE_ZEROES |
1701                          NBD_FLAG_SEND_FAST_ZERO);
1702    }
1703    exp->size = QEMU_ALIGN_DOWN(size, BDRV_SECTOR_SIZE);
1704
1705    for (bitmaps = arg->bitmaps; bitmaps; bitmaps = bitmaps->next) {
1706        exp->nr_export_bitmaps++;
1707    }
1708    exp->export_bitmaps = g_new0(BdrvDirtyBitmap *, exp->nr_export_bitmaps);
1709    for (i = 0, bitmaps = arg->bitmaps; bitmaps;
1710         i++, bitmaps = bitmaps->next)
1711    {
1712        const char *bitmap;
1713        BlockDriverState *bs = blk_bs(blk);
1714        BdrvDirtyBitmap *bm = NULL;
1715
1716        switch (bitmaps->value->type) {
1717        case QTYPE_QSTRING:
1718            bitmap = bitmaps->value->u.local;
1719            while (bs) {
1720                bm = bdrv_find_dirty_bitmap(bs, bitmap);
1721                if (bm != NULL) {
1722                    break;
1723                }
1724
1725                bs = bdrv_filter_or_cow_bs(bs);
1726            }
1727
1728            if (bm == NULL) {
1729                ret = -ENOENT;
1730                error_setg(errp, "Bitmap '%s' is not found",
1731                           bitmaps->value->u.local);
1732                goto fail;
1733            }
1734
1735            if (readonly && bdrv_is_writable(bs) &&
1736                bdrv_dirty_bitmap_enabled(bm)) {
1737                ret = -EINVAL;
1738                error_setg(errp, "Enabled bitmap '%s' incompatible with "
1739                           "readonly export", bitmap);
1740                goto fail;
1741            }
1742            break;
1743        case QTYPE_QDICT:
1744            bitmap = bitmaps->value->u.external.name;
1745            bm = block_dirty_bitmap_lookup(bitmaps->value->u.external.node,
1746                                           bitmap, NULL, errp);
1747            if (!bm) {
1748                ret = -ENOENT;
1749                goto fail;
1750            }
1751            break;
1752        default:
1753            abort();
1754        }
1755
1756        assert(bm);
1757
1758        if (bdrv_dirty_bitmap_check(bm, BDRV_BITMAP_ALLOW_RO, errp)) {
1759            ret = -EINVAL;
1760            goto fail;
1761        }
1762
1763        exp->export_bitmaps[i] = bm;
1764        assert(strlen(bitmap) <= BDRV_BITMAP_MAX_NAME_SIZE);
1765    }
1766
1767    /* Mark bitmaps busy in a separate loop, to simplify roll-back concerns. */
1768    for (i = 0; i < exp->nr_export_bitmaps; i++) {
1769        bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], true);
1770    }
1771
1772    exp->allocation_depth = arg->allocation_depth;
1773
1774    /*
1775     * We need to inhibit request queuing in the block layer to ensure we can
1776     * be properly quiesced when entering a drained section, as our coroutines
1777     * servicing pending requests might enter blk_pread().
1778     */
1779    blk_set_disable_request_queuing(blk, true);
1780
1781    blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
1782
1783    blk_set_dev_ops(blk, &nbd_block_ops, exp);
1784
1785    QTAILQ_INSERT_TAIL(&exports, exp, next);
1786
1787    return 0;
1788
1789fail:
1790    g_free(exp->export_bitmaps);
1791    g_free(exp->name);
1792    g_free(exp->description);
1793    return ret;
1794}
1795
1796NBDExport *nbd_export_find(const char *name)
1797{
1798    NBDExport *exp;
1799    QTAILQ_FOREACH(exp, &exports, next) {
1800        if (strcmp(name, exp->name) == 0) {
1801            return exp;
1802        }
1803    }
1804
1805    return NULL;
1806}
1807
1808AioContext *
1809nbd_export_aio_context(NBDExport *exp)
1810{
1811    return exp->common.ctx;
1812}
1813
1814static void nbd_export_request_shutdown(BlockExport *blk_exp)
1815{
1816    NBDExport *exp = container_of(blk_exp, NBDExport, common);
1817    NBDClient *client, *next;
1818
1819    blk_exp_ref(&exp->common);
1820    /*
1821     * TODO: Should we expand QMP NbdServerRemoveNode enum to allow a
1822     * close mode that stops advertising the export to new clients but
1823     * still permits existing clients to run to completion? Because of
1824     * that possibility, nbd_export_close() can be called more than
1825     * once on an export.
1826     */
1827    QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
1828        client_close(client, true);
1829    }
1830    if (exp->name) {
1831        g_free(exp->name);
1832        exp->name = NULL;
1833        QTAILQ_REMOVE(&exports, exp, next);
1834    }
1835    blk_exp_unref(&exp->common);
1836}
1837
1838static void nbd_export_delete(BlockExport *blk_exp)
1839{
1840    size_t i;
1841    NBDExport *exp = container_of(blk_exp, NBDExport, common);
1842
1843    assert(exp->name == NULL);
1844    assert(QTAILQ_EMPTY(&exp->clients));
1845
1846    g_free(exp->description);
1847    exp->description = NULL;
1848
1849    if (exp->eject_notifier_blk) {
1850        notifier_remove(&exp->eject_notifier);
1851        blk_unref(exp->eject_notifier_blk);
1852    }
1853    blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached,
1854                                    blk_aio_detach, exp);
1855    blk_set_disable_request_queuing(exp->common.blk, false);
1856
1857    for (i = 0; i < exp->nr_export_bitmaps; i++) {
1858        bdrv_dirty_bitmap_set_busy(exp->export_bitmaps[i], false);
1859    }
1860}
1861
1862const BlockExportDriver blk_exp_nbd = {
1863    .type               = BLOCK_EXPORT_TYPE_NBD,
1864    .instance_size      = sizeof(NBDExport),
1865    .create             = nbd_export_create,
1866    .delete             = nbd_export_delete,
1867    .request_shutdown   = nbd_export_request_shutdown,
1868};
1869
1870static int coroutine_fn nbd_co_send_iov(NBDClient *client, struct iovec *iov,
1871                                        unsigned niov, Error **errp)
1872{
1873    int ret;
1874
1875    g_assert(qemu_in_coroutine());
1876    qemu_co_mutex_lock(&client->send_lock);
1877    client->send_coroutine = qemu_coroutine_self();
1878
1879    ret = qio_channel_writev_all(client->ioc, iov, niov, errp) < 0 ? -EIO : 0;
1880
1881    client->send_coroutine = NULL;
1882    qemu_co_mutex_unlock(&client->send_lock);
1883
1884    return ret;
1885}
1886
1887static inline void set_be_simple_reply(NBDSimpleReply *reply, uint64_t error,
1888                                       uint64_t cookie)
1889{
1890    stl_be_p(&reply->magic, NBD_SIMPLE_REPLY_MAGIC);
1891    stl_be_p(&reply->error, error);
1892    stq_be_p(&reply->cookie, cookie);
1893}
1894
1895static int coroutine_fn nbd_co_send_simple_reply(NBDClient *client,
1896                                                 NBDRequest *request,
1897                                                 uint32_t error,
1898                                                 void *data,
1899                                                 size_t len,
1900                                                 Error **errp)
1901{
1902    NBDSimpleReply reply;
1903    int nbd_err = system_errno_to_nbd_errno(error);
1904    struct iovec iov[] = {
1905        {.iov_base = &reply, .iov_len = sizeof(reply)},
1906        {.iov_base = data, .iov_len = len}
1907    };
1908
1909    assert(!len || !nbd_err);
1910    assert(!client->structured_reply || request->type != NBD_CMD_READ);
1911    trace_nbd_co_send_simple_reply(request->cookie, nbd_err,
1912                                   nbd_err_lookup(nbd_err), len);
1913    set_be_simple_reply(&reply, nbd_err, request->cookie);
1914
1915    return nbd_co_send_iov(client, iov, 2, errp);
1916}
1917
1918/*
1919 * Prepare the header of a reply chunk for network transmission.
1920 *
1921 * On input, @iov is partially initialized: iov[0].iov_base must point
1922 * to an uninitialized NBDReply, while the remaining @niov elements
1923 * (if any) must be ready for transmission.  This function then
1924 * populates iov[0] for transmission.
1925 */
1926static inline void set_be_chunk(NBDClient *client, struct iovec *iov,
1927                                size_t niov, uint16_t flags, uint16_t type,
1928                                NBDRequest *request)
1929{
1930    /* TODO - handle structured vs. extended replies */
1931    NBDStructuredReplyChunk *chunk = iov->iov_base;
1932    size_t i, length = 0;
1933
1934    for (i = 1; i < niov; i++) {
1935        length += iov[i].iov_len;
1936    }
1937    assert(length <= NBD_MAX_BUFFER_SIZE + sizeof(NBDStructuredReadData));
1938
1939    iov[0].iov_len = sizeof(*chunk);
1940    stl_be_p(&chunk->magic, NBD_STRUCTURED_REPLY_MAGIC);
1941    stw_be_p(&chunk->flags, flags);
1942    stw_be_p(&chunk->type, type);
1943    stq_be_p(&chunk->cookie, request->cookie);
1944    stl_be_p(&chunk->length, length);
1945}
1946
1947static int coroutine_fn nbd_co_send_chunk_done(NBDClient *client,
1948                                               NBDRequest *request,
1949                                               Error **errp)
1950{
1951    NBDReply hdr;
1952    struct iovec iov[] = {
1953        {.iov_base = &hdr},
1954    };
1955
1956    trace_nbd_co_send_chunk_done(request->cookie);
1957    set_be_chunk(client, iov, 1, NBD_REPLY_FLAG_DONE,
1958                 NBD_REPLY_TYPE_NONE, request);
1959    return nbd_co_send_iov(client, iov, 1, errp);
1960}
1961
1962static int coroutine_fn nbd_co_send_chunk_read(NBDClient *client,
1963                                               NBDRequest *request,
1964                                               uint64_t offset,
1965                                               void *data,
1966                                               size_t size,
1967                                               bool final,
1968                                               Error **errp)
1969{
1970    NBDReply hdr;
1971    NBDStructuredReadData chunk;
1972    struct iovec iov[] = {
1973        {.iov_base = &hdr},
1974        {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1975        {.iov_base = data, .iov_len = size}
1976    };
1977
1978    assert(size);
1979    trace_nbd_co_send_chunk_read(request->cookie, offset, data, size);
1980    set_be_chunk(client, iov, 3, final ? NBD_REPLY_FLAG_DONE : 0,
1981                 NBD_REPLY_TYPE_OFFSET_DATA, request);
1982    stq_be_p(&chunk.offset, offset);
1983
1984    return nbd_co_send_iov(client, iov, 3, errp);
1985}
1986/*ebb*/
1987static int coroutine_fn nbd_co_send_chunk_error(NBDClient *client,
1988                                                NBDRequest *request,
1989                                                uint32_t error,
1990                                                const char *msg,
1991                                                Error **errp)
1992{
1993    NBDReply hdr;
1994    NBDStructuredError chunk;
1995    int nbd_err = system_errno_to_nbd_errno(error);
1996    struct iovec iov[] = {
1997        {.iov_base = &hdr},
1998        {.iov_base = &chunk, .iov_len = sizeof(chunk)},
1999        {.iov_base = (char *)msg, .iov_len = msg ? strlen(msg) : 0},
2000    };
2001
2002    assert(nbd_err);
2003    trace_nbd_co_send_chunk_error(request->cookie, nbd_err,
2004                                  nbd_err_lookup(nbd_err), msg ? msg : "");
2005    set_be_chunk(client, iov, 3, NBD_REPLY_FLAG_DONE,
2006                 NBD_REPLY_TYPE_ERROR, request);
2007    stl_be_p(&chunk.error, nbd_err);
2008    stw_be_p(&chunk.message_length, iov[2].iov_len);
2009
2010    return nbd_co_send_iov(client, iov, 3, errp);
2011}
2012
2013/* Do a sparse read and send the structured reply to the client.
2014 * Returns -errno if sending fails. blk_co_block_status_above() failure is
2015 * reported to the client, at which point this function succeeds.
2016 */
2017static int coroutine_fn nbd_co_send_sparse_read(NBDClient *client,
2018                                                NBDRequest *request,
2019                                                uint64_t offset,
2020                                                uint8_t *data,
2021                                                size_t size,
2022                                                Error **errp)
2023{
2024    int ret = 0;
2025    NBDExport *exp = client->exp;
2026    size_t progress = 0;
2027
2028    while (progress < size) {
2029        int64_t pnum;
2030        int status = blk_co_block_status_above(exp->common.blk, NULL,
2031                                               offset + progress,
2032                                               size - progress, &pnum, NULL,
2033                                               NULL);
2034        bool final;
2035
2036        if (status < 0) {
2037            char *msg = g_strdup_printf("unable to check for holes: %s",
2038                                        strerror(-status));
2039
2040            ret = nbd_co_send_chunk_error(client, request, -status, msg, errp);
2041            g_free(msg);
2042            return ret;
2043        }
2044        assert(pnum && pnum <= size - progress);
2045        final = progress + pnum == size;
2046        if (status & BDRV_BLOCK_ZERO) {
2047            NBDReply hdr;
2048            NBDStructuredReadHole chunk;
2049            struct iovec iov[] = {
2050                {.iov_base = &hdr},
2051                {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2052            };
2053
2054            trace_nbd_co_send_chunk_read_hole(request->cookie,
2055                                              offset + progress, pnum);
2056            set_be_chunk(client, iov, 2,
2057                         final ? NBD_REPLY_FLAG_DONE : 0,
2058                         NBD_REPLY_TYPE_OFFSET_HOLE, request);
2059            stq_be_p(&chunk.offset, offset + progress);
2060            stl_be_p(&chunk.length, pnum);
2061            ret = nbd_co_send_iov(client, iov, 2, errp);
2062        } else {
2063            ret = blk_co_pread(exp->common.blk, offset + progress, pnum,
2064                               data + progress, 0);
2065            if (ret < 0) {
2066                error_setg_errno(errp, -ret, "reading from file failed");
2067                break;
2068            }
2069            ret = nbd_co_send_chunk_read(client, request, offset + progress,
2070                                         data + progress, pnum, final, errp);
2071        }
2072
2073        if (ret < 0) {
2074            break;
2075        }
2076        progress += pnum;
2077    }
2078    return ret;
2079}
2080
/*
 * Bounded list of extents that is filled in and then sent as the payload
 * of an NBD_REPLY_TYPE_BLOCK_STATUS chunk.
 */
typedef struct NBDExtentArray {
    NBDExtent *extents;       /* storage for @nb_alloc entries */
    unsigned int nb_alloc;    /* capacity of @extents */
    unsigned int count;       /* number of entries currently in use */
    uint64_t total_length;    /* sum of all lengths added so far */
    bool can_add;             /* cleared once full or converted to BE */
    bool converted_to_be;     /* entries were byte-swapped for sending */
} NBDExtentArray;
2089
2090static NBDExtentArray *nbd_extent_array_new(unsigned int nb_alloc)
2091{
2092    NBDExtentArray *ea = g_new0(NBDExtentArray, 1);
2093
2094    ea->nb_alloc = nb_alloc;
2095    ea->extents = g_new(NBDExtent, nb_alloc);
2096    ea->can_add = true;
2097
2098    return ea;
2099}
2100
2101static void nbd_extent_array_free(NBDExtentArray *ea)
2102{
2103    g_free(ea->extents);
2104    g_free(ea);
2105}
2106G_DEFINE_AUTOPTR_CLEANUP_FUNC(NBDExtentArray, nbd_extent_array_free)
2107
2108/* Further modifications of the array after conversion are abandoned */
2109static void nbd_extent_array_convert_to_be(NBDExtentArray *ea)
2110{
2111    int i;
2112
2113    assert(!ea->converted_to_be);
2114    ea->can_add = false;
2115    ea->converted_to_be = true;
2116
2117    for (i = 0; i < ea->count; i++) {
2118        ea->extents[i].flags = cpu_to_be32(ea->extents[i].flags);
2119        ea->extents[i].length = cpu_to_be32(ea->extents[i].length);
2120    }
2121}
2122
2123/*
2124 * Add extent to NBDExtentArray. If extent can't be added (no available space),
2125 * return -1.
2126 * For safety, when returning -1 for the first time, .can_add is set to false,
2127 * and further calls to nbd_extent_array_add() will crash.
2128 * (this avoids the situation where a caller ignores failure to add one extent,
2129 * where adding another extent that would squash into the last array entry
2130 * would result in an incorrect range reported to the client)
2131 */
2132static int nbd_extent_array_add(NBDExtentArray *ea,
2133                                uint32_t length, uint32_t flags)
2134{
2135    assert(ea->can_add);
2136
2137    if (!length) {
2138        return 0;
2139    }
2140
2141    /* Extend previous extent if flags are the same */
2142    if (ea->count > 0 && flags == ea->extents[ea->count - 1].flags) {
2143        uint64_t sum = (uint64_t)length + ea->extents[ea->count - 1].length;
2144
2145        if (sum <= UINT32_MAX) {
2146            ea->extents[ea->count - 1].length = sum;
2147            ea->total_length += length;
2148            return 0;
2149        }
2150    }
2151
2152    if (ea->count >= ea->nb_alloc) {
2153        ea->can_add = false;
2154        return -1;
2155    }
2156
2157    ea->total_length += length;
2158    ea->extents[ea->count] = (NBDExtent) {.length = length, .flags = flags};
2159    ea->count++;
2160
2161    return 0;
2162}
2163
2164static int coroutine_fn blockstatus_to_extents(BlockBackend *blk,
2165                                               uint64_t offset, uint64_t bytes,
2166                                               NBDExtentArray *ea)
2167{
2168    while (bytes) {
2169        uint32_t flags;
2170        int64_t num;
2171        int ret = blk_co_block_status_above(blk, NULL, offset, bytes, &num,
2172                                            NULL, NULL);
2173
2174        if (ret < 0) {
2175            return ret;
2176        }
2177
2178        flags = (ret & BDRV_BLOCK_DATA ? 0 : NBD_STATE_HOLE) |
2179                (ret & BDRV_BLOCK_ZERO ? NBD_STATE_ZERO : 0);
2180
2181        if (nbd_extent_array_add(ea, num, flags) < 0) {
2182            return 0;
2183        }
2184
2185        offset += num;
2186        bytes -= num;
2187    }
2188
2189    return 0;
2190}
2191
2192static int coroutine_fn blockalloc_to_extents(BlockBackend *blk,
2193                                              uint64_t offset, uint64_t bytes,
2194                                              NBDExtentArray *ea)
2195{
2196    while (bytes) {
2197        int64_t num;
2198        int ret = blk_co_is_allocated_above(blk, NULL, false, offset, bytes,
2199                                            &num);
2200
2201        if (ret < 0) {
2202            return ret;
2203        }
2204
2205        if (nbd_extent_array_add(ea, num, ret) < 0) {
2206            return 0;
2207        }
2208
2209        offset += num;
2210        bytes -= num;
2211    }
2212
2213    return 0;
2214}
2215
2216/*
2217 * nbd_co_send_extents
2218 *
2219 * @ea is converted to BE by the function
2220 * @last controls whether NBD_REPLY_FLAG_DONE is sent.
2221 */
2222static int coroutine_fn
2223nbd_co_send_extents(NBDClient *client, NBDRequest *request, NBDExtentArray *ea,
2224                    bool last, uint32_t context_id, Error **errp)
2225{
2226    NBDReply hdr;
2227    NBDStructuredMeta chunk;
2228    struct iovec iov[] = {
2229        {.iov_base = &hdr},
2230        {.iov_base = &chunk, .iov_len = sizeof(chunk)},
2231        {.iov_base = ea->extents, .iov_len = ea->count * sizeof(ea->extents[0])}
2232    };
2233
2234    nbd_extent_array_convert_to_be(ea);
2235
2236    trace_nbd_co_send_extents(request->cookie, ea->count, context_id,
2237                              ea->total_length, last);
2238    set_be_chunk(client, iov, 3, last ? NBD_REPLY_FLAG_DONE : 0,
2239                 NBD_REPLY_TYPE_BLOCK_STATUS, request);
2240    stl_be_p(&chunk.context_id, context_id);
2241
2242    return nbd_co_send_iov(client, iov, 3, errp);
2243}
2244
2245/* Get block status from the exported device and send it to the client */
2246static int
2247coroutine_fn nbd_co_send_block_status(NBDClient *client, NBDRequest *request,
2248                                      BlockBackend *blk, uint64_t offset,
2249                                      uint32_t length, bool dont_fragment,
2250                                      bool last, uint32_t context_id,
2251                                      Error **errp)
2252{
2253    int ret;
2254    unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2255    g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
2256
2257    if (context_id == NBD_META_ID_BASE_ALLOCATION) {
2258        ret = blockstatus_to_extents(blk, offset, length, ea);
2259    } else {
2260        ret = blockalloc_to_extents(blk, offset, length, ea);
2261    }
2262    if (ret < 0) {
2263        return nbd_co_send_chunk_error(client, request, -ret,
2264                                       "can't get block status", errp);
2265    }
2266
2267    return nbd_co_send_extents(client, request, ea, last, context_id, errp);
2268}
2269
2270/* Populate @ea from a dirty bitmap. */
2271static void bitmap_to_extents(BdrvDirtyBitmap *bitmap,
2272                              uint64_t offset, uint64_t length,
2273                              NBDExtentArray *es)
2274{
2275    int64_t start, dirty_start, dirty_count;
2276    int64_t end = offset + length;
2277    bool full = false;
2278
2279    bdrv_dirty_bitmap_lock(bitmap);
2280
2281    for (start = offset;
2282         bdrv_dirty_bitmap_next_dirty_area(bitmap, start, end, INT32_MAX,
2283                                           &dirty_start, &dirty_count);
2284         start = dirty_start + dirty_count)
2285    {
2286        if ((nbd_extent_array_add(es, dirty_start - start, 0) < 0) ||
2287            (nbd_extent_array_add(es, dirty_count, NBD_STATE_DIRTY) < 0))
2288        {
2289            full = true;
2290            break;
2291        }
2292    }
2293
2294    if (!full) {
2295        /* last non dirty extent, nothing to do if array is now full */
2296        (void) nbd_extent_array_add(es, end - start, 0);
2297    }
2298
2299    bdrv_dirty_bitmap_unlock(bitmap);
2300}
2301
2302static int coroutine_fn nbd_co_send_bitmap(NBDClient *client,
2303                                           NBDRequest *request,
2304                                           BdrvDirtyBitmap *bitmap,
2305                                           uint64_t offset,
2306                                           uint32_t length, bool dont_fragment,
2307                                           bool last, uint32_t context_id,
2308                                           Error **errp)
2309{
2310    unsigned int nb_extents = dont_fragment ? 1 : NBD_MAX_BLOCK_STATUS_EXTENTS;
2311    g_autoptr(NBDExtentArray) ea = nbd_extent_array_new(nb_extents);
2312
2313    bitmap_to_extents(bitmap, offset, length, ea);
2314
2315    return nbd_co_send_extents(client, request, ea, last, context_id, errp);
2316}
2317
2318/* nbd_co_receive_request
2319 * Collect a client request. Return 0 if request looks valid, -EIO to drop
2320 * connection right away, -EAGAIN to indicate we were interrupted and the
2321 * channel should be quiesced, and any other negative value to report an error
2322 * to the client (although the caller may still need to disconnect after
2323 * reporting the error).
2324 */
2325static int coroutine_fn nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
2326                                               Error **errp)
2327{
2328    NBDClient *client = req->client;
2329    int valid_flags;
2330    int ret;
2331
2332    g_assert(qemu_in_coroutine());
2333    assert(client->recv_coroutine == qemu_coroutine_self());
2334    ret = nbd_receive_request(client, request, errp);
2335    if (ret < 0) {
2336        return ret;
2337    }
2338
2339    trace_nbd_co_receive_request_decode_type(request->cookie, request->type,
2340                                             nbd_cmd_lookup(request->type));
2341
2342    if (request->type != NBD_CMD_WRITE) {
2343        /* No payload, we are ready to read the next request.  */
2344        req->complete = true;
2345    }
2346
2347    if (request->type == NBD_CMD_DISC) {
2348        /* Special case: we're going to disconnect without a reply,
2349         * whether or not flags, from, or len are bogus */
2350        return -EIO;
2351    }
2352
2353    if (request->type == NBD_CMD_READ || request->type == NBD_CMD_WRITE ||
2354        request->type == NBD_CMD_CACHE)
2355    {
2356        if (request->len > NBD_MAX_BUFFER_SIZE) {
2357            error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
2358                       request->len, NBD_MAX_BUFFER_SIZE);
2359            return -EINVAL;
2360        }
2361
2362        if (request->type != NBD_CMD_CACHE) {
2363            req->data = blk_try_blockalign(client->exp->common.blk,
2364                                           request->len);
2365            if (req->data == NULL) {
2366                error_setg(errp, "No memory");
2367                return -ENOMEM;
2368            }
2369        }
2370    }
2371
2372    if (request->type == NBD_CMD_WRITE) {
2373        if (nbd_read(client->ioc, req->data, request->len, "CMD_WRITE data",
2374                     errp) < 0)
2375        {
2376            return -EIO;
2377        }
2378        req->complete = true;
2379
2380        trace_nbd_co_receive_request_payload_received(request->cookie,
2381                                                      request->len);
2382    }
2383
2384    /* Sanity checks. */
2385    if (client->exp->nbdflags & NBD_FLAG_READ_ONLY &&
2386        (request->type == NBD_CMD_WRITE ||
2387         request->type == NBD_CMD_WRITE_ZEROES ||
2388         request->type == NBD_CMD_TRIM)) {
2389        error_setg(errp, "Export is read-only");
2390        return -EROFS;
2391    }
2392    if (request->from > client->exp->size ||
2393        request->len > client->exp->size - request->from) {
2394        error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu32
2395                   ", Size: %" PRIu64, request->from, request->len,
2396                   client->exp->size);
2397        return (request->type == NBD_CMD_WRITE ||
2398                request->type == NBD_CMD_WRITE_ZEROES) ? -ENOSPC : -EINVAL;
2399    }
2400    if (client->check_align && !QEMU_IS_ALIGNED(request->from | request->len,
2401                                                client->check_align)) {
2402        /*
2403         * The block layer gracefully handles unaligned requests, but
2404         * it's still worth tracing client non-compliance
2405         */
2406        trace_nbd_co_receive_align_compliance(nbd_cmd_lookup(request->type),
2407                                              request->from,
2408                                              request->len,
2409                                              client->check_align);
2410    }
2411    valid_flags = NBD_CMD_FLAG_FUA;
2412    if (request->type == NBD_CMD_READ && client->structured_reply) {
2413        valid_flags |= NBD_CMD_FLAG_DF;
2414    } else if (request->type == NBD_CMD_WRITE_ZEROES) {
2415        valid_flags |= NBD_CMD_FLAG_NO_HOLE | NBD_CMD_FLAG_FAST_ZERO;
2416    } else if (request->type == NBD_CMD_BLOCK_STATUS) {
2417        valid_flags |= NBD_CMD_FLAG_REQ_ONE;
2418    }
2419    if (request->flags & ~valid_flags) {
2420        error_setg(errp, "unsupported flags for command %s (got 0x%x)",
2421                   nbd_cmd_lookup(request->type), request->flags);
2422        return -EINVAL;
2423    }
2424
2425    return 0;
2426}
2427
2428/* Send simple reply without a payload, or a structured error
2429 * @error_msg is ignored if @ret >= 0
2430 * Returns 0 if connection is still live, -errno on failure to talk to client
2431 */
2432static coroutine_fn int nbd_send_generic_reply(NBDClient *client,
2433                                               NBDRequest *request,
2434                                               int ret,
2435                                               const char *error_msg,
2436                                               Error **errp)
2437{
2438    if (client->structured_reply && ret < 0) {
2439        return nbd_co_send_chunk_error(client, request, -ret, error_msg, errp);
2440    } else {
2441        return nbd_co_send_simple_reply(client, request, ret < 0 ? -ret : 0,
2442                                        NULL, 0, errp);
2443    }
2444}
2445
2446/* Handle NBD_CMD_READ request.
2447 * Return -errno if sending fails. Other errors are reported directly to the
2448 * client as an error reply. */
2449static coroutine_fn int nbd_do_cmd_read(NBDClient *client, NBDRequest *request,
2450                                        uint8_t *data, Error **errp)
2451{
2452    int ret;
2453    NBDExport *exp = client->exp;
2454
2455    assert(request->type == NBD_CMD_READ);
2456
2457    /* XXX: NBD Protocol only documents use of FUA with WRITE */
2458    if (request->flags & NBD_CMD_FLAG_FUA) {
2459        ret = blk_co_flush(exp->common.blk);
2460        if (ret < 0) {
2461            return nbd_send_generic_reply(client, request, ret,
2462                                          "flush failed", errp);
2463        }
2464    }
2465
2466    if (client->structured_reply && !(request->flags & NBD_CMD_FLAG_DF) &&
2467        request->len)
2468    {
2469        return nbd_co_send_sparse_read(client, request, request->from,
2470                                       data, request->len, errp);
2471    }
2472
2473    ret = blk_co_pread(exp->common.blk, request->from, request->len, data, 0);
2474    if (ret < 0) {
2475        return nbd_send_generic_reply(client, request, ret,
2476                                      "reading from file failed", errp);
2477    }
2478
2479    if (client->structured_reply) {
2480        if (request->len) {
2481            return nbd_co_send_chunk_read(client, request, request->from, data,
2482                                          request->len, true, errp);
2483        } else {
2484            return nbd_co_send_chunk_done(client, request, errp);
2485        }
2486    } else {
2487        return nbd_co_send_simple_reply(client, request, 0,
2488                                        data, request->len, errp);
2489    }
2490}
2491
2492/*
2493 * nbd_do_cmd_cache
2494 *
2495 * Handle NBD_CMD_CACHE request.
2496 * Return -errno if sending fails. Other errors are reported directly to the
2497 * client as an error reply.
2498 */
2499static coroutine_fn int nbd_do_cmd_cache(NBDClient *client, NBDRequest *request,
2500                                         Error **errp)
2501{
2502    int ret;
2503    NBDExport *exp = client->exp;
2504
2505    assert(request->type == NBD_CMD_CACHE);
2506
2507    ret = blk_co_preadv(exp->common.blk, request->from, request->len,
2508                        NULL, BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
2509
2510    return nbd_send_generic_reply(client, request, ret,
2511                                  "caching data failed", errp);
2512}
2513
2514/* Handle NBD request.
2515 * Return -errno if sending fails. Other errors are reported directly to the
2516 * client as an error reply. */
2517static coroutine_fn int nbd_handle_request(NBDClient *client,
2518                                           NBDRequest *request,
2519                                           uint8_t *data, Error **errp)
2520{
2521    int ret;
2522    int flags;
2523    NBDExport *exp = client->exp;
2524    char *msg;
2525    size_t i;
2526
2527    switch (request->type) {
2528    case NBD_CMD_CACHE:
2529        return nbd_do_cmd_cache(client, request, errp);
2530
2531    case NBD_CMD_READ:
2532        return nbd_do_cmd_read(client, request, data, errp);
2533
2534    case NBD_CMD_WRITE:
2535        flags = 0;
2536        if (request->flags & NBD_CMD_FLAG_FUA) {
2537            flags |= BDRV_REQ_FUA;
2538        }
2539        ret = blk_co_pwrite(exp->common.blk, request->from, request->len, data,
2540                            flags);
2541        return nbd_send_generic_reply(client, request, ret,
2542                                      "writing to file failed", errp);
2543
2544    case NBD_CMD_WRITE_ZEROES:
2545        flags = 0;
2546        if (request->flags & NBD_CMD_FLAG_FUA) {
2547            flags |= BDRV_REQ_FUA;
2548        }
2549        if (!(request->flags & NBD_CMD_FLAG_NO_HOLE)) {
2550            flags |= BDRV_REQ_MAY_UNMAP;
2551        }
2552        if (request->flags & NBD_CMD_FLAG_FAST_ZERO) {
2553            flags |= BDRV_REQ_NO_FALLBACK;
2554        }
2555        ret = blk_co_pwrite_zeroes(exp->common.blk, request->from, request->len,
2556                                   flags);
2557        return nbd_send_generic_reply(client, request, ret,
2558                                      "writing to file failed", errp);
2559
2560    case NBD_CMD_DISC:
2561        /* unreachable, thanks to special case in nbd_co_receive_request() */
2562        abort();
2563
2564    case NBD_CMD_FLUSH:
2565        ret = blk_co_flush(exp->common.blk);
2566        return nbd_send_generic_reply(client, request, ret,
2567                                      "flush failed", errp);
2568
2569    case NBD_CMD_TRIM:
2570        ret = blk_co_pdiscard(exp->common.blk, request->from, request->len);
2571        if (ret >= 0 && request->flags & NBD_CMD_FLAG_FUA) {
2572            ret = blk_co_flush(exp->common.blk);
2573        }
2574        return nbd_send_generic_reply(client, request, ret,
2575                                      "discard failed", errp);
2576
2577    case NBD_CMD_BLOCK_STATUS:
2578        if (!request->len) {
2579            return nbd_send_generic_reply(client, request, -EINVAL,
2580                                          "need non-zero length", errp);
2581        }
2582        if (client->export_meta.count) {
2583            bool dont_fragment = request->flags & NBD_CMD_FLAG_REQ_ONE;
2584            int contexts_remaining = client->export_meta.count;
2585
2586            if (client->export_meta.base_allocation) {
2587                ret = nbd_co_send_block_status(client, request,
2588                                               exp->common.blk,
2589                                               request->from,
2590                                               request->len, dont_fragment,
2591                                               !--contexts_remaining,
2592                                               NBD_META_ID_BASE_ALLOCATION,
2593                                               errp);
2594                if (ret < 0) {
2595                    return ret;
2596                }
2597            }
2598
2599            if (client->export_meta.allocation_depth) {
2600                ret = nbd_co_send_block_status(client, request,
2601                                               exp->common.blk,
2602                                               request->from, request->len,
2603                                               dont_fragment,
2604                                               !--contexts_remaining,
2605                                               NBD_META_ID_ALLOCATION_DEPTH,
2606                                               errp);
2607                if (ret < 0) {
2608                    return ret;
2609                }
2610            }
2611
2612            for (i = 0; i < client->exp->nr_export_bitmaps; i++) {
2613                if (!client->export_meta.bitmaps[i]) {
2614                    continue;
2615                }
2616                ret = nbd_co_send_bitmap(client, request,
2617                                         client->exp->export_bitmaps[i],
2618                                         request->from, request->len,
2619                                         dont_fragment, !--contexts_remaining,
2620                                         NBD_META_ID_DIRTY_BITMAP + i, errp);
2621                if (ret < 0) {
2622                    return ret;
2623                }
2624            }
2625
2626            assert(!contexts_remaining);
2627
2628            return 0;
2629        } else {
2630            return nbd_send_generic_reply(client, request, -EINVAL,
2631                                          "CMD_BLOCK_STATUS not negotiated",
2632                                          errp);
2633        }
2634
2635    default:
2636        msg = g_strdup_printf("invalid request type (%" PRIu32 ") received",
2637                              request->type);
2638        ret = nbd_send_generic_reply(client, request, -EINVAL, msg,
2639                                     errp);
2640        g_free(msg);
2641        return ret;
2642    }
2643}
2644
2645/* Owns a reference to the NBDClient passed as opaque.  */
2646static coroutine_fn void nbd_trip(void *opaque)
2647{
2648    NBDClient *client = opaque;
2649    NBDRequestData *req;
2650    NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
2651    int ret;
2652    Error *local_err = NULL;
2653
2654    trace_nbd_trip();
2655    if (client->closing) {
2656        nbd_client_put(client);
2657        return;
2658    }
2659
2660    if (client->quiescing) {
2661        /*
2662         * We're switching between AIO contexts. Don't attempt to receive a new
2663         * request and kick the main context which may be waiting for us.
2664         */
2665        nbd_client_put(client);
2666        client->recv_coroutine = NULL;
2667        aio_wait_kick();
2668        return;
2669    }
2670
2671    req = nbd_request_get(client);
2672    ret = nbd_co_receive_request(req, &request, &local_err);
2673    client->recv_coroutine = NULL;
2674
2675    if (client->closing) {
2676        /*
2677         * The client may be closed when we are blocked in
2678         * nbd_co_receive_request()
2679         */
2680        goto done;
2681    }
2682
2683    if (ret == -EAGAIN) {
2684        assert(client->quiescing);
2685        goto done;
2686    }
2687
2688    nbd_client_receive_next_request(client);
2689    if (ret == -EIO) {
2690        goto disconnect;
2691    }
2692
2693    qio_channel_set_cork(client->ioc, true);
2694
2695    if (ret < 0) {
2696        /* It wasn't -EIO, so, according to nbd_co_receive_request()
2697         * semantics, we should return the error to the client. */
2698        Error *export_err = local_err;
2699
2700        local_err = NULL;
2701        ret = nbd_send_generic_reply(client, &request, -EINVAL,
2702                                     error_get_pretty(export_err), &local_err);
2703        error_free(export_err);
2704    } else {
2705        ret = nbd_handle_request(client, &request, req->data, &local_err);
2706    }
2707    if (ret < 0) {
2708        error_prepend(&local_err, "Failed to send reply: ");
2709        goto disconnect;
2710    }
2711
2712    /* We must disconnect after NBD_CMD_WRITE if we did not
2713     * read the payload.
2714     */
2715    if (!req->complete) {
2716        error_setg(&local_err, "Request handling failed in intermediate state");
2717        goto disconnect;
2718    }
2719
2720    qio_channel_set_cork(client->ioc, false);
2721done:
2722    nbd_request_put(req);
2723    nbd_client_put(client);
2724    return;
2725
2726disconnect:
2727    if (local_err) {
2728        error_reportf_err(local_err, "Disconnect client, due to: ");
2729    }
2730    nbd_request_put(req);
2731    client_close(client, true);
2732    nbd_client_put(client);
2733}
2734
2735static void nbd_client_receive_next_request(NBDClient *client)
2736{
2737    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
2738        !client->quiescing) {
2739        nbd_client_get(client);
2740        client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
2741        aio_co_schedule(client->exp->common.ctx, client->recv_coroutine);
2742    }
2743}
2744
2745static coroutine_fn void nbd_co_client_start(void *opaque)
2746{
2747    NBDClient *client = opaque;
2748    Error *local_err = NULL;
2749
2750    qemu_co_mutex_init(&client->send_lock);
2751
2752    if (nbd_negotiate(client, &local_err)) {
2753        if (local_err) {
2754            error_report_err(local_err);
2755        }
2756        client_close(client, false);
2757        return;
2758    }
2759
2760    nbd_client_receive_next_request(client);
2761}
2762
2763/*
2764 * Create a new client listener using the given channel @sioc.
2765 * Begin servicing it in a coroutine.  When the connection closes, call
2766 * @close_fn with an indication of whether the client completed negotiation.
2767 */
2768void nbd_client_new(QIOChannelSocket *sioc,
2769                    QCryptoTLSCreds *tlscreds,
2770                    const char *tlsauthz,
2771                    void (*close_fn)(NBDClient *, bool))
2772{
2773    NBDClient *client;
2774    Coroutine *co;
2775
2776    client = g_new0(NBDClient, 1);
2777    client->refcount = 1;
2778    client->tlscreds = tlscreds;
2779    if (tlscreds) {
2780        object_ref(OBJECT(client->tlscreds));
2781    }
2782    client->tlsauthz = g_strdup(tlsauthz);
2783    client->sioc = sioc;
2784    qio_channel_set_delay(QIO_CHANNEL(sioc), false);
2785    object_ref(OBJECT(client->sioc));
2786    client->ioc = QIO_CHANNEL(sioc);
2787    object_ref(OBJECT(client->ioc));
2788    client->close_fn = close_fn;
2789
2790    co = qemu_coroutine_create(nbd_co_client_start, client);
2791    qemu_coroutine_enter(co);
2792}
2793