qemu/block/nbd-client.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for  NBD
   3 *
   4 * Copyright (C) 2016 Red Hat, Inc.
   5 * Copyright (C) 2008 Bull S.A.S.
   6 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
   7 *
   8 * Some parts:
   9 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
  10 *
  11 * Permission is hereby granted, free of charge, to any person obtaining a copy
  12 * of this software and associated documentation files (the "Software"), to deal
  13 * in the Software without restriction, including without limitation the rights
  14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15 * copies of the Software, and to permit persons to whom the Software is
  16 * furnished to do so, subject to the following conditions:
  17 *
  18 * The above copyright notice and this permission notice shall be included in
  19 * all copies or substantial portions of the Software.
  20 *
  21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27 * THE SOFTWARE.
  28 */
  29
  30#include "qemu/osdep.h"
  31#include "qapi/error.h"
  32#include "nbd-client.h"
  33
  34#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
  35#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))
  36
  37static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
  38{
  39    int i;
  40
  41    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
  42        NBDClientRequest *req = &s->requests[i];
  43
  44        if (req->coroutine && req->receiving) {
  45            aio_co_wake(req->coroutine);
  46        }
  47    }
  48}
  49
  50static void nbd_teardown_connection(BlockDriverState *bs)
  51{
  52    NBDClientSession *client = nbd_get_client_session(bs);
  53
  54    if (!client->ioc) { /* Already closed */
  55        return;
  56    }
  57
  58    /* finish any pending coroutines */
  59    qio_channel_shutdown(client->ioc,
  60                         QIO_CHANNEL_SHUTDOWN_BOTH,
  61                         NULL);
  62    BDRV_POLL_WHILE(bs, client->read_reply_co);
  63
  64    nbd_client_detach_aio_context(bs);
  65    object_unref(OBJECT(client->sioc));
  66    client->sioc = NULL;
  67    object_unref(OBJECT(client->ioc));
  68    client->ioc = NULL;
  69}
  70
  71static coroutine_fn void nbd_read_reply_entry(void *opaque)
  72{
  73    NBDClientSession *s = opaque;
  74    uint64_t i;
  75    int ret = 0;
  76    Error *local_err = NULL;
  77
  78    while (!s->quit) {
  79        assert(s->reply.handle == 0);
  80        ret = nbd_receive_reply(s->ioc, &s->reply, &local_err);
  81        if (local_err) {
  82            error_report_err(local_err);
  83        }
  84        if (ret <= 0) {
  85            break;
  86        }
  87
  88        /* There's no need for a mutex on the receive side, because the
  89         * handler acts as a synchronization point and ensures that only
  90         * one coroutine is called until the reply finishes.
  91         */
  92        i = HANDLE_TO_INDEX(s, s->reply.handle);
  93        if (i >= MAX_NBD_REQUESTS ||
  94            !s->requests[i].coroutine ||
  95            !s->requests[i].receiving ||
  96            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
  97        {
  98            break;
  99        }
 100
 101        /* We're woken up again by the request itself.  Note that there
 102         * is no race between yielding and reentering read_reply_co.  This
 103         * is because:
 104         *
 105         * - if the request runs on the same AioContext, it is only
 106         *   entered after we yield
 107         *
 108         * - if the request runs on a different AioContext, reentering
 109         *   read_reply_co happens through a bottom half, which can only
 110         *   run after we yield.
 111         */
 112        aio_co_wake(s->requests[i].coroutine);
 113        qemu_coroutine_yield();
 114    }
 115
 116    s->quit = true;
 117    nbd_recv_coroutines_wake_all(s);
 118    s->read_reply_co = NULL;
 119}
 120
 121static int nbd_co_send_request(BlockDriverState *bs,
 122                               NBDRequest *request,
 123                               QEMUIOVector *qiov)
 124{
 125    NBDClientSession *s = nbd_get_client_session(bs);
 126    int rc, i;
 127
 128    qemu_co_mutex_lock(&s->send_mutex);
 129    while (s->in_flight == MAX_NBD_REQUESTS) {
 130        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
 131    }
 132    s->in_flight++;
 133
 134    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
 135        if (s->requests[i].coroutine == NULL) {
 136            break;
 137        }
 138    }
 139
 140    g_assert(qemu_in_coroutine());
 141    assert(i < MAX_NBD_REQUESTS);
 142
 143    s->requests[i].coroutine = qemu_coroutine_self();
 144    s->requests[i].offset = request->from;
 145    s->requests[i].receiving = false;
 146
 147    request->handle = INDEX_TO_HANDLE(s, i);
 148
 149    if (s->quit) {
 150        rc = -EIO;
 151        goto err;
 152    }
 153    if (!s->ioc) {
 154        rc = -EPIPE;
 155        goto err;
 156    }
 157
 158    if (qiov) {
 159        qio_channel_set_cork(s->ioc, true);
 160        rc = nbd_send_request(s->ioc, request);
 161        if (rc >= 0 && !s->quit) {
 162            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
 163                                       NULL) < 0) {
 164                rc = -EIO;
 165            }
 166        } else if (rc >= 0) {
 167            rc = -EIO;
 168        }
 169        qio_channel_set_cork(s->ioc, false);
 170    } else {
 171        rc = nbd_send_request(s->ioc, request);
 172    }
 173
 174err:
 175    if (rc < 0) {
 176        s->quit = true;
 177        s->requests[i].coroutine = NULL;
 178        s->in_flight--;
 179        qemu_co_queue_next(&s->free_sema);
 180    }
 181    qemu_co_mutex_unlock(&s->send_mutex);
 182    return rc;
 183}
 184
 185static inline uint16_t payload_advance16(uint8_t **payload)
 186{
 187    *payload += 2;
 188    return lduw_be_p(*payload - 2);
 189}
 190
 191static inline uint32_t payload_advance32(uint8_t **payload)
 192{
 193    *payload += 4;
 194    return ldl_be_p(*payload - 4);
 195}
 196
 197static inline uint64_t payload_advance64(uint8_t **payload)
 198{
 199    *payload += 8;
 200    return ldq_be_p(*payload - 8);
 201}
 202
 203static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk *chunk,
 204                                         uint8_t *payload, uint64_t orig_offset,
 205                                         QEMUIOVector *qiov, Error **errp)
 206{
 207    uint64_t offset;
 208    uint32_t hole_size;
 209
 210    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
 211        error_setg(errp, "Protocol error: invalid payload for "
 212                         "NBD_REPLY_TYPE_OFFSET_HOLE");
 213        return -EINVAL;
 214    }
 215
 216    offset = payload_advance64(&payload);
 217    hole_size = payload_advance32(&payload);
 218
 219    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
 220        offset > orig_offset + qiov->size - hole_size) {
 221        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
 222                         " region");
 223        return -EINVAL;
 224    }
 225
 226    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);
 227
 228    return 0;
 229}
 230
 231/* nbd_parse_blockstatus_payload
 232 * support only one extent in reply and only for
 233 * base:allocation context
 234 */
 235static int nbd_parse_blockstatus_payload(NBDClientSession *client,
 236                                         NBDStructuredReplyChunk *chunk,
 237                                         uint8_t *payload, uint64_t orig_length,
 238                                         NBDExtent *extent, Error **errp)
 239{
 240    uint32_t context_id;
 241
 242    if (chunk->length != sizeof(context_id) + sizeof(*extent)) {
 243        error_setg(errp, "Protocol error: invalid payload for "
 244                         "NBD_REPLY_TYPE_BLOCK_STATUS");
 245        return -EINVAL;
 246    }
 247
 248    context_id = payload_advance32(&payload);
 249    if (client->info.meta_base_allocation_id != context_id) {
 250        error_setg(errp, "Protocol error: unexpected context id %d for "
 251                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
 252                         "id is %d", context_id,
 253                         client->info.meta_base_allocation_id);
 254        return -EINVAL;
 255    }
 256
 257    extent->length = payload_advance32(&payload);
 258    extent->flags = payload_advance32(&payload);
 259
 260    if (extent->length == 0 ||
 261        (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
 262                                                    client->info.min_block))) {
 263        error_setg(errp, "Protocol error: server sent status chunk with "
 264                   "invalid length");
 265        return -EINVAL;
 266    }
 267
 268    /* The server is allowed to send us extra information on the final
 269     * extent; just clamp it to the length we requested. */
 270    if (extent->length > orig_length) {
 271        extent->length = orig_length;
 272    }
 273
 274    return 0;
 275}
 276
 277/* nbd_parse_error_payload
 278 * on success @errp contains message describing nbd error reply
 279 */
 280static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
 281                                   uint8_t *payload, int *request_ret,
 282                                   Error **errp)
 283{
 284    uint32_t error;
 285    uint16_t message_size;
 286
 287    assert(chunk->type & (1 << 15));
 288
 289    if (chunk->length < sizeof(error) + sizeof(message_size)) {
 290        error_setg(errp,
 291                   "Protocol error: invalid payload for structured error");
 292        return -EINVAL;
 293    }
 294
 295    error = nbd_errno_to_system_errno(payload_advance32(&payload));
 296    if (error == 0) {
 297        error_setg(errp, "Protocol error: server sent structured error chunk "
 298                         "with error = 0");
 299        return -EINVAL;
 300    }
 301
 302    *request_ret = -error;
 303    message_size = payload_advance16(&payload);
 304
 305    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
 306        error_setg(errp, "Protocol error: server sent structured error chunk "
 307                         "with incorrect message size");
 308        return -EINVAL;
 309    }
 310
 311    /* TODO: Add a trace point to mention the server complaint */
 312
 313    /* TODO handle ERROR_OFFSET */
 314
 315    return 0;
 316}
 317
 318static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
 319                                              uint64_t orig_offset,
 320                                              QEMUIOVector *qiov, Error **errp)
 321{
 322    QEMUIOVector sub_qiov;
 323    uint64_t offset;
 324    size_t data_size;
 325    int ret;
 326    NBDStructuredReplyChunk *chunk = &s->reply.structured;
 327
 328    assert(nbd_reply_is_structured(&s->reply));
 329
 330    /* The NBD spec requires at least one byte of payload */
 331    if (chunk->length <= sizeof(offset)) {
 332        error_setg(errp, "Protocol error: invalid payload for "
 333                         "NBD_REPLY_TYPE_OFFSET_DATA");
 334        return -EINVAL;
 335    }
 336
 337    if (nbd_read(s->ioc, &offset, sizeof(offset), errp) < 0) {
 338        return -EIO;
 339    }
 340    be64_to_cpus(&offset);
 341
 342    data_size = chunk->length - sizeof(offset);
 343    assert(data_size);
 344    if (offset < orig_offset || data_size > qiov->size ||
 345        offset > orig_offset + qiov->size - data_size) {
 346        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
 347                         " region");
 348        return -EINVAL;
 349    }
 350
 351    qemu_iovec_init(&sub_qiov, qiov->niov);
 352    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
 353    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
 354    qemu_iovec_destroy(&sub_qiov);
 355
 356    return ret < 0 ? -EIO : 0;
 357}
 358
 359#define NBD_MAX_MALLOC_PAYLOAD 1000
 360/* nbd_co_receive_structured_payload
 361 */
 362static coroutine_fn int nbd_co_receive_structured_payload(
 363        NBDClientSession *s, void **payload, Error **errp)
 364{
 365    int ret;
 366    uint32_t len;
 367
 368    assert(nbd_reply_is_structured(&s->reply));
 369
 370    len = s->reply.structured.length;
 371
 372    if (len == 0) {
 373        return 0;
 374    }
 375
 376    if (payload == NULL) {
 377        error_setg(errp, "Unexpected structured payload");
 378        return -EINVAL;
 379    }
 380
 381    if (len > NBD_MAX_MALLOC_PAYLOAD) {
 382        error_setg(errp, "Payload too large");
 383        return -EINVAL;
 384    }
 385
 386    *payload = g_new(char, len);
 387    ret = nbd_read(s->ioc, *payload, len, errp);
 388    if (ret < 0) {
 389        g_free(*payload);
 390        *payload = NULL;
 391        return ret;
 392    }
 393
 394    return 0;
 395}
 396
 397/* nbd_co_do_receive_one_chunk
 398 * for simple reply:
 399 *   set request_ret to received reply error
 400 *   if qiov is not NULL: read payload to @qiov
 401 * for structured reply chunk:
 402 *   if error chunk: read payload, set @request_ret, do not set @payload
 403 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 404 *   else: read payload to @payload
 405 *
 406 * If function fails, @errp contains corresponding error message, and the
 407 * connection with the server is suspect.  If it returns 0, then the
 408 * transaction succeeded (although @request_ret may be a negative errno
 409 * corresponding to the server's error reply), and errp is unchanged.
 410 */
 411static coroutine_fn int nbd_co_do_receive_one_chunk(
 412        NBDClientSession *s, uint64_t handle, bool only_structured,
 413        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
 414{
 415    int ret;
 416    int i = HANDLE_TO_INDEX(s, handle);
 417    void *local_payload = NULL;
 418    NBDStructuredReplyChunk *chunk;
 419
 420    if (payload) {
 421        *payload = NULL;
 422    }
 423    *request_ret = 0;
 424
 425    /* Wait until we're woken up by nbd_read_reply_entry.  */
 426    s->requests[i].receiving = true;
 427    qemu_coroutine_yield();
 428    s->requests[i].receiving = false;
 429    if (!s->ioc || s->quit) {
 430        error_setg(errp, "Connection closed");
 431        return -EIO;
 432    }
 433
 434    assert(s->reply.handle == handle);
 435
 436    if (nbd_reply_is_simple(&s->reply)) {
 437        if (only_structured) {
 438            error_setg(errp, "Protocol error: simple reply when structured "
 439                             "reply chunk was expected");
 440            return -EINVAL;
 441        }
 442
 443        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
 444        if (*request_ret < 0 || !qiov) {
 445            return 0;
 446        }
 447
 448        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
 449                                     errp) < 0 ? -EIO : 0;
 450    }
 451
 452    /* handle structured reply chunk */
 453    assert(s->info.structured_reply);
 454    chunk = &s->reply.structured;
 455
 456    if (chunk->type == NBD_REPLY_TYPE_NONE) {
 457        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
 458            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
 459                       " NBD_REPLY_FLAG_DONE flag set");
 460            return -EINVAL;
 461        }
 462        if (chunk->length) {
 463            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
 464                       " nonzero length");
 465            return -EINVAL;
 466        }
 467        return 0;
 468    }
 469
 470    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
 471        if (!qiov) {
 472            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
 473            return -EINVAL;
 474        }
 475
 476        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
 477                                                  qiov, errp);
 478    }
 479
 480    if (nbd_reply_type_is_error(chunk->type)) {
 481        payload = &local_payload;
 482    }
 483
 484    ret = nbd_co_receive_structured_payload(s, payload, errp);
 485    if (ret < 0) {
 486        return ret;
 487    }
 488
 489    if (nbd_reply_type_is_error(chunk->type)) {
 490        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
 491        g_free(local_payload);
 492        return ret;
 493    }
 494
 495    return 0;
 496}
 497
 498/* nbd_co_receive_one_chunk
 499 * Read reply, wake up read_reply_co and set s->quit if needed.
 500 * Return value is a fatal error code or normal nbd reply error code
 501 */
 502static coroutine_fn int nbd_co_receive_one_chunk(
 503        NBDClientSession *s, uint64_t handle, bool only_structured,
 504        QEMUIOVector *qiov, NBDReply *reply, void **payload, Error **errp)
 505{
 506    int request_ret;
 507    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
 508                                          &request_ret, qiov, payload, errp);
 509
 510    if (ret < 0) {
 511        s->quit = true;
 512    } else {
 513        /* For assert at loop start in nbd_read_reply_entry */
 514        if (reply) {
 515            *reply = s->reply;
 516        }
 517        s->reply.handle = 0;
 518        ret = request_ret;
 519    }
 520
 521    if (s->read_reply_co) {
 522        aio_co_wake(s->read_reply_co);
 523    }
 524
 525    return ret;
 526}
 527
 528typedef struct NBDReplyChunkIter {
 529    int ret;
 530    bool fatal;
 531    Error *err;
 532    bool done, only_structured;
 533} NBDReplyChunkIter;
 534
 535static void nbd_iter_error(NBDReplyChunkIter *iter, bool fatal,
 536                           int ret, Error **local_err)
 537{
 538    assert(ret < 0);
 539
 540    if ((fatal && !iter->fatal) || iter->ret == 0) {
 541        if (iter->ret != 0) {
 542            error_free(iter->err);
 543            iter->err = NULL;
 544        }
 545        iter->fatal = fatal;
 546        iter->ret = ret;
 547        error_propagate(&iter->err, *local_err);
 548    } else {
 549        error_free(*local_err);
 550    }
 551
 552    *local_err = NULL;
 553}
 554
 555/* NBD_FOREACH_REPLY_CHUNK
 556 */
 557#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
 558                                qiov, reply, payload) \
 559    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
 560         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
 561
 562/* nbd_reply_chunk_iter_receive
 563 */
 564static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
 565                                         NBDReplyChunkIter *iter,
 566                                         uint64_t handle,
 567                                         QEMUIOVector *qiov, NBDReply *reply,
 568                                         void **payload)
 569{
 570    int ret;
 571    NBDReply local_reply;
 572    NBDStructuredReplyChunk *chunk;
 573    Error *local_err = NULL;
 574    if (s->quit) {
 575        error_setg(&local_err, "Connection closed");
 576        nbd_iter_error(iter, true, -EIO, &local_err);
 577        goto break_loop;
 578    }
 579
 580    if (iter->done) {
 581        /* Previous iteration was last. */
 582        goto break_loop;
 583    }
 584
 585    if (reply == NULL) {
 586        reply = &local_reply;
 587    }
 588
 589    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
 590                                   qiov, reply, payload, &local_err);
 591    if (ret < 0) {
 592        /* If it is a fatal error s->quit is set by nbd_co_receive_one_chunk */
 593        nbd_iter_error(iter, s->quit, ret, &local_err);
 594    }
 595
 596    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
 597    if (nbd_reply_is_simple(&s->reply) || s->quit) {
 598        goto break_loop;
 599    }
 600
 601    chunk = &reply->structured;
 602    iter->only_structured = true;
 603
 604    if (chunk->type == NBD_REPLY_TYPE_NONE) {
 605        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
 606        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
 607        goto break_loop;
 608    }
 609
 610    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
 611        /* This iteration is last. */
 612        iter->done = true;
 613    }
 614
 615    /* Execute the loop body */
 616    return true;
 617
 618break_loop:
 619    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
 620
 621    qemu_co_mutex_lock(&s->send_mutex);
 622    s->in_flight--;
 623    qemu_co_queue_next(&s->free_sema);
 624    qemu_co_mutex_unlock(&s->send_mutex);
 625
 626    return false;
 627}
 628
 629static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
 630                                      Error **errp)
 631{
 632    NBDReplyChunkIter iter;
 633
 634    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
 635        /* nbd_reply_chunk_iter_receive does all the work */
 636    }
 637
 638    error_propagate(errp, iter.err);
 639    return iter.ret;
 640}
 641
 642static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
 643                                        uint64_t offset, QEMUIOVector *qiov,
 644                                        Error **errp)
 645{
 646    NBDReplyChunkIter iter;
 647    NBDReply reply;
 648    void *payload = NULL;
 649    Error *local_err = NULL;
 650
 651    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
 652                            qiov, &reply, &payload)
 653    {
 654        int ret;
 655        NBDStructuredReplyChunk *chunk = &reply.structured;
 656
 657        assert(nbd_reply_is_structured(&reply));
 658
 659        switch (chunk->type) {
 660        case NBD_REPLY_TYPE_OFFSET_DATA:
 661            /* special cased in nbd_co_receive_one_chunk, data is already
 662             * in qiov */
 663            break;
 664        case NBD_REPLY_TYPE_OFFSET_HOLE:
 665            ret = nbd_parse_offset_hole_payload(&reply.structured, payload,
 666                                                offset, qiov, &local_err);
 667            if (ret < 0) {
 668                s->quit = true;
 669                nbd_iter_error(&iter, true, ret, &local_err);
 670            }
 671            break;
 672        default:
 673            if (!nbd_reply_type_is_error(chunk->type)) {
 674                /* not allowed reply type */
 675                s->quit = true;
 676                error_setg(&local_err,
 677                           "Unexpected reply type: %d (%s) for CMD_READ",
 678                           chunk->type, nbd_reply_type_lookup(chunk->type));
 679                nbd_iter_error(&iter, true, -EINVAL, &local_err);
 680            }
 681        }
 682
 683        g_free(payload);
 684        payload = NULL;
 685    }
 686
 687    error_propagate(errp, iter.err);
 688    return iter.ret;
 689}
 690
 691static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
 692                                            uint64_t handle, uint64_t length,
 693                                            NBDExtent *extent, Error **errp)
 694{
 695    NBDReplyChunkIter iter;
 696    NBDReply reply;
 697    void *payload = NULL;
 698    Error *local_err = NULL;
 699    bool received = false;
 700
 701    assert(!extent->length);
 702    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
 703                            NULL, &reply, &payload)
 704    {
 705        int ret;
 706        NBDStructuredReplyChunk *chunk = &reply.structured;
 707
 708        assert(nbd_reply_is_structured(&reply));
 709
 710        switch (chunk->type) {
 711        case NBD_REPLY_TYPE_BLOCK_STATUS:
 712            if (received) {
 713                s->quit = true;
 714                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
 715                nbd_iter_error(&iter, true, -EINVAL, &local_err);
 716            }
 717            received = true;
 718
 719            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
 720                                                payload, length, extent,
 721                                                &local_err);
 722            if (ret < 0) {
 723                s->quit = true;
 724                nbd_iter_error(&iter, true, ret, &local_err);
 725            }
 726            break;
 727        default:
 728            if (!nbd_reply_type_is_error(chunk->type)) {
 729                s->quit = true;
 730                error_setg(&local_err,
 731                           "Unexpected reply type: %d (%s) "
 732                           "for CMD_BLOCK_STATUS",
 733                           chunk->type, nbd_reply_type_lookup(chunk->type));
 734                nbd_iter_error(&iter, true, -EINVAL, &local_err);
 735            }
 736        }
 737
 738        g_free(payload);
 739        payload = NULL;
 740    }
 741
 742    if (!extent->length && !iter.err) {
 743        error_setg(&iter.err,
 744                   "Server did not reply with any status extents");
 745        if (!iter.ret) {
 746            iter.ret = -EIO;
 747        }
 748    }
 749    error_propagate(errp, iter.err);
 750    return iter.ret;
 751}
 752
 753static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
 754                          QEMUIOVector *write_qiov)
 755{
 756    int ret;
 757    Error *local_err = NULL;
 758    NBDClientSession *client = nbd_get_client_session(bs);
 759
 760    assert(request->type != NBD_CMD_READ);
 761    if (write_qiov) {
 762        assert(request->type == NBD_CMD_WRITE);
 763        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
 764    } else {
 765        assert(request->type != NBD_CMD_WRITE);
 766    }
 767    ret = nbd_co_send_request(bs, request, write_qiov);
 768    if (ret < 0) {
 769        return ret;
 770    }
 771
 772    ret = nbd_co_receive_return_code(client, request->handle, &local_err);
 773    if (local_err) {
 774        error_report_err(local_err);
 775    }
 776    return ret;
 777}
 778
 779int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
 780                         uint64_t bytes, QEMUIOVector *qiov, int flags)
 781{
 782    int ret;
 783    Error *local_err = NULL;
 784    NBDClientSession *client = nbd_get_client_session(bs);
 785    NBDRequest request = {
 786        .type = NBD_CMD_READ,
 787        .from = offset,
 788        .len = bytes,
 789    };
 790
 791    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 792    assert(!flags);
 793
 794    if (!bytes) {
 795        return 0;
 796    }
 797    ret = nbd_co_send_request(bs, &request, NULL);
 798    if (ret < 0) {
 799        return ret;
 800    }
 801
 802    ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
 803                                       &local_err);
 804    if (local_err) {
 805        error_report_err(local_err);
 806    }
 807    return ret;
 808}
 809
 810int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
 811                          uint64_t bytes, QEMUIOVector *qiov, int flags)
 812{
 813    NBDClientSession *client = nbd_get_client_session(bs);
 814    NBDRequest request = {
 815        .type = NBD_CMD_WRITE,
 816        .from = offset,
 817        .len = bytes,
 818    };
 819
 820    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
 821    if (flags & BDRV_REQ_FUA) {
 822        assert(client->info.flags & NBD_FLAG_SEND_FUA);
 823        request.flags |= NBD_CMD_FLAG_FUA;
 824    }
 825
 826    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 827
 828    if (!bytes) {
 829        return 0;
 830    }
 831    return nbd_co_request(bs, &request, qiov);
 832}
 833
 834int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
 835                                int bytes, BdrvRequestFlags flags)
 836{
 837    NBDClientSession *client = nbd_get_client_session(bs);
 838    NBDRequest request = {
 839        .type = NBD_CMD_WRITE_ZEROES,
 840        .from = offset,
 841        .len = bytes,
 842    };
 843
 844    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
 845    if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
 846        return -ENOTSUP;
 847    }
 848
 849    if (flags & BDRV_REQ_FUA) {
 850        assert(client->info.flags & NBD_FLAG_SEND_FUA);
 851        request.flags |= NBD_CMD_FLAG_FUA;
 852    }
 853    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
 854        request.flags |= NBD_CMD_FLAG_NO_HOLE;
 855    }
 856
 857    if (!bytes) {
 858        return 0;
 859    }
 860    return nbd_co_request(bs, &request, NULL);
 861}
 862
 863int nbd_client_co_flush(BlockDriverState *bs)
 864{
 865    NBDClientSession *client = nbd_get_client_session(bs);
 866    NBDRequest request = { .type = NBD_CMD_FLUSH };
 867
 868    if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
 869        return 0;
 870    }
 871
 872    request.from = 0;
 873    request.len = 0;
 874
 875    return nbd_co_request(bs, &request, NULL);
 876}
 877
 878int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
 879{
 880    NBDClientSession *client = nbd_get_client_session(bs);
 881    NBDRequest request = {
 882        .type = NBD_CMD_TRIM,
 883        .from = offset,
 884        .len = bytes,
 885    };
 886
 887    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
 888    if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
 889        return 0;
 890    }
 891
 892    return nbd_co_request(bs, &request, NULL);
 893}
 894
 895int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
 896                                            bool want_zero,
 897                                            int64_t offset, int64_t bytes,
 898                                            int64_t *pnum, int64_t *map,
 899                                            BlockDriverState **file)
 900{
 901    int64_t ret;
 902    NBDExtent extent = { 0 };
 903    NBDClientSession *client = nbd_get_client_session(bs);
 904    Error *local_err = NULL;
 905
 906    NBDRequest request = {
 907        .type = NBD_CMD_BLOCK_STATUS,
 908        .from = offset,
 909        .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
 910                                                bs->bl.request_alignment),
 911                                client->info.max_block), bytes),
 912        .flags = NBD_CMD_FLAG_REQ_ONE,
 913    };
 914
 915    if (!client->info.base_allocation) {
 916        *pnum = bytes;
 917        return BDRV_BLOCK_DATA;
 918    }
 919
 920    ret = nbd_co_send_request(bs, &request, NULL);
 921    if (ret < 0) {
 922        return ret;
 923    }
 924
 925    ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
 926                                           &extent, &local_err);
 927    if (local_err) {
 928        error_report_err(local_err);
 929    }
 930    if (ret < 0) {
 931        return ret;
 932    }
 933
 934    assert(extent.length);
 935    *pnum = extent.length;
 936    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
 937           (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0);
 938}
 939
 940void nbd_client_detach_aio_context(BlockDriverState *bs)
 941{
 942    NBDClientSession *client = nbd_get_client_session(bs);
 943    qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
 944}
 945
 946void nbd_client_attach_aio_context(BlockDriverState *bs,
 947                                   AioContext *new_context)
 948{
 949    NBDClientSession *client = nbd_get_client_session(bs);
 950    qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);
 951    aio_co_schedule(new_context, client->read_reply_co);
 952}
 953
 954void nbd_client_close(BlockDriverState *bs)
 955{
 956    NBDClientSession *client = nbd_get_client_session(bs);
 957    NBDRequest request = { .type = NBD_CMD_DISC };
 958
 959    if (client->ioc == NULL) {
 960        return;
 961    }
 962
 963    nbd_send_request(client->ioc, &request);
 964
 965    nbd_teardown_connection(bs);
 966}
 967
 968int nbd_client_init(BlockDriverState *bs,
 969                    QIOChannelSocket *sioc,
 970                    const char *export,
 971                    QCryptoTLSCreds *tlscreds,
 972                    const char *hostname,
 973                    const char *x_dirty_bitmap,
 974                    Error **errp)
 975{
 976    NBDClientSession *client = nbd_get_client_session(bs);
 977    int ret;
 978
 979    /* NBD handshake */
 980    logout("session init %s\n", export);
 981    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
 982
 983    client->info.request_sizes = true;
 984    client->info.structured_reply = true;
 985    client->info.base_allocation = true;
 986    client->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
 987    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
 988                                tlscreds, hostname,
 989                                &client->ioc, &client->info, errp);
 990    g_free(client->info.x_dirty_bitmap);
 991    if (ret < 0) {
 992        logout("Failed to negotiate with the NBD server\n");
 993        return ret;
 994    }
 995    if (x_dirty_bitmap && !client->info.base_allocation) {
 996        error_setg(errp, "requested x-dirty-bitmap %s not found",
 997                   x_dirty_bitmap);
 998        ret = -EINVAL;
 999        goto fail;
1000    }
1001    if (client->info.flags & NBD_FLAG_READ_ONLY) {
1002        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
1003        if (ret < 0) {
1004            goto fail;
1005        }
1006    }
1007    if (client->info.flags & NBD_FLAG_SEND_FUA) {
1008        bs->supported_write_flags = BDRV_REQ_FUA;
1009        bs->supported_zero_flags |= BDRV_REQ_FUA;
1010    }
1011    if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
1012        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
1013    }
1014
1015    qemu_co_mutex_init(&client->send_mutex);
1016    qemu_co_queue_init(&client->free_sema);
1017    client->sioc = sioc;
1018    object_ref(OBJECT(client->sioc));
1019
1020    if (!client->ioc) {
1021        client->ioc = QIO_CHANNEL(sioc);
1022        object_ref(OBJECT(client->ioc));
1023    }
1024
1025    /* Now that we're connected, set the socket to be non-blocking and
1026     * kick the reply mechanism.  */
1027    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
1028    client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
1029    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
1030
1031    logout("Established connection with NBD server\n");
1032    return 0;
1033
1034 fail:
1035    /*
1036     * We have connected, but must fail for other reasons. The
1037     * connection is still blocking; send NBD_CMD_DISC as a courtesy
1038     * to the server.
1039     */
1040    {
1041        NBDRequest request = { .type = NBD_CMD_DISC };
1042
1043        nbd_send_request(client->ioc ?: QIO_CHANNEL(sioc), &request);
1044        return ret;
1045    }
1046}
1047