qemu/nbd/server.c
<<
>>
Prefs
   1/*
   2 *  Copyright (C) 2016-2017 Red Hat, Inc.
   3 *  Copyright (C) 2005  Anthony Liguori <anthony@codemonkey.ws>
   4 *
   5 *  Network Block Device Server Side
   6 *
   7 *  This program is free software; you can redistribute it and/or modify
   8 *  it under the terms of the GNU General Public License as published by
   9 *  the Free Software Foundation; under version 2 of the License.
  10 *
  11 *  This program is distributed in the hope that it will be useful,
  12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14 *  GNU General Public License for more details.
  15 *
  16 *  You should have received a copy of the GNU General Public License
  17 *  along with this program; if not, see <http://www.gnu.org/licenses/>.
  18 */
  19
  20#include "qemu/osdep.h"
  21#include "qapi/error.h"
  22#include "trace.h"
  23#include "nbd-internal.h"
  24
  25static int system_errno_to_nbd_errno(int err)
  26{
  27    switch (err) {
  28    case 0:
  29        return NBD_SUCCESS;
  30    case EPERM:
  31    case EROFS:
  32        return NBD_EPERM;
  33    case EIO:
  34        return NBD_EIO;
  35    case ENOMEM:
  36        return NBD_ENOMEM;
  37#ifdef EDQUOT
  38    case EDQUOT:
  39#endif
  40    case EFBIG:
  41    case ENOSPC:
  42        return NBD_ENOSPC;
  43    case ESHUTDOWN:
  44        return NBD_ESHUTDOWN;
  45    case EINVAL:
  46    default:
  47        return NBD_EINVAL;
  48    }
  49}
  50
  51/* Definitions for opaque data types */
  52
typedef struct NBDRequestData NBDRequestData;

/* Bookkeeping attached to a request of one client.
 * NOTE(review): the users of this struct are outside the visible part
 * of the file; the field notes below are read off the declarations
 * only -- confirm against the request handling code. */
struct NBDRequestData {
    QSIMPLEQ_ENTRY(NBDRequestData) entry; /* linkage for a QSIMPLEQ list */
    NBDClient *client;                    /* client this request belongs to */
    uint8_t *data;                        /* byte buffer; presumably the payload */
    bool complete;
};
  61
/* One exported block device as seen by NBD clients. */
struct NBDExport {
    int refcount;
    void (*close)(NBDExport *exp);

    BlockBackend *blk;
    char *name;          /* may be NULL; sent as "" in NBD_OPT_LIST replies */
    char *description;   /* may be NULL; sent only when present */
    off_t dev_offset;
    off_t size;          /* advertised to the client during negotiation */
    uint16_t nbdflags;   /* OR-ed with the server flags before being sent */
    QTAILQ_HEAD(, NBDClient) clients; /* clients that selected this export */
    QTAILQ_ENTRY(NBDExport) next;     /* linkage in the global 'exports' list */

    AioContext *ctx;

    BlockBackend *eject_notifier_blk;
    Notifier eject_notifier;
};
  80
/* All known exports, linked through NBDExport.next; walked by
 * nbd_negotiate_handle_list() to answer NBD_OPT_LIST. */
static QTAILQ_HEAD(, NBDExport) exports = QTAILQ_HEAD_INITIALIZER(exports);
  82
/* State of one connected NBD client. */
struct NBDClient {
    int refcount;        /* see nbd_client_get() / nbd_client_put() */
    void (*close_fn)(NBDClient *client, bool negotiated);

    NBDExport *exp;      /* export chosen during negotiation, or NULL */
    QCryptoTLSCreds *tlscreds; /* non-NULL when TLS is required */
    char *tlsaclname;
    QIOChannelSocket *sioc; /* The underlying data channel */
    QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */

    Coroutine *recv_coroutine;

    CoMutex send_lock;
    Coroutine *send_coroutine;

    QTAILQ_ENTRY(NBDClient) next; /* linkage in NBDExport.clients */
    int nb_requests;
    bool closing;        /* must be set before the last reference is dropped */
};
 102
 103/* That's all folks */
 104
/* Forward declaration, so the function can be referenced before its
 * definition. */
static void nbd_client_receive_next_request(NBDClient *client);
 106
 107/* Basic flow for negotiation
 108
 109   Server         Client
 110   Negotiate
 111
 112   or
 113
 114   Server         Client
 115   Negotiate #1
 116                  Option
 117   Negotiate #2
 118
 119   ----
 120
 121   followed by
 122
 123   Server         Client
 124                  Request
 125   Response
 126                  Request
 127   Response
 128                  ...
 129   ...
 130                  Request (type == 2)
 131
 132*/
 133
 134/* Send a reply header, including length, but no payload.
 135 * Return -errno on error, 0 on success. */
 136static int nbd_negotiate_send_rep_len(QIOChannel *ioc, uint32_t type,
 137                                      uint32_t opt, uint32_t len, Error **errp)
 138{
 139    uint64_t magic;
 140
 141    trace_nbd_negotiate_send_rep_len(opt, nbd_opt_lookup(opt),
 142                                     type, nbd_rep_lookup(type), len);
 143
 144    assert(len < NBD_MAX_BUFFER_SIZE);
 145    magic = cpu_to_be64(NBD_REP_MAGIC);
 146    if (nbd_write(ioc, &magic, sizeof(magic), errp) < 0) {
 147        error_prepend(errp, "write failed (rep magic): ");
 148        return -EINVAL;
 149    }
 150
 151    opt = cpu_to_be32(opt);
 152    if (nbd_write(ioc, &opt, sizeof(opt), errp) < 0) {
 153        error_prepend(errp, "write failed (rep opt): ");
 154        return -EINVAL;
 155    }
 156
 157    type = cpu_to_be32(type);
 158    if (nbd_write(ioc, &type, sizeof(type), errp) < 0) {
 159        error_prepend(errp, "write failed (rep type): ");
 160        return -EINVAL;
 161    }
 162
 163    len = cpu_to_be32(len);
 164    if (nbd_write(ioc, &len, sizeof(len), errp) < 0) {
 165        error_prepend(errp, "write failed (rep data length): ");
 166        return -EINVAL;
 167    }
 168    return 0;
 169}
 170
 171/* Send a reply header with default 0 length.
 172 * Return -errno on error, 0 on success. */
 173static int nbd_negotiate_send_rep(QIOChannel *ioc, uint32_t type, uint32_t opt,
 174                                  Error **errp)
 175{
 176    return nbd_negotiate_send_rep_len(ioc, type, opt, 0, errp);
 177}
 178
 179/* Send an error reply.
 180 * Return -errno on error, 0 on success. */
 181static int GCC_FMT_ATTR(5, 6)
 182nbd_negotiate_send_rep_err(QIOChannel *ioc, uint32_t type,
 183                           uint32_t opt, Error **errp, const char *fmt, ...)
 184{
 185    va_list va;
 186    char *msg;
 187    int ret;
 188    size_t len;
 189
 190    va_start(va, fmt);
 191    msg = g_strdup_vprintf(fmt, va);
 192    va_end(va);
 193    len = strlen(msg);
 194    assert(len < 4096);
 195    trace_nbd_negotiate_send_rep_err(msg);
 196    ret = nbd_negotiate_send_rep_len(ioc, type, opt, len, errp);
 197    if (ret < 0) {
 198        goto out;
 199    }
 200    if (nbd_write(ioc, msg, len, errp) < 0) {
 201        error_prepend(errp, "write failed (error message): ");
 202        ret = -EIO;
 203    } else {
 204        ret = 0;
 205    }
 206
 207out:
 208    g_free(msg);
 209    return ret;
 210}
 211
 212/* Send a single NBD_REP_SERVER reply to NBD_OPT_LIST, including payload.
 213 * Return -errno on error, 0 on success. */
 214static int nbd_negotiate_send_rep_list(QIOChannel *ioc, NBDExport *exp,
 215                                       Error **errp)
 216{
 217    size_t name_len, desc_len;
 218    uint32_t len;
 219    const char *name = exp->name ? exp->name : "";
 220    const char *desc = exp->description ? exp->description : "";
 221    int ret;
 222
 223    trace_nbd_negotiate_send_rep_list(name, desc);
 224    name_len = strlen(name);
 225    desc_len = strlen(desc);
 226    len = name_len + desc_len + sizeof(len);
 227    ret = nbd_negotiate_send_rep_len(ioc, NBD_REP_SERVER, NBD_OPT_LIST, len,
 228                                     errp);
 229    if (ret < 0) {
 230        return ret;
 231    }
 232
 233    len = cpu_to_be32(name_len);
 234    if (nbd_write(ioc, &len, sizeof(len), errp) < 0) {
 235        error_prepend(errp, "write failed (name length): ");
 236        return -EINVAL;
 237    }
 238
 239    if (nbd_write(ioc, name, name_len, errp) < 0) {
 240        error_prepend(errp, "write failed (name buffer): ");
 241        return -EINVAL;
 242    }
 243
 244    if (nbd_write(ioc, desc, desc_len, errp) < 0) {
 245        error_prepend(errp, "write failed (description buffer): ");
 246        return -EINVAL;
 247    }
 248
 249    return 0;
 250}
 251
 252/* Process the NBD_OPT_LIST command, with a potential series of replies.
 253 * Return -errno on error, 0 on success. */
 254static int nbd_negotiate_handle_list(NBDClient *client, uint32_t length,
 255                                     Error **errp)
 256{
 257    NBDExport *exp;
 258
 259    if (length) {
 260        if (nbd_drop(client->ioc, length, errp) < 0) {
 261            return -EIO;
 262        }
 263        return nbd_negotiate_send_rep_err(client->ioc,
 264                                          NBD_REP_ERR_INVALID, NBD_OPT_LIST,
 265                                          errp,
 266                                          "OPT_LIST should not have length");
 267    }
 268
 269    /* For each export, send a NBD_REP_SERVER reply. */
 270    QTAILQ_FOREACH(exp, &exports, next) {
 271        if (nbd_negotiate_send_rep_list(client->ioc, exp, errp)) {
 272            return -EINVAL;
 273        }
 274    }
 275    /* Finish with a NBD_REP_ACK. */
 276    return nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK, NBD_OPT_LIST, errp);
 277}
 278
 279/* Send a reply to NBD_OPT_EXPORT_NAME.
 280 * Return -errno on error, 0 on success. */
 281static int nbd_negotiate_handle_export_name(NBDClient *client, uint32_t length,
 282                                            uint16_t myflags, bool no_zeroes,
 283                                            Error **errp)
 284{
 285    char name[NBD_MAX_NAME_SIZE + 1];
 286    char buf[NBD_REPLY_EXPORT_NAME_SIZE] = "";
 287    size_t len;
 288    int ret;
 289
 290    /* Client sends:
 291        [20 ..  xx]   export name (length bytes)
 292       Server replies:
 293        [ 0 ..   7]   size
 294        [ 8 ..   9]   export flags
 295        [10 .. 133]   reserved     (0) [unless no_zeroes]
 296     */
 297    trace_nbd_negotiate_handle_export_name();
 298    if (length >= sizeof(name)) {
 299        error_setg(errp, "Bad length received");
 300        return -EINVAL;
 301    }
 302    if (nbd_read(client->ioc, name, length, errp) < 0) {
 303        error_prepend(errp, "read failed: ");
 304        return -EINVAL;
 305    }
 306    name[length] = '\0';
 307
 308    trace_nbd_negotiate_handle_export_name_request(name);
 309
 310    client->exp = nbd_export_find(name);
 311    if (!client->exp) {
 312        error_setg(errp, "export not found");
 313        return -EINVAL;
 314    }
 315
 316    trace_nbd_negotiate_new_style_size_flags(client->exp->size,
 317                                             client->exp->nbdflags | myflags);
 318    stq_be_p(buf, client->exp->size);
 319    stw_be_p(buf + 8, client->exp->nbdflags | myflags);
 320    len = no_zeroes ? 10 : sizeof(buf);
 321    ret = nbd_write(client->ioc, buf, len, errp);
 322    if (ret < 0) {
 323        error_prepend(errp, "write failed: ");
 324        return ret;
 325    }
 326
 327    QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
 328    nbd_export_get(client->exp);
 329
 330    return 0;
 331}
 332
 333/* Send a single NBD_REP_INFO, with a buffer @buf of @length bytes.
 334 * The buffer does NOT include the info type prefix.
 335 * Return -errno on error, 0 if ready to send more. */
 336static int nbd_negotiate_send_info(NBDClient *client, uint32_t opt,
 337                                   uint16_t info, uint32_t length, void *buf,
 338                                   Error **errp)
 339{
 340    int rc;
 341
 342    trace_nbd_negotiate_send_info(info, nbd_info_lookup(info), length);
 343    rc = nbd_negotiate_send_rep_len(client->ioc, NBD_REP_INFO, opt,
 344                                    sizeof(info) + length, errp);
 345    if (rc < 0) {
 346        return rc;
 347    }
 348    cpu_to_be16s(&info);
 349    if (nbd_write(client->ioc, &info, sizeof(info), errp) < 0) {
 350        return -EIO;
 351    }
 352    if (nbd_write(client->ioc, buf, length, errp) < 0) {
 353        return -EIO;
 354    }
 355    return 0;
 356}
 357
 358/* Handle NBD_OPT_INFO and NBD_OPT_GO.
 359 * Return -errno on error, 0 if ready for next option, and 1 to move
 360 * into transmission phase.  */
 361static int nbd_negotiate_handle_info(NBDClient *client, uint32_t length,
 362                                     uint32_t opt, uint16_t myflags,
 363                                     Error **errp)
 364{
 365    int rc;
 366    char name[NBD_MAX_NAME_SIZE + 1];
 367    NBDExport *exp;
 368    uint16_t requests;
 369    uint16_t request;
 370    uint32_t namelen;
 371    bool sendname = false;
 372    bool blocksize = false;
 373    uint32_t sizes[3];
 374    char buf[sizeof(uint64_t) + sizeof(uint16_t)];
 375    const char *msg;
 376
 377    /* Client sends:
 378        4 bytes: L, name length (can be 0)
 379        L bytes: export name
 380        2 bytes: N, number of requests (can be 0)
 381        N * 2 bytes: N requests
 382    */
 383    if (length < sizeof(namelen) + sizeof(requests)) {
 384        msg = "overall request too short";
 385        goto invalid;
 386    }
 387    if (nbd_read(client->ioc, &namelen, sizeof(namelen), errp) < 0) {
 388        return -EIO;
 389    }
 390    be32_to_cpus(&namelen);
 391    length -= sizeof(namelen);
 392    if (namelen > length - sizeof(requests) || (length - namelen) % 2) {
 393        msg = "name length is incorrect";
 394        goto invalid;
 395    }
 396    if (nbd_read(client->ioc, name, namelen, errp) < 0) {
 397        return -EIO;
 398    }
 399    name[namelen] = '\0';
 400    length -= namelen;
 401    trace_nbd_negotiate_handle_export_name_request(name);
 402
 403    if (nbd_read(client->ioc, &requests, sizeof(requests), errp) < 0) {
 404        return -EIO;
 405    }
 406    be16_to_cpus(&requests);
 407    length -= sizeof(requests);
 408    trace_nbd_negotiate_handle_info_requests(requests);
 409    if (requests != length / sizeof(request)) {
 410        msg = "incorrect number of  requests for overall length";
 411        goto invalid;
 412    }
 413    while (requests--) {
 414        if (nbd_read(client->ioc, &request, sizeof(request), errp) < 0) {
 415            return -EIO;
 416        }
 417        be16_to_cpus(&request);
 418        length -= sizeof(request);
 419        trace_nbd_negotiate_handle_info_request(request,
 420                                                nbd_info_lookup(request));
 421        /* We care about NBD_INFO_NAME and NBD_INFO_BLOCK_SIZE;
 422         * everything else is either a request we don't know or
 423         * something we send regardless of request */
 424        switch (request) {
 425        case NBD_INFO_NAME:
 426            sendname = true;
 427            break;
 428        case NBD_INFO_BLOCK_SIZE:
 429            blocksize = true;
 430            break;
 431        }
 432    }
 433
 434    exp = nbd_export_find(name);
 435    if (!exp) {
 436        return nbd_negotiate_send_rep_err(client->ioc, NBD_REP_ERR_UNKNOWN,
 437                                          opt, errp, "export '%s' not present",
 438                                          name);
 439    }
 440
 441    /* Don't bother sending NBD_INFO_NAME unless client requested it */
 442    if (sendname) {
 443        rc = nbd_negotiate_send_info(client, opt, NBD_INFO_NAME, length, name,
 444                                     errp);
 445        if (rc < 0) {
 446            return rc;
 447        }
 448    }
 449
 450    /* Send NBD_INFO_DESCRIPTION only if available, regardless of
 451     * client request */
 452    if (exp->description) {
 453        size_t len = strlen(exp->description);
 454
 455        rc = nbd_negotiate_send_info(client, opt, NBD_INFO_DESCRIPTION,
 456                                     len, exp->description, errp);
 457        if (rc < 0) {
 458            return rc;
 459        }
 460    }
 461
 462    /* Send NBD_INFO_BLOCK_SIZE always, but tweak the minimum size
 463     * according to whether the client requested it, and according to
 464     * whether this is OPT_INFO or OPT_GO. */
 465    /* minimum - 1 for back-compat, or 512 if client is new enough.
 466     * TODO: consult blk_bs(blk)->bl.request_alignment? */
 467    sizes[0] = (opt == NBD_OPT_INFO || blocksize) ? BDRV_SECTOR_SIZE : 1;
 468    /* preferred - Hard-code to 4096 for now.
 469     * TODO: is blk_bs(blk)->bl.opt_transfer appropriate? */
 470    sizes[1] = 4096;
 471    /* maximum - At most 32M, but smaller as appropriate. */
 472    sizes[2] = MIN(blk_get_max_transfer(exp->blk), NBD_MAX_BUFFER_SIZE);
 473    trace_nbd_negotiate_handle_info_block_size(sizes[0], sizes[1], sizes[2]);
 474    cpu_to_be32s(&sizes[0]);
 475    cpu_to_be32s(&sizes[1]);
 476    cpu_to_be32s(&sizes[2]);
 477    rc = nbd_negotiate_send_info(client, opt, NBD_INFO_BLOCK_SIZE,
 478                                 sizeof(sizes), sizes, errp);
 479    if (rc < 0) {
 480        return rc;
 481    }
 482
 483    /* Send NBD_INFO_EXPORT always */
 484    trace_nbd_negotiate_new_style_size_flags(exp->size,
 485                                             exp->nbdflags | myflags);
 486    stq_be_p(buf, exp->size);
 487    stw_be_p(buf + 8, exp->nbdflags | myflags);
 488    rc = nbd_negotiate_send_info(client, opt, NBD_INFO_EXPORT,
 489                                 sizeof(buf), buf, errp);
 490    if (rc < 0) {
 491        return rc;
 492    }
 493
 494    /* If the client is just asking for NBD_OPT_INFO, but forgot to
 495     * request block sizes, return an error.
 496     * TODO: consult blk_bs(blk)->request_align, and only error if it
 497     * is not 1? */
 498    if (opt == NBD_OPT_INFO && !blocksize) {
 499        return nbd_negotiate_send_rep_err(client->ioc,
 500                                          NBD_REP_ERR_BLOCK_SIZE_REQD, opt,
 501                                          errp,
 502                                          "request NBD_INFO_BLOCK_SIZE to "
 503                                          "use this export");
 504    }
 505
 506    /* Final reply */
 507    rc = nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK, opt, errp);
 508    if (rc < 0) {
 509        return rc;
 510    }
 511
 512    if (opt == NBD_OPT_GO) {
 513        client->exp = exp;
 514        QTAILQ_INSERT_TAIL(&client->exp->clients, client, next);
 515        nbd_export_get(client->exp);
 516        rc = 1;
 517    }
 518    return rc;
 519
 520 invalid:
 521    if (nbd_drop(client->ioc, length, errp) < 0) {
 522        return -EIO;
 523    }
 524    return nbd_negotiate_send_rep_err(client->ioc, NBD_REP_ERR_INVALID, opt,
 525                                      errp, "%s", msg);
 526}
 527
 528
 529/* Handle NBD_OPT_STARTTLS. Return NULL to drop connection, or else the
 530 * new channel for all further (now-encrypted) communication. */
 531static QIOChannel *nbd_negotiate_handle_starttls(NBDClient *client,
 532                                                 uint32_t length,
 533                                                 Error **errp)
 534{
 535    QIOChannel *ioc;
 536    QIOChannelTLS *tioc;
 537    struct NBDTLSHandshakeData data = { 0 };
 538
 539    trace_nbd_negotiate_handle_starttls();
 540    ioc = client->ioc;
 541    if (length) {
 542        if (nbd_drop(ioc, length, errp) < 0) {
 543            return NULL;
 544        }
 545        nbd_negotiate_send_rep_err(ioc, NBD_REP_ERR_INVALID, NBD_OPT_STARTTLS,
 546                                   errp,
 547                                   "OPT_STARTTLS should not have length");
 548        return NULL;
 549    }
 550
 551    if (nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK,
 552                               NBD_OPT_STARTTLS, errp) < 0) {
 553        return NULL;
 554    }
 555
 556    tioc = qio_channel_tls_new_server(ioc,
 557                                      client->tlscreds,
 558                                      client->tlsaclname,
 559                                      errp);
 560    if (!tioc) {
 561        return NULL;
 562    }
 563
 564    qio_channel_set_name(QIO_CHANNEL(tioc), "nbd-server-tls");
 565    trace_nbd_negotiate_handle_starttls_handshake();
 566    data.loop = g_main_loop_new(g_main_context_default(), FALSE);
 567    qio_channel_tls_handshake(tioc,
 568                              nbd_tls_handshake,
 569                              &data,
 570                              NULL);
 571
 572    if (!data.complete) {
 573        g_main_loop_run(data.loop);
 574    }
 575    g_main_loop_unref(data.loop);
 576    if (data.error) {
 577        object_unref(OBJECT(tioc));
 578        error_propagate(errp, data.error);
 579        return NULL;
 580    }
 581
 582    return QIO_CHANNEL(tioc);
 583}
 584
 585/* nbd_negotiate_options
 586 * Process all NBD_OPT_* client option commands, during fixed newstyle
 587 * negotiation.
 588 * Return:
 589 * -errno  on error, errp is set
 590 * 0       on successful negotiation, errp is not set
 591 * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
 592 *         errp is not set
 593 */
 594static int nbd_negotiate_options(NBDClient *client, uint16_t myflags,
 595                                 Error **errp)
 596{
 597    uint32_t flags;
 598    bool fixedNewstyle = false;
 599    bool no_zeroes = false;
 600
 601    /* Client sends:
 602        [ 0 ..   3]   client flags
 603
 604       Then we loop until NBD_OPT_EXPORT_NAME or NBD_OPT_GO:
 605        [ 0 ..   7]   NBD_OPTS_MAGIC
 606        [ 8 ..  11]   NBD option
 607        [12 ..  15]   Data length
 608        ...           Rest of request
 609
 610        [ 0 ..   7]   NBD_OPTS_MAGIC
 611        [ 8 ..  11]   Second NBD option
 612        [12 ..  15]   Data length
 613        ...           Rest of request
 614    */
 615
 616    if (nbd_read(client->ioc, &flags, sizeof(flags), errp) < 0) {
 617        error_prepend(errp, "read failed: ");
 618        return -EIO;
 619    }
 620    be32_to_cpus(&flags);
 621    trace_nbd_negotiate_options_flags(flags);
 622    if (flags & NBD_FLAG_C_FIXED_NEWSTYLE) {
 623        fixedNewstyle = true;
 624        flags &= ~NBD_FLAG_C_FIXED_NEWSTYLE;
 625    }
 626    if (flags & NBD_FLAG_C_NO_ZEROES) {
 627        no_zeroes = true;
 628        flags &= ~NBD_FLAG_C_NO_ZEROES;
 629    }
 630    if (flags != 0) {
 631        error_setg(errp, "Unknown client flags 0x%" PRIx32 " received", flags);
 632        return -EINVAL;
 633    }
 634
 635    while (1) {
 636        int ret;
 637        uint32_t option, length;
 638        uint64_t magic;
 639
 640        if (nbd_read(client->ioc, &magic, sizeof(magic), errp) < 0) {
 641            error_prepend(errp, "read failed: ");
 642            return -EINVAL;
 643        }
 644        magic = be64_to_cpu(magic);
 645        trace_nbd_negotiate_options_check_magic(magic);
 646        if (magic != NBD_OPTS_MAGIC) {
 647            error_setg(errp, "Bad magic received");
 648            return -EINVAL;
 649        }
 650
 651        if (nbd_read(client->ioc, &option,
 652                     sizeof(option), errp) < 0) {
 653            error_prepend(errp, "read failed: ");
 654            return -EINVAL;
 655        }
 656        option = be32_to_cpu(option);
 657
 658        if (nbd_read(client->ioc, &length, sizeof(length), errp) < 0) {
 659            error_prepend(errp, "read failed: ");
 660            return -EINVAL;
 661        }
 662        length = be32_to_cpu(length);
 663
 664        trace_nbd_negotiate_options_check_option(option,
 665                                                 nbd_opt_lookup(option));
 666        if (client->tlscreds &&
 667            client->ioc == (QIOChannel *)client->sioc) {
 668            QIOChannel *tioc;
 669            if (!fixedNewstyle) {
 670                error_setg(errp, "Unsupported option 0x%" PRIx32, option);
 671                return -EINVAL;
 672            }
 673            switch (option) {
 674            case NBD_OPT_STARTTLS:
 675                tioc = nbd_negotiate_handle_starttls(client, length, errp);
 676                if (!tioc) {
 677                    return -EIO;
 678                }
 679                object_unref(OBJECT(client->ioc));
 680                client->ioc = QIO_CHANNEL(tioc);
 681                break;
 682
 683            case NBD_OPT_EXPORT_NAME:
 684                /* No way to return an error to client, so drop connection */
 685                error_setg(errp, "Option 0x%x not permitted before TLS",
 686                           option);
 687                return -EINVAL;
 688
 689            default:
 690                if (nbd_drop(client->ioc, length, errp) < 0) {
 691                    return -EIO;
 692                }
 693                ret = nbd_negotiate_send_rep_err(client->ioc,
 694                                                 NBD_REP_ERR_TLS_REQD,
 695                                                 option, errp,
 696                                                 "Option 0x%" PRIx32
 697                                                 "not permitted before TLS",
 698                                                 option);
 699                if (ret < 0) {
 700                    return ret;
 701                }
 702                /* Let the client keep trying, unless they asked to
 703                 * quit. In this mode, we've already sent an error, so
 704                 * we can't ack the abort.  */
 705                if (option == NBD_OPT_ABORT) {
 706                    return 1;
 707                }
 708                break;
 709            }
 710        } else if (fixedNewstyle) {
 711            switch (option) {
 712            case NBD_OPT_LIST:
 713                ret = nbd_negotiate_handle_list(client, length, errp);
 714                if (ret < 0) {
 715                    return ret;
 716                }
 717                break;
 718
 719            case NBD_OPT_ABORT:
 720                /* NBD spec says we must try to reply before
 721                 * disconnecting, but that we must also tolerate
 722                 * guests that don't wait for our reply. */
 723                nbd_negotiate_send_rep(client->ioc, NBD_REP_ACK, option, NULL);
 724                return 1;
 725
 726            case NBD_OPT_EXPORT_NAME:
 727                return nbd_negotiate_handle_export_name(client, length,
 728                                                        myflags, no_zeroes,
 729                                                        errp);
 730
 731            case NBD_OPT_INFO:
 732            case NBD_OPT_GO:
 733                ret = nbd_negotiate_handle_info(client, length, option,
 734                                                myflags, errp);
 735                if (ret == 1) {
 736                    assert(option == NBD_OPT_GO);
 737                    return 0;
 738                }
 739                if (ret) {
 740                    return ret;
 741                }
 742                break;
 743
 744            case NBD_OPT_STARTTLS:
 745                if (nbd_drop(client->ioc, length, errp) < 0) {
 746                    return -EIO;
 747                }
 748                if (client->tlscreds) {
 749                    ret = nbd_negotiate_send_rep_err(client->ioc,
 750                                                     NBD_REP_ERR_INVALID,
 751                                                     option, errp,
 752                                                     "TLS already enabled");
 753                } else {
 754                    ret = nbd_negotiate_send_rep_err(client->ioc,
 755                                                     NBD_REP_ERR_POLICY,
 756                                                     option, errp,
 757                                                     "TLS not configured");
 758                }
 759                if (ret < 0) {
 760                    return ret;
 761                }
 762                break;
 763            default:
 764                if (nbd_drop(client->ioc, length, errp) < 0) {
 765                    return -EIO;
 766                }
 767                ret = nbd_negotiate_send_rep_err(client->ioc,
 768                                                 NBD_REP_ERR_UNSUP,
 769                                                 option, errp,
 770                                                 "Unsupported option 0x%"
 771                                                 PRIx32 " (%s)", option,
 772                                                 nbd_opt_lookup(option));
 773                if (ret < 0) {
 774                    return ret;
 775                }
 776                break;
 777            }
 778        } else {
 779            /*
 780             * If broken new-style we should drop the connection
 781             * for anything except NBD_OPT_EXPORT_NAME
 782             */
 783            switch (option) {
 784            case NBD_OPT_EXPORT_NAME:
 785                return nbd_negotiate_handle_export_name(client, length,
 786                                                        myflags, no_zeroes,
 787                                                        errp);
 788
 789            default:
 790                error_setg(errp, "Unsupported option 0x%" PRIx32 " (%s)",
 791                           option, nbd_opt_lookup(option));
 792                return -EINVAL;
 793            }
 794        }
 795    }
 796}
 797
 798/* nbd_negotiate
 799 * Return:
 800 * -errno  on error, errp is set
 801 * 0       on successful negotiation, errp is not set
 802 * 1       if client sent NBD_OPT_ABORT, i.e. on valid disconnect,
 803 *         errp is not set
 804 */
 805static coroutine_fn int nbd_negotiate(NBDClient *client, Error **errp)
 806{
 807    char buf[NBD_OLDSTYLE_NEGOTIATE_SIZE] = "";
 808    int ret;
 809    const uint16_t myflags = (NBD_FLAG_HAS_FLAGS | NBD_FLAG_SEND_TRIM |
 810                              NBD_FLAG_SEND_FLUSH | NBD_FLAG_SEND_FUA |
 811                              NBD_FLAG_SEND_WRITE_ZEROES);
 812    bool oldStyle;
 813
 814    /* Old style negotiation header, no room for options
 815        [ 0 ..   7]   passwd       ("NBDMAGIC")
 816        [ 8 ..  15]   magic        (NBD_CLIENT_MAGIC)
 817        [16 ..  23]   size
 818        [24 ..  27]   export flags (zero-extended)
 819        [28 .. 151]   reserved     (0)
 820
 821       New style negotiation header, client can send options
 822        [ 0 ..   7]   passwd       ("NBDMAGIC")
 823        [ 8 ..  15]   magic        (NBD_OPTS_MAGIC)
 824        [16 ..  17]   server flags (0)
 825        ....options sent, ending in NBD_OPT_EXPORT_NAME or NBD_OPT_GO....
 826     */
 827
 828    qio_channel_set_blocking(client->ioc, false, NULL);
 829
 830    trace_nbd_negotiate_begin();
 831    memcpy(buf, "NBDMAGIC", 8);
 832
 833    oldStyle = client->exp != NULL && !client->tlscreds;
 834    if (oldStyle) {
 835        trace_nbd_negotiate_old_style(client->exp->size,
 836                                      client->exp->nbdflags | myflags);
 837        stq_be_p(buf + 8, NBD_CLIENT_MAGIC);
 838        stq_be_p(buf + 16, client->exp->size);
 839        stl_be_p(buf + 24, client->exp->nbdflags | myflags);
 840
 841        if (nbd_write(client->ioc, buf, sizeof(buf), errp) < 0) {
 842            error_prepend(errp, "write failed: ");
 843            return -EINVAL;
 844        }
 845    } else {
 846        stq_be_p(buf + 8, NBD_OPTS_MAGIC);
 847        stw_be_p(buf + 16, NBD_FLAG_FIXED_NEWSTYLE | NBD_FLAG_NO_ZEROES);
 848
 849        if (nbd_write(client->ioc, buf, 18, errp) < 0) {
 850            error_prepend(errp, "write failed: ");
 851            return -EINVAL;
 852        }
 853        ret = nbd_negotiate_options(client, myflags, errp);
 854        if (ret != 0) {
 855            if (ret < 0) {
 856                error_prepend(errp, "option negotiation failed: ");
 857            }
 858            return ret;
 859        }
 860    }
 861
 862    trace_nbd_negotiate_success();
 863
 864    return 0;
 865}
 866
 867static int nbd_receive_request(QIOChannel *ioc, NBDRequest *request,
 868                               Error **errp)
 869{
 870    uint8_t buf[NBD_REQUEST_SIZE];
 871    uint32_t magic;
 872    int ret;
 873
 874    ret = nbd_read(ioc, buf, sizeof(buf), errp);
 875    if (ret < 0) {
 876        return ret;
 877    }
 878
 879    /* Request
 880       [ 0 ..  3]   magic   (NBD_REQUEST_MAGIC)
 881       [ 4 ..  5]   flags   (NBD_CMD_FLAG_FUA, ...)
 882       [ 6 ..  7]   type    (NBD_CMD_READ, ...)
 883       [ 8 .. 15]   handle
 884       [16 .. 23]   from
 885       [24 .. 27]   len
 886     */
 887
 888    magic = ldl_be_p(buf);
 889    request->flags  = lduw_be_p(buf + 4);
 890    request->type   = lduw_be_p(buf + 6);
 891    request->handle = ldq_be_p(buf + 8);
 892    request->from   = ldq_be_p(buf + 16);
 893    request->len    = ldl_be_p(buf + 24);
 894
 895    trace_nbd_receive_request(magic, request->flags, request->type,
 896                              request->from, request->len);
 897
 898    if (magic != NBD_REQUEST_MAGIC) {
 899        error_setg(errp, "invalid magic (got 0x%" PRIx32 ")", magic);
 900        return -EINVAL;
 901    }
 902    return 0;
 903}
 904
 905static int nbd_send_reply(QIOChannel *ioc, NBDReply *reply, Error **errp)
 906{
 907    uint8_t buf[NBD_REPLY_SIZE];
 908
 909    reply->error = system_errno_to_nbd_errno(reply->error);
 910
 911    trace_nbd_send_reply(reply->error, reply->handle);
 912
 913    /* Reply
 914       [ 0 ..  3]    magic   (NBD_REPLY_MAGIC)
 915       [ 4 ..  7]    error   (0 == no error)
 916       [ 7 .. 15]    handle
 917     */
 918    stl_be_p(buf, NBD_REPLY_MAGIC);
 919    stl_be_p(buf + 4, reply->error);
 920    stq_be_p(buf + 8, reply->handle);
 921
 922    return nbd_write(ioc, buf, sizeof(buf), errp);
 923}
 924
/* Upper bound on the number of requests one client may have outstanding:
 * nbd_request_get() asserts that the count stays below it, and
 * nbd_client_receive_next_request() stops starting new receives at it. */
 925#define MAX_NBD_REQUESTS 16
 926
 927void nbd_client_get(NBDClient *client)
 928{
 929    client->refcount++;
 930}
 931
 932void nbd_client_put(NBDClient *client)
 933{
 934    if (--client->refcount == 0) {
 935        /* The last reference should be dropped by client->close,
 936         * which is called by client_close.
 937         */
 938        assert(client->closing);
 939
 940        qio_channel_detach_aio_context(client->ioc);
 941        object_unref(OBJECT(client->sioc));
 942        object_unref(OBJECT(client->ioc));
 943        if (client->tlscreds) {
 944            object_unref(OBJECT(client->tlscreds));
 945        }
 946        g_free(client->tlsaclname);
 947        if (client->exp) {
 948            QTAILQ_REMOVE(&client->exp->clients, client, next);
 949            nbd_export_put(client->exp);
 950        }
 951        g_free(client);
 952    }
 953}
 954
 955static void client_close(NBDClient *client, bool negotiated)
 956{
 957    if (client->closing) {
 958        return;
 959    }
 960
 961    client->closing = true;
 962
 963    /* Force requests to finish.  They will drop their own references,
 964     * then we'll close the socket and free the NBDClient.
 965     */
 966    qio_channel_shutdown(client->ioc, QIO_CHANNEL_SHUTDOWN_BOTH,
 967                         NULL);
 968
 969    /* Also tell the client, so that they release their reference.  */
 970    if (client->close_fn) {
 971        client->close_fn(client, negotiated);
 972    }
 973}
 974
 975static NBDRequestData *nbd_request_get(NBDClient *client)
 976{
 977    NBDRequestData *req;
 978
 979    assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
 980    client->nb_requests++;
 981
 982    req = g_new0(NBDRequestData, 1);
 983    nbd_client_get(client);
 984    req->client = client;
 985    return req;
 986}
 987
 988static void nbd_request_put(NBDRequestData *req)
 989{
 990    NBDClient *client = req->client;
 991
 992    if (req->data) {
 993        qemu_vfree(req->data);
 994    }
 995    g_free(req);
 996
 997    client->nb_requests--;
 998    nbd_client_receive_next_request(client);
 999
1000    nbd_client_put(client);
1001}
1002
1003static void blk_aio_attached(AioContext *ctx, void *opaque)
1004{
1005    NBDExport *exp = opaque;
1006    NBDClient *client;
1007
1008    trace_nbd_blk_aio_attached(exp->name, ctx);
1009
1010    exp->ctx = ctx;
1011
1012    QTAILQ_FOREACH(client, &exp->clients, next) {
1013        qio_channel_attach_aio_context(client->ioc, ctx);
1014        if (client->recv_coroutine) {
1015            aio_co_schedule(ctx, client->recv_coroutine);
1016        }
1017        if (client->send_coroutine) {
1018            aio_co_schedule(ctx, client->send_coroutine);
1019        }
1020    }
1021}
1022
1023static void blk_aio_detach(void *opaque)
1024{
1025    NBDExport *exp = opaque;
1026    NBDClient *client;
1027
1028    trace_nbd_blk_aio_detach(exp->name, exp->ctx);
1029
1030    QTAILQ_FOREACH(client, &exp->clients, next) {
1031        qio_channel_detach_aio_context(client->ioc);
1032    }
1033
1034    exp->ctx = NULL;
1035}
1036
1037static void nbd_eject_notifier(Notifier *n, void *data)
1038{
1039    NBDExport *exp = container_of(n, NBDExport, eject_notifier);
1040    nbd_export_close(exp);
1041}
1042
1043NBDExport *nbd_export_new(BlockDriverState *bs, off_t dev_offset, off_t size,
1044                          uint16_t nbdflags, void (*close)(NBDExport *),
1045                          bool writethrough, BlockBackend *on_eject_blk,
1046                          Error **errp)
1047{
1048    AioContext *ctx;
1049    BlockBackend *blk;
1050    NBDExport *exp = g_malloc0(sizeof(NBDExport));
1051    uint64_t perm;
1052    int ret;
1053
1054    /*
1055     * NBD exports are used for non-shared storage migration.  Make sure
1056     * that BDRV_O_INACTIVE is cleared and the image is ready for write
1057     * access since the export could be available before migration handover.
1058     */
1059    ctx = bdrv_get_aio_context(bs);
1060    aio_context_acquire(ctx);
1061    bdrv_invalidate_cache(bs, NULL);
1062    aio_context_release(ctx);
1063
1064    /* Don't allow resize while the NBD server is running, otherwise we don't
1065     * care what happens with the node. */
1066    perm = BLK_PERM_CONSISTENT_READ;
1067    if ((nbdflags & NBD_FLAG_READ_ONLY) == 0) {
1068        perm |= BLK_PERM_WRITE;
1069    }
1070    blk = blk_new(perm, BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED |
1071                        BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD);
1072    ret = blk_insert_bs(blk, bs, errp);
1073    if (ret < 0) {
1074        goto fail;
1075    }
1076    blk_set_enable_write_cache(blk, !writethrough);
1077
1078    exp->refcount = 1;
1079    QTAILQ_INIT(&exp->clients);
1080    exp->blk = blk;
1081    exp->dev_offset = dev_offset;
1082    exp->nbdflags = nbdflags;
1083    exp->size = size < 0 ? blk_getlength(blk) : size;
1084    if (exp->size < 0) {
1085        error_setg_errno(errp, -exp->size,
1086                         "Failed to determine the NBD export's length");
1087        goto fail;
1088    }
1089    exp->size -= exp->size % BDRV_SECTOR_SIZE;
1090
1091    exp->close = close;
1092    exp->ctx = blk_get_aio_context(blk);
1093    blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
1094
1095    if (on_eject_blk) {
1096        blk_ref(on_eject_blk);
1097        exp->eject_notifier_blk = on_eject_blk;
1098        exp->eject_notifier.notify = nbd_eject_notifier;
1099        blk_add_remove_bs_notifier(on_eject_blk, &exp->eject_notifier);
1100    }
1101    return exp;
1102
1103fail:
1104    blk_unref(blk);
1105    g_free(exp);
1106    return NULL;
1107}
1108
1109NBDExport *nbd_export_find(const char *name)
1110{
1111    NBDExport *exp;
1112    QTAILQ_FOREACH(exp, &exports, next) {
1113        if (strcmp(name, exp->name) == 0) {
1114            return exp;
1115        }
1116    }
1117
1118    return NULL;
1119}
1120
1121void nbd_export_set_name(NBDExport *exp, const char *name)
1122{
1123    if (exp->name == name) {
1124        return;
1125    }
1126
1127    nbd_export_get(exp);
1128    if (exp->name != NULL) {
1129        g_free(exp->name);
1130        exp->name = NULL;
1131        QTAILQ_REMOVE(&exports, exp, next);
1132        nbd_export_put(exp);
1133    }
1134    if (name != NULL) {
1135        nbd_export_get(exp);
1136        exp->name = g_strdup(name);
1137        QTAILQ_INSERT_TAIL(&exports, exp, next);
1138    }
1139    nbd_export_put(exp);
1140}
1141
1142void nbd_export_set_description(NBDExport *exp, const char *description)
1143{
1144    g_free(exp->description);
1145    exp->description = g_strdup(description);
1146}
1147
1148void nbd_export_close(NBDExport *exp)
1149{
1150    NBDClient *client, *next;
1151
1152    nbd_export_get(exp);
1153    QTAILQ_FOREACH_SAFE(client, &exp->clients, next, next) {
1154        client_close(client, true);
1155    }
1156    nbd_export_set_name(exp, NULL);
1157    nbd_export_set_description(exp, NULL);
1158    nbd_export_put(exp);
1159}
1160
1161void nbd_export_get(NBDExport *exp)
1162{
1163    assert(exp->refcount > 0);
1164    exp->refcount++;
1165}
1166
1167void nbd_export_put(NBDExport *exp)
1168{
1169    assert(exp->refcount > 0);
1170    if (exp->refcount == 1) {
1171        nbd_export_close(exp);
1172    }
1173
1174    if (--exp->refcount == 0) {
1175        assert(exp->name == NULL);
1176        assert(exp->description == NULL);
1177
1178        if (exp->close) {
1179            exp->close(exp);
1180        }
1181
1182        if (exp->blk) {
1183            if (exp->eject_notifier_blk) {
1184                notifier_remove(&exp->eject_notifier);
1185                blk_unref(exp->eject_notifier_blk);
1186            }
1187            blk_remove_aio_context_notifier(exp->blk, blk_aio_attached,
1188                                            blk_aio_detach, exp);
1189            blk_unref(exp->blk);
1190            exp->blk = NULL;
1191        }
1192
1193        g_free(exp);
1194    }
1195}
1196
1197BlockBackend *nbd_export_get_blockdev(NBDExport *exp)
1198{
1199    return exp->blk;
1200}
1201
1202void nbd_export_close_all(void)
1203{
1204    NBDExport *exp, *next;
1205
1206    QTAILQ_FOREACH_SAFE(exp, &exports, next, next) {
1207        nbd_export_close(exp);
1208    }
1209}
1210
1211static int nbd_co_send_reply(NBDRequestData *req, NBDReply *reply, int len,
1212                             Error **errp)
1213{
1214    NBDClient *client = req->client;
1215    int ret;
1216
1217    g_assert(qemu_in_coroutine());
1218
1219    trace_nbd_co_send_reply(reply->handle, reply->error, len);
1220
1221    qemu_co_mutex_lock(&client->send_lock);
1222    client->send_coroutine = qemu_coroutine_self();
1223
1224    if (!len) {
1225        ret = nbd_send_reply(client->ioc, reply, errp);
1226    } else {
1227        qio_channel_set_cork(client->ioc, true);
1228        ret = nbd_send_reply(client->ioc, reply, errp);
1229        if (ret == 0) {
1230            ret = nbd_write(client->ioc, req->data, len, errp);
1231            if (ret < 0) {
1232                ret = -EIO;
1233            }
1234        }
1235        qio_channel_set_cork(client->ioc, false);
1236    }
1237
1238    client->send_coroutine = NULL;
1239    qemu_co_mutex_unlock(&client->send_lock);
1240    return ret;
1241}
1242
1243/* nbd_co_receive_request
1244 * Collect a client request. Return 0 if request looks valid, -EIO to drop
1245 * connection right away, and any other negative value to report an error to
1246 * the client (although the caller may still need to disconnect after reporting
1247 * the error).
1248 */
1249static int nbd_co_receive_request(NBDRequestData *req, NBDRequest *request,
1250                                  Error **errp)
1251{
1252    NBDClient *client = req->client;
1253
1254    g_assert(qemu_in_coroutine());
1255    assert(client->recv_coroutine == qemu_coroutine_self());
1256    if (nbd_receive_request(client->ioc, request, errp) < 0) {
1257        return -EIO;
1258    }
1259
1260    trace_nbd_co_receive_request_decode_type(request->handle, request->type,
1261                                             nbd_cmd_lookup(request->type));
1262
1263    if (request->type != NBD_CMD_WRITE) {
1264        /* No payload, we are ready to read the next request.  */
1265        req->complete = true;
1266    }
1267
1268    if (request->type == NBD_CMD_DISC) {
1269        /* Special case: we're going to disconnect without a reply,
1270         * whether or not flags, from, or len are bogus */
1271        return -EIO;
1272    }
1273
1274    /* Check for sanity in the parameters, part 1.  Defer as many
1275     * checks as possible until after reading any NBD_CMD_WRITE
1276     * payload, so we can try and keep the connection alive.  */
1277    if ((request->from + request->len) < request->from) {
1278        error_setg(errp,
1279                   "integer overflow detected, you're probably being attacked");
1280        return -EINVAL;
1281    }
1282
1283    if (request->type == NBD_CMD_READ || request->type == NBD_CMD_WRITE) {
1284        if (request->len > NBD_MAX_BUFFER_SIZE) {
1285            error_setg(errp, "len (%" PRIu32" ) is larger than max len (%u)",
1286                       request->len, NBD_MAX_BUFFER_SIZE);
1287            return -EINVAL;
1288        }
1289
1290        req->data = blk_try_blockalign(client->exp->blk, request->len);
1291        if (req->data == NULL) {
1292            error_setg(errp, "No memory");
1293            return -ENOMEM;
1294        }
1295    }
1296    if (request->type == NBD_CMD_WRITE) {
1297        if (nbd_read(client->ioc, req->data, request->len, errp) < 0) {
1298            error_prepend(errp, "reading from socket failed: ");
1299            return -EIO;
1300        }
1301        req->complete = true;
1302
1303        trace_nbd_co_receive_request_payload_received(request->handle,
1304                                                      request->len);
1305    }
1306
1307    /* Sanity checks, part 2. */
1308    if (request->from + request->len > client->exp->size) {
1309        error_setg(errp, "operation past EOF; From: %" PRIu64 ", Len: %" PRIu32
1310                   ", Size: %" PRIu64, request->from, request->len,
1311                   (uint64_t)client->exp->size);
1312        return request->type == NBD_CMD_WRITE ? -ENOSPC : -EINVAL;
1313    }
1314    if (request->flags & ~(NBD_CMD_FLAG_FUA | NBD_CMD_FLAG_NO_HOLE)) {
1315        error_setg(errp, "unsupported flags (got 0x%x)", request->flags);
1316        return -EINVAL;
1317    }
1318    if (request->type != NBD_CMD_WRITE_ZEROES &&
1319        (request->flags & NBD_CMD_FLAG_NO_HOLE)) {
1320        error_setg(errp, "unexpected flags (got 0x%x)", request->flags);
1321        return -EINVAL;
1322    }
1323
1324    return 0;
1325}
1326
1327/* Owns a reference to the NBDClient passed as opaque.  */
1328static coroutine_fn void nbd_trip(void *opaque)
1329{
1330    NBDClient *client = opaque;
1331    NBDExport *exp = client->exp;
1332    NBDRequestData *req;
1333    NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
1334    NBDReply reply;
1335    int ret;
1336    int flags;
1337    int reply_data_len = 0;
1338    Error *local_err = NULL;
1339
1340    trace_nbd_trip();
1341    if (client->closing) {
1342        nbd_client_put(client);
1343        return;
1344    }
1345
1346    req = nbd_request_get(client);
1347    ret = nbd_co_receive_request(req, &request, &local_err);
1348    client->recv_coroutine = NULL;
1349    nbd_client_receive_next_request(client);
1350    if (ret == -EIO) {
1351        goto disconnect;
1352    }
1353
1354    reply.handle = request.handle;
1355    reply.error = 0;
1356
1357    if (ret < 0) {
1358        reply.error = -ret;
1359        goto reply;
1360    }
1361
1362    if (client->closing) {
1363        /*
1364         * The client may be closed when we are blocked in
1365         * nbd_co_receive_request()
1366         */
1367        goto done;
1368    }
1369
1370    switch (request.type) {
1371    case NBD_CMD_READ:
1372        /* XXX: NBD Protocol only documents use of FUA with WRITE */
1373        if (request.flags & NBD_CMD_FLAG_FUA) {
1374            ret = blk_co_flush(exp->blk);
1375            if (ret < 0) {
1376                error_setg_errno(&local_err, -ret, "flush failed");
1377                reply.error = -ret;
1378                break;
1379            }
1380        }
1381
1382        ret = blk_pread(exp->blk, request.from + exp->dev_offset,
1383                        req->data, request.len);
1384        if (ret < 0) {
1385            error_setg_errno(&local_err, -ret, "reading from file failed");
1386            reply.error = -ret;
1387            break;
1388        }
1389
1390        reply_data_len = request.len;
1391
1392        break;
1393    case NBD_CMD_WRITE:
1394        if (exp->nbdflags & NBD_FLAG_READ_ONLY) {
1395            reply.error = EROFS;
1396            break;
1397        }
1398
1399        flags = 0;
1400        if (request.flags & NBD_CMD_FLAG_FUA) {
1401            flags |= BDRV_REQ_FUA;
1402        }
1403        ret = blk_pwrite(exp->blk, request.from + exp->dev_offset,
1404                         req->data, request.len, flags);
1405        if (ret < 0) {
1406            error_setg_errno(&local_err, -ret, "writing to file failed");
1407            reply.error = -ret;
1408        }
1409
1410        break;
1411    case NBD_CMD_WRITE_ZEROES:
1412        if (exp->nbdflags & NBD_FLAG_READ_ONLY) {
1413            error_setg(&local_err, "Server is read-only, return error");
1414            reply.error = EROFS;
1415            break;
1416        }
1417
1418        flags = 0;
1419        if (request.flags & NBD_CMD_FLAG_FUA) {
1420            flags |= BDRV_REQ_FUA;
1421        }
1422        if (!(request.flags & NBD_CMD_FLAG_NO_HOLE)) {
1423            flags |= BDRV_REQ_MAY_UNMAP;
1424        }
1425        ret = blk_pwrite_zeroes(exp->blk, request.from + exp->dev_offset,
1426                                request.len, flags);
1427        if (ret < 0) {
1428            error_setg_errno(&local_err, -ret, "writing to file failed");
1429            reply.error = -ret;
1430        }
1431
1432        break;
1433    case NBD_CMD_DISC:
1434        /* unreachable, thanks to special case in nbd_co_receive_request() */
1435        abort();
1436
1437    case NBD_CMD_FLUSH:
1438        ret = blk_co_flush(exp->blk);
1439        if (ret < 0) {
1440            error_setg_errno(&local_err, -ret, "flush failed");
1441            reply.error = -ret;
1442        }
1443
1444        break;
1445    case NBD_CMD_TRIM:
1446        ret = blk_co_pdiscard(exp->blk, request.from + exp->dev_offset,
1447                              request.len);
1448        if (ret < 0) {
1449            error_setg_errno(&local_err, -ret, "discard failed");
1450            reply.error = -ret;
1451        }
1452
1453        break;
1454    default:
1455        error_setg(&local_err, "invalid request type (%" PRIu32 ") received",
1456                   request.type);
1457        reply.error = EINVAL;
1458    }
1459
1460reply:
1461    if (local_err) {
1462        /* If we are here local_err is not fatal error, already stored in
1463         * reply.error */
1464        error_report_err(local_err);
1465        local_err = NULL;
1466    }
1467
1468    if (nbd_co_send_reply(req, &reply, reply_data_len, &local_err) < 0) {
1469        error_prepend(&local_err, "Failed to send reply: ");
1470        goto disconnect;
1471    }
1472
1473    /* We must disconnect after NBD_CMD_WRITE if we did not
1474     * read the payload.
1475     */
1476    if (!req->complete) {
1477        error_setg(&local_err, "Request handling failed in intermediate state");
1478        goto disconnect;
1479    }
1480
1481done:
1482    nbd_request_put(req);
1483    nbd_client_put(client);
1484    return;
1485
1486disconnect:
1487    if (local_err) {
1488        error_reportf_err(local_err, "Disconnect client, due to: ");
1489    }
1490    nbd_request_put(req);
1491    client_close(client, true);
1492    nbd_client_put(client);
1493}
1494
1495static void nbd_client_receive_next_request(NBDClient *client)
1496{
1497    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
1498        nbd_client_get(client);
1499        client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
1500        aio_co_schedule(client->exp->ctx, client->recv_coroutine);
1501    }
1502}
1503
1504static coroutine_fn void nbd_co_client_start(void *opaque)
1505{
1506    NBDClient *client = opaque;
1507    NBDExport *exp = client->exp;
1508    Error *local_err = NULL;
1509
1510    if (exp) {
1511        nbd_export_get(exp);
1512        QTAILQ_INSERT_TAIL(&exp->clients, client, next);
1513    }
1514    qemu_co_mutex_init(&client->send_lock);
1515
1516    if (nbd_negotiate(client, &local_err)) {
1517        if (local_err) {
1518            error_report_err(local_err);
1519        }
1520        client_close(client, false);
1521        return;
1522    }
1523
1524    nbd_client_receive_next_request(client);
1525}
1526
1527/*
1528 * Create a new client listener on the given export @exp, using the
1529 * given channel @sioc.  Begin servicing it in a coroutine.  When the
1530 * connection closes, call @close_fn with an indication of whether the
1531 * client completed negotiation.
1532 */
1533void nbd_client_new(NBDExport *exp,
1534                    QIOChannelSocket *sioc,
1535                    QCryptoTLSCreds *tlscreds,
1536                    const char *tlsaclname,
1537                    void (*close_fn)(NBDClient *, bool))
1538{
1539    NBDClient *client;
1540    Coroutine *co;
1541
1542    client = g_malloc0(sizeof(NBDClient));
1543    client->refcount = 1;
1544    client->exp = exp;
1545    client->tlscreds = tlscreds;
1546    if (tlscreds) {
1547        object_ref(OBJECT(client->tlscreds));
1548    }
1549    client->tlsaclname = g_strdup(tlsaclname);
1550    client->sioc = sioc;
1551    object_ref(OBJECT(client->sioc));
1552    client->ioc = QIO_CHANNEL(sioc);
1553    object_ref(OBJECT(client->ioc));
1554    client->close_fn = close_fn;
1555
1556    co = qemu_coroutine_create(nbd_co_client_start, client);
1557    qemu_coroutine_enter(co);
1558}
1559