qemu/block/nbd-client.c
/*
 * QEMU Block driver for NBD
 *
 * Copyright (C) 2016 Red Hat, Inc.
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "nbd-client.h"

#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))
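
/*
 * For illustration: both macros are the same XOR involution, so applying
 * them back to back returns the input unchanged:
 *
 *     uint64_t handle = INDEX_TO_HANDLE(s, 3);
 *     assert(HANDLE_TO_INDEX(s, handle) == 3);
 *
 * XOR-ing the request index with the session pointer keeps the mapping
 * trivially reversible while making handles from other sessions unlikely
 * to alias a valid index.
 */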

static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
{
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        NBDClientRequest *req = &s->requests[i];

        if (req->coroutine && req->receiving) {
            aio_co_wake(req->coroutine);
        }
    }
}

static void nbd_teardown_connection(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    if (!client->ioc) { /* Already closed */
        return;
    }

    /* finish any pending coroutines */
    qio_channel_shutdown(client->ioc,
                         QIO_CHANNEL_SHUTDOWN_BOTH,
                         NULL);
    BDRV_POLL_WHILE(bs, client->read_reply_co);

    nbd_client_detach_aio_context(bs);
    object_unref(OBJECT(client->sioc));
    client->sioc = NULL;
    object_unref(OBJECT(client->ioc));
    client->ioc = NULL;
}

static coroutine_fn void nbd_read_reply_entry(void *opaque)
{
    NBDClientSession *s = opaque;
    uint64_t i;
    int ret = 0;
    Error *local_err = NULL;

    while (!s->quit) {
        assert(s->reply.handle == 0);
        ret = nbd_receive_reply(s->ioc, &s->reply, &local_err);
        if (local_err) {
            error_report_err(local_err);
        }
        if (ret <= 0) {
            break;
        }

        /* There's no need for a mutex on the receive side, because the
         * handler acts as a synchronization point and ensures that only
         * one coroutine is called until the reply finishes.
         */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        if (i >= MAX_NBD_REQUESTS ||
            !s->requests[i].coroutine ||
            !s->requests[i].receiving ||
            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
        {
            break;
        }

        /* We're woken up again by the request itself.  Note that there
         * is no race between yielding and reentering read_reply_co.  This
         * is because:
         *
         * - if the request runs on the same AioContext, it is only
         *   entered after we yield
         *
         * - if the request runs on a different AioContext, reentering
         *   read_reply_co happens through a bottom half, which can only
         *   run after we yield.
         */
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }

    s->quit = true;
    nbd_recv_coroutines_wake_all(s);
    s->read_reply_co = NULL;
    aio_wait_kick();
}
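
/*
 * For illustration, a successful request ping-pongs between the single
 * reader coroutine above and the per-request coroutine that owns the
 * handle:
 *
 *     read_reply_co                        request coroutine
 *     -------------                        -----------------
 *     nbd_receive_reply() fills s->reply
 *     aio_co_wake(requests[i].coroutine)
 *     qemu_coroutine_yield()               reads/parses the chunk payload
 *                                          aio_co_wake(s->read_reply_co)
 *     loops to read the next reply         yields until its next chunk
 *
 * Only one of the two runs at a time, which is why the receive side needs
 * no mutex.
 */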

static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    NBDClientSession *s = nbd_get_client_session(bs);
    int rc, i;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }
    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    if (s->quit) {
        rc = -EIO;
        goto err;
    }
    if (!s->ioc) {
        rc = -EPIPE;
        goto err;
    }

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0 && !s->quit) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
            }
        } else if (rc >= 0) {
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }

err:
    if (rc < 0) {
        s->quit = true;
        s->requests[i].coroutine = NULL;
        s->in_flight--;
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}
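
/*
 * A note on the slot accounting above: send_mutex plus in_flight and
 * free_sema form a counting semaphore capped at MAX_NBD_REQUESTS.  The
 * matching release normally happens in nbd_reply_chunk_iter_receive()
 * (see break_loop below), following the same sketch:
 *
 *     qemu_co_mutex_lock(&s->send_mutex);
 *     s->in_flight--;
 *     qemu_co_queue_next(&s->free_sema);
 *     qemu_co_mutex_unlock(&s->send_mutex);
 */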

static inline uint16_t payload_advance16(uint8_t **payload)
{
    *payload += 2;
    return lduw_be_p(*payload - 2);
}

static inline uint32_t payload_advance32(uint8_t **payload)
{
    *payload += 4;
    return ldl_be_p(*payload - 4);
}

static inline uint64_t payload_advance64(uint8_t **payload)
{
    *payload += 8;
    return ldq_be_p(*payload - 8);
}
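
/*
 * For example, the 12-byte NBD_REPLY_TYPE_OFFSET_HOLE payload (a 64-bit
 * offset followed by a 32-bit hole size, big-endian on the wire) is
 * consumed as:
 *
 *     uint8_t *p = payload;
 *     uint64_t offset = payload_advance64(&p);
 *     uint32_t hole_size = payload_advance32(&p);
 *
 * after which p points 12 bytes past the start of the buffer.
 */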

static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}
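
/*
 * The order of the checks above matters: hole_size > qiov->size is rejected
 * first, so qiov->size - hole_size cannot wrap and the final comparison
 * stays safe for any valid request.  As a worked example with
 * orig_offset = 4096 and qiov->size = 4096, a chunk claiming offset = 8000
 * and hole_size = 512 would end at byte 8512, past the requested region
 * ending at 8192; it fails the last test (8000 > 4096 + 4096 - 512 = 7680)
 * and is rejected.
 */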

/* nbd_parse_blockstatus_payload
 * Supports only one extent in the reply, and only for the
 * base:allocation metadata context.
 */
static int nbd_parse_blockstatus_payload(NBDClientSession *client,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    if (chunk->length != sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (client->info.meta_base_allocation_id != context_id) {
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                         client->info.meta_base_allocation_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0 ||
        (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                                    client->info.min_block))) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                   "invalid length");
        return -EINVAL;
    }

    /* The server is allowed to send us extra information on the final
     * extent; just clamp it to the length we requested. */
    if (extent->length > orig_length) {
        extent->length = orig_length;
    }

    return 0;
}

/* nbd_parse_error_payload
 * On success, @errp contains a message describing the NBD error reply.
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}
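
/*
 * For reference, the error payload parsed above is laid out on the wire as
 * (all fields big-endian):
 *
 *     bytes 0-3   32-bit NBD error code (mapped via nbd_errno_to_system_errno)
 *     bytes 4-5   16-bit length of the human-readable message
 *     bytes 6-... message text (currently skipped; see the TODOs above)
 */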

static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
                                              uint64_t orig_offset,
                                              QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read(s->ioc, &offset, sizeof(offset), errp) < 0) {
        return -EIO;
    }
    be64_to_cpus(&offset);

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }

    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}

#define NBD_MAX_MALLOC_PAYLOAD 1000
/* nbd_co_receive_structured_payload */
static coroutine_fn int nbd_co_receive_structured_payload(
        NBDClientSession *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}
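
/*
 * On success with a non-empty chunk, *payload above is a fresh allocation
 * that the caller must g_free().  A minimal usage sketch:
 *
 *     void *payload = NULL;
 *
 *     if (nbd_co_receive_structured_payload(s, &payload, errp) == 0) {
 *         ... parse payload (still NULL for zero-length chunks) ...
 *         g_free(payload);
 *     }
 */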

/* nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If the function fails, @errp contains the corresponding error message and
 * the connection with the server is suspect.  If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and @errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = HANDLE_TO_INDEX(s, handle);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    /* Wait until we're woken up by nbd_read_reply_entry.  */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    s->requests[i].receiving = false;
    if (!s->ioc || s->quit) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }

    assert(s->reply.handle == handle);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                       " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                       " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}

/* nbd_co_receive_one_chunk
 * Read a reply chunk, wake up read_reply_co and set s->quit if needed.
 * Return value is a fatal error code or a normal NBD reply error code.
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        QEMUIOVector *qiov, NBDReply *reply, void **payload, Error **errp)
{
    int request_ret;
    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
                                          &request_ret, qiov, payload, errp);

    if (ret < 0) {
        s->quit = true;
    } else {
        /* For assert at loop start in nbd_read_reply_entry */
        if (reply) {
            *reply = s->reply;
        }
        s->reply.handle = 0;
        ret = request_ret;
    }

    if (s->read_reply_co) {
        aio_co_wake(s->read_reply_co);
    }

    return ret;
}

typedef struct NBDReplyChunkIter {
    int ret;
    bool fatal;
    Error *err;
    bool done, only_structured;
} NBDReplyChunkIter;

static void nbd_iter_error(NBDReplyChunkIter *iter, bool fatal,
                           int ret, Error **local_err)
{
    assert(ret < 0);

    if ((fatal && !iter->fatal) || iter->ret == 0) {
        if (iter->ret != 0) {
            error_free(iter->err);
            iter->err = NULL;
        }
        iter->fatal = fatal;
        iter->ret = ret;
        error_propagate(&iter->err, *local_err);
    } else {
        error_free(*local_err);
    }

    *local_err = NULL;
}

/* NBD_FOREACH_REPLY_CHUNK
 * Loop over all reply chunks belonging to @handle; the loop body runs once
 * per received structured chunk.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
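
/*
 * Typical usage, as in nbd_co_receive_return_code() below: drain every
 * chunk for a handle and pick up the accumulated error afterwards.
 *
 *     NBDReplyChunkIter iter;
 *
 *     NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
 *         ... per-chunk work; never entered for simple replies ...
 *     }
 *     error_propagate(errp, iter.err);
 *     return iter.ret;
 */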

/* nbd_reply_chunk_iter_receive
 * Receive the next chunk for @handle and return true while the caller's
 * loop body should run for the just-received chunk; once iteration is over
 * (or on a fatal error), release the request slot and return false.
 */
static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
                                         NBDReplyChunkIter *iter,
                                         uint64_t handle,
                                         QEMUIOVector *qiov, NBDReply *reply,
                                         void **payload)
{
    int ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;

    if (s->quit) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_error(iter, true, -EIO, &local_err);
        goto break_loop;
    }

    if (iter->done) {
        /* Previous iteration was last. */
        goto break_loop;
    }

    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
                                   qiov, reply, payload, &local_err);
    if (ret < 0) {
        /* If it is a fatal error s->quit is set by nbd_co_receive_one_chunk */
        nbd_iter_error(iter, s->quit, ret, &local_err);
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
    if (nbd_reply_is_simple(&s->reply) || s->quit) {
        goto break_loop;
    }

    chunk = &reply->structured;
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is last. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

break_loop:
    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    qemu_co_queue_next(&s->free_sema);
    qemu_co_mutex_unlock(&s->send_mutex);

    return false;
}

static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
                                      Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    return iter.ret;
}

static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
                                        uint64_t offset, QEMUIOVector *qiov,
                                        Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /* special cased in nbd_co_receive_one_chunk, data is already
             * in qiov */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(&reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_error(&iter, true, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not allowed reply type */
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_error(&iter, true, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    return iter.ret;
}

static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
                                            uint64_t handle, uint64_t length,
                                            NBDExtent *extent, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            NULL, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                s->quit = true;
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_error(&iter, true, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_error(&iter, true, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_error(&iter, true, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.err) {
        error_setg(&iter.err,
                   "Server did not reply with any status extents");
        if (!iter.ret) {
            iter.ret = -EIO;
        }
    }
    error_propagate(errp, iter.err);
    return iter.ret;
}

static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
                          QEMUIOVector *write_qiov)
{
    int ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }
    ret = nbd_co_send_request(bs, request, write_qiov);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_return_code(client, request->handle, &local_err);
    if (local_err) {
        error_report_err(local_err);
    }
    return ret;
}

int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                         uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    int ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    if (!bytes) {
        return 0;
    }
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
                                       &local_err);
    if (local_err) {
        error_report_err(local_err);
    }
    return ret;
}

int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                          uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}

int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                int bytes, BdrvRequestFlags flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}
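
/*
 * The block-layer request flags above map onto NBD command flags as
 * follows; note the inversion for hole punching:
 *
 *     BDRV_REQ_FUA          -> NBD_CMD_FLAG_FUA
 *     !BDRV_REQ_MAY_UNMAP   -> NBD_CMD_FLAG_NO_HOLE
 *
 * i.e. a caller that does not permit unmapping forces the server to keep
 * the zeroed blocks allocated.
 */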

int nbd_client_co_flush(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_FLUSH };

    if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    return nbd_co_request(bs, &request, NULL);
}

int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}

int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
                                            bool want_zero,
                                            int64_t offset, int64_t bytes,
                                            int64_t *pnum, int64_t *map,
                                            BlockDriverState **file)
{
    int64_t ret;
    NBDExtent extent = { 0 };
    NBDClientSession *client = nbd_get_client_session(bs);
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
                                                bs->bl.request_alignment),
                                client->info.max_block), bytes),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!client->info.base_allocation) {
        *pnum = bytes;
        return BDRV_BLOCK_DATA;
    }

    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
                                           &extent, &local_err);
    if (local_err) {
        error_report_err(local_err);
    }
    if (ret < 0) {
        return ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
           (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0);
}
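
/*
 * The NBD extent flags combine into a block-layer status as follows:
 *
 *     NBD_STATE_HOLE  NBD_STATE_ZERO  returned status
 *     0               0               BDRV_BLOCK_DATA
 *     0               1               BDRV_BLOCK_DATA | BDRV_BLOCK_ZERO
 *     1               0               0 (unallocated, contents undefined)
 *     1               1               BDRV_BLOCK_ZERO
 */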

void nbd_client_detach_aio_context(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
}

void nbd_client_attach_aio_context(BlockDriverState *bs,
                                   AioContext *new_context)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);
    aio_co_schedule(new_context, client->read_reply_co);
}

void nbd_client_close(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_DISC };

    if (client->ioc == NULL) {
        return;
    }

    nbd_send_request(client->ioc, &request);

    nbd_teardown_connection(bs);
}

int nbd_client_init(BlockDriverState *bs,
                    QIOChannelSocket *sioc,
                    const char *export,
                    QCryptoTLSCreds *tlscreds,
                    const char *hostname,
                    const char *x_dirty_bitmap,
                    Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    int ret;

    /* NBD handshake */
    logout("session init %s\n", export);
    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);

    client->info.request_sizes = true;
    client->info.structured_reply = true;
    client->info.base_allocation = true;
    client->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
                                tlscreds, hostname,
                                &client->ioc, &client->info, errp);
    g_free(client->info.x_dirty_bitmap);
    if (ret < 0) {
        logout("Failed to negotiate with the NBD server\n");
        return ret;
    }
    if (x_dirty_bitmap && !client->info.base_allocation) {
        error_setg(errp, "requested x-dirty-bitmap %s not found",
                   x_dirty_bitmap);
        ret = -EINVAL;
        goto fail;
    }
    if (client->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            goto fail;
        }
    }
    if (client->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }
    if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
    }

    qemu_co_mutex_init(&client->send_mutex);
    qemu_co_queue_init(&client->free_sema);
    client->sioc = sioc;
    object_ref(OBJECT(client->sioc));

    if (!client->ioc) {
        client->ioc = QIO_CHANNEL(sioc);
        object_ref(OBJECT(client->ioc));
    }

    /* Now that we're connected, set the socket to be non-blocking and
     * kick the reply mechanism.  */
    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
    client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));

    logout("Established connection with NBD server\n");
    return 0;

 fail:
    /*
     * We have connected, but must fail for other reasons. The
     * connection is still blocking; send NBD_CMD_DISC as a courtesy
     * to the server.
     */
    {
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(client->ioc ?: QIO_CHANNEL(sioc), &request);
        return ret;
    }
}
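
/*
 * A minimal sketch of the call sequence a block driver performs around this
 * API (hypothetical caller with an assumed SocketAddress *saddr; the in-tree
 * caller is block/nbd.c):
 *
 *     QIOChannelSocket *sioc = qio_channel_socket_new();
 *
 *     if (qio_channel_socket_connect_sync(sioc, saddr, errp) < 0) {
 *         ... fail the open ...
 *     }
 *     if (nbd_client_init(bs, sioc, export, tlscreds, hostname,
 *                         NULL, errp) < 0) {
 *         ... fail the open ...
 *     }
 *     ... I/O via nbd_client_co_preadv() / nbd_client_co_pwritev() ...
 *     nbd_client_close(bs);
 */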