qemu/block/nbd-client.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for  NBD
   3 *
   4 * Copyright (C) 2016 Red Hat, Inc.
   5 * Copyright (C) 2008 Bull S.A.S.
   6 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
   7 *
   8 * Some parts:
   9 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
  10 *
  11 * Permission is hereby granted, free of charge, to any person obtaining a copy
  12 * of this software and associated documentation files (the "Software"), to deal
  13 * in the Software without restriction, including without limitation the rights
  14 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  15 * copies of the Software, and to permit persons to whom the Software is
  16 * furnished to do so, subject to the following conditions:
  17 *
  18 * The above copyright notice and this permission notice shall be included in
  19 * all copies or substantial portions of the Software.
  20 *
  21 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  22 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  23 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  24 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  25 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  26 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  27 * THE SOFTWARE.
  28 */
  29
  30#include "qemu/osdep.h"
  31#include "qapi/error.h"
  32#include "nbd-client.h"
  33
  34#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
  35#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))
  36
  37static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
  38{
  39    int i;
  40
  41    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
  42        NBDClientRequest *req = &s->requests[i];
  43
  44        if (req->coroutine && req->receiving) {
  45            aio_co_wake(req->coroutine);
  46        }
  47    }
  48}
  49
  50static void nbd_teardown_connection(BlockDriverState *bs)
  51{
  52    NBDClientSession *client = nbd_get_client_session(bs);
  53
  54    if (!client->ioc) { /* Already closed */
  55        return;
  56    }
  57
  58    /* finish any pending coroutines */
  59    qio_channel_shutdown(client->ioc,
  60                         QIO_CHANNEL_SHUTDOWN_BOTH,
  61                         NULL);
  62    BDRV_POLL_WHILE(bs, client->read_reply_co);
  63
  64    nbd_client_detach_aio_context(bs);
  65    object_unref(OBJECT(client->sioc));
  66    client->sioc = NULL;
  67    object_unref(OBJECT(client->ioc));
  68    client->ioc = NULL;
  69}
  70
  71static coroutine_fn void nbd_read_reply_entry(void *opaque)
  72{
  73    NBDClientSession *s = opaque;
  74    uint64_t i;
  75    int ret = 0;
  76    Error *local_err = NULL;
  77
  78    while (!s->quit) {
  79        assert(s->reply.handle == 0);
  80        ret = nbd_receive_reply(s->ioc, &s->reply, &local_err);
  81        if (local_err) {
  82            error_report_err(local_err);
  83        }
  84        if (ret <= 0) {
  85            break;
  86        }
  87
  88        /* There's no need for a mutex on the receive side, because the
  89         * handler acts as a synchronization point and ensures that only
  90         * one coroutine is called until the reply finishes.
  91         */
  92        i = HANDLE_TO_INDEX(s, s->reply.handle);
  93        if (i >= MAX_NBD_REQUESTS ||
  94            !s->requests[i].coroutine ||
  95            !s->requests[i].receiving ||
  96            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
  97        {
  98            break;
  99        }
 100
 101        /* We're woken up again by the request itself.  Note that there
 102         * is no race between yielding and reentering read_reply_co.  This
 103         * is because:
 104         *
 105         * - if the request runs on the same AioContext, it is only
 106         *   entered after we yield
 107         *
 108         * - if the request runs on a different AioContext, reentering
 109         *   read_reply_co happens through a bottom half, which can only
 110         *   run after we yield.
 111         */
 112        aio_co_wake(s->requests[i].coroutine);
 113        qemu_coroutine_yield();
 114    }
 115
 116    s->quit = true;
 117    nbd_recv_coroutines_wake_all(s);
 118    s->read_reply_co = NULL;
 119}
 120
 121static int nbd_co_send_request(BlockDriverState *bs,
 122                               NBDRequest *request,
 123                               QEMUIOVector *qiov)
 124{
 125    NBDClientSession *s = nbd_get_client_session(bs);
 126    int rc, i;
 127
 128    qemu_co_mutex_lock(&s->send_mutex);
 129    while (s->in_flight == MAX_NBD_REQUESTS) {
 130        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
 131    }
 132    s->in_flight++;
 133
 134    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
 135        if (s->requests[i].coroutine == NULL) {
 136            break;
 137        }
 138    }
 139
 140    g_assert(qemu_in_coroutine());
 141    assert(i < MAX_NBD_REQUESTS);
 142
 143    s->requests[i].coroutine = qemu_coroutine_self();
 144    s->requests[i].offset = request->from;
 145    s->requests[i].receiving = false;
 146
 147    request->handle = INDEX_TO_HANDLE(s, i);
 148
 149    if (s->quit) {
 150        rc = -EIO;
 151        goto err;
 152    }
 153    if (!s->ioc) {
 154        rc = -EPIPE;
 155        goto err;
 156    }
 157
 158    if (qiov) {
 159        qio_channel_set_cork(s->ioc, true);
 160        rc = nbd_send_request(s->ioc, request);
 161        if (rc >= 0 && !s->quit) {
 162            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
 163                                       NULL) < 0) {
 164                rc = -EIO;
 165            }
 166        } else if (rc >= 0) {
 167            rc = -EIO;
 168        }
 169        qio_channel_set_cork(s->ioc, false);
 170    } else {
 171        rc = nbd_send_request(s->ioc, request);
 172    }
 173
 174err:
 175    if (rc < 0) {
 176        s->quit = true;
 177        s->requests[i].coroutine = NULL;
 178        s->in_flight--;
 179        qemu_co_queue_next(&s->free_sema);
 180    }
 181    qemu_co_mutex_unlock(&s->send_mutex);
 182    return rc;
 183}
 184
 185static inline uint16_t payload_advance16(uint8_t **payload)
 186{
 187    *payload += 2;
 188    return lduw_be_p(*payload - 2);
 189}
 190
 191static inline uint32_t payload_advance32(uint8_t **payload)
 192{
 193    *payload += 4;
 194    return ldl_be_p(*payload - 4);
 195}
 196
 197static inline uint64_t payload_advance64(uint8_t **payload)
 198{
 199    *payload += 8;
 200    return ldq_be_p(*payload - 8);
 201}
 202
 203static int nbd_parse_offset_hole_payload(NBDStructuredReplyChunk *chunk,
 204                                         uint8_t *payload, uint64_t orig_offset,
 205                                         QEMUIOVector *qiov, Error **errp)
 206{
 207    uint64_t offset;
 208    uint32_t hole_size;
 209
 210    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
 211        error_setg(errp, "Protocol error: invalid payload for "
 212                         "NBD_REPLY_TYPE_OFFSET_HOLE");
 213        return -EINVAL;
 214    }
 215
 216    offset = payload_advance64(&payload);
 217    hole_size = payload_advance32(&payload);
 218
 219    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
 220        offset > orig_offset + qiov->size - hole_size) {
 221        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
 222                         " region");
 223        return -EINVAL;
 224    }
 225
 226    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);
 227
 228    return 0;
 229}
 230
 231/* nbd_parse_blockstatus_payload
 232 * support only one extent in reply and only for
 233 * base:allocation context
 234 */
 235static int nbd_parse_blockstatus_payload(NBDClientSession *client,
 236                                         NBDStructuredReplyChunk *chunk,
 237                                         uint8_t *payload, uint64_t orig_length,
 238                                         NBDExtent *extent, Error **errp)
 239{
 240    uint32_t context_id;
 241
 242    if (chunk->length != sizeof(context_id) + sizeof(*extent)) {
 243        error_setg(errp, "Protocol error: invalid payload for "
 244                         "NBD_REPLY_TYPE_BLOCK_STATUS");
 245        return -EINVAL;
 246    }
 247
 248    context_id = payload_advance32(&payload);
 249    if (client->info.meta_base_allocation_id != context_id) {
 250        error_setg(errp, "Protocol error: unexpected context id %d for "
 251                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
 252                         "id is %d", context_id,
 253                         client->info.meta_base_allocation_id);
 254        return -EINVAL;
 255    }
 256
 257    extent->length = payload_advance32(&payload);
 258    extent->flags = payload_advance32(&payload);
 259
 260    if (extent->length == 0 ||
 261        (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
 262                                                    client->info.min_block)) ||
 263        extent->length > orig_length)
 264    {
 265        error_setg(errp, "Protocol error: server sent status chunk with "
 266                   "invalid length");
 267        return -EINVAL;
 268    }
 269
 270    return 0;
 271}
 272
 273/* nbd_parse_error_payload
 274 * on success @errp contains message describing nbd error reply
 275 */
 276static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
 277                                   uint8_t *payload, int *request_ret,
 278                                   Error **errp)
 279{
 280    uint32_t error;
 281    uint16_t message_size;
 282
 283    assert(chunk->type & (1 << 15));
 284
 285    if (chunk->length < sizeof(error) + sizeof(message_size)) {
 286        error_setg(errp,
 287                   "Protocol error: invalid payload for structured error");
 288        return -EINVAL;
 289    }
 290
 291    error = nbd_errno_to_system_errno(payload_advance32(&payload));
 292    if (error == 0) {
 293        error_setg(errp, "Protocol error: server sent structured error chunk "
 294                         "with error = 0");
 295        return -EINVAL;
 296    }
 297
 298    *request_ret = -error;
 299    message_size = payload_advance16(&payload);
 300
 301    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
 302        error_setg(errp, "Protocol error: server sent structured error chunk "
 303                         "with incorrect message size");
 304        return -EINVAL;
 305    }
 306
 307    /* TODO: Add a trace point to mention the server complaint */
 308
 309    /* TODO handle ERROR_OFFSET */
 310
 311    return 0;
 312}
 313
 314static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
 315                                              uint64_t orig_offset,
 316                                              QEMUIOVector *qiov, Error **errp)
 317{
 318    QEMUIOVector sub_qiov;
 319    uint64_t offset;
 320    size_t data_size;
 321    int ret;
 322    NBDStructuredReplyChunk *chunk = &s->reply.structured;
 323
 324    assert(nbd_reply_is_structured(&s->reply));
 325
 326    /* The NBD spec requires at least one byte of payload */
 327    if (chunk->length <= sizeof(offset)) {
 328        error_setg(errp, "Protocol error: invalid payload for "
 329                         "NBD_REPLY_TYPE_OFFSET_DATA");
 330        return -EINVAL;
 331    }
 332
 333    if (nbd_read(s->ioc, &offset, sizeof(offset), errp) < 0) {
 334        return -EIO;
 335    }
 336    be64_to_cpus(&offset);
 337
 338    data_size = chunk->length - sizeof(offset);
 339    assert(data_size);
 340    if (offset < orig_offset || data_size > qiov->size ||
 341        offset > orig_offset + qiov->size - data_size) {
 342        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
 343                         " region");
 344        return -EINVAL;
 345    }
 346
 347    qemu_iovec_init(&sub_qiov, qiov->niov);
 348    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
 349    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
 350    qemu_iovec_destroy(&sub_qiov);
 351
 352    return ret < 0 ? -EIO : 0;
 353}
 354
 355#define NBD_MAX_MALLOC_PAYLOAD 1000
 356/* nbd_co_receive_structured_payload
 357 */
 358static coroutine_fn int nbd_co_receive_structured_payload(
 359        NBDClientSession *s, void **payload, Error **errp)
 360{
 361    int ret;
 362    uint32_t len;
 363
 364    assert(nbd_reply_is_structured(&s->reply));
 365
 366    len = s->reply.structured.length;
 367
 368    if (len == 0) {
 369        return 0;
 370    }
 371
 372    if (payload == NULL) {
 373        error_setg(errp, "Unexpected structured payload");
 374        return -EINVAL;
 375    }
 376
 377    if (len > NBD_MAX_MALLOC_PAYLOAD) {
 378        error_setg(errp, "Payload too large");
 379        return -EINVAL;
 380    }
 381
 382    *payload = g_new(char, len);
 383    ret = nbd_read(s->ioc, *payload, len, errp);
 384    if (ret < 0) {
 385        g_free(*payload);
 386        *payload = NULL;
 387        return ret;
 388    }
 389
 390    return 0;
 391}
 392
 393/* nbd_co_do_receive_one_chunk
 394 * for simple reply:
 395 *   set request_ret to received reply error
 396 *   if qiov is not NULL: read payload to @qiov
 397 * for structured reply chunk:
 398 *   if error chunk: read payload, set @request_ret, do not set @payload
 399 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 400 *   else: read payload to @payload
 401 *
 402 * If function fails, @errp contains corresponding error message, and the
 403 * connection with the server is suspect.  If it returns 0, then the
 404 * transaction succeeded (although @request_ret may be a negative errno
 405 * corresponding to the server's error reply), and errp is unchanged.
 406 */
 407static coroutine_fn int nbd_co_do_receive_one_chunk(
 408        NBDClientSession *s, uint64_t handle, bool only_structured,
 409        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
 410{
 411    int ret;
 412    int i = HANDLE_TO_INDEX(s, handle);
 413    void *local_payload = NULL;
 414    NBDStructuredReplyChunk *chunk;
 415
 416    if (payload) {
 417        *payload = NULL;
 418    }
 419    *request_ret = 0;
 420
 421    /* Wait until we're woken up by nbd_read_reply_entry.  */
 422    s->requests[i].receiving = true;
 423    qemu_coroutine_yield();
 424    s->requests[i].receiving = false;
 425    if (!s->ioc || s->quit) {
 426        error_setg(errp, "Connection closed");
 427        return -EIO;
 428    }
 429
 430    assert(s->reply.handle == handle);
 431
 432    if (nbd_reply_is_simple(&s->reply)) {
 433        if (only_structured) {
 434            error_setg(errp, "Protocol error: simple reply when structured "
 435                             "reply chunk was expected");
 436            return -EINVAL;
 437        }
 438
 439        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
 440        if (*request_ret < 0 || !qiov) {
 441            return 0;
 442        }
 443
 444        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
 445                                     errp) < 0 ? -EIO : 0;
 446    }
 447
 448    /* handle structured reply chunk */
 449    assert(s->info.structured_reply);
 450    chunk = &s->reply.structured;
 451
 452    if (chunk->type == NBD_REPLY_TYPE_NONE) {
 453        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
 454            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
 455                       " NBD_REPLY_FLAG_DONE flag set");
 456            return -EINVAL;
 457        }
 458        if (chunk->length) {
 459            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
 460                       " nonzero length");
 461            return -EINVAL;
 462        }
 463        return 0;
 464    }
 465
 466    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
 467        if (!qiov) {
 468            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
 469            return -EINVAL;
 470        }
 471
 472        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
 473                                                  qiov, errp);
 474    }
 475
 476    if (nbd_reply_type_is_error(chunk->type)) {
 477        payload = &local_payload;
 478    }
 479
 480    ret = nbd_co_receive_structured_payload(s, payload, errp);
 481    if (ret < 0) {
 482        return ret;
 483    }
 484
 485    if (nbd_reply_type_is_error(chunk->type)) {
 486        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
 487        g_free(local_payload);
 488        return ret;
 489    }
 490
 491    return 0;
 492}
 493
 494/* nbd_co_receive_one_chunk
 495 * Read reply, wake up read_reply_co and set s->quit if needed.
 496 * Return value is a fatal error code or normal nbd reply error code
 497 */
 498static coroutine_fn int nbd_co_receive_one_chunk(
 499        NBDClientSession *s, uint64_t handle, bool only_structured,
 500        QEMUIOVector *qiov, NBDReply *reply, void **payload, Error **errp)
 501{
 502    int request_ret;
 503    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
 504                                          &request_ret, qiov, payload, errp);
 505
 506    if (ret < 0) {
 507        s->quit = true;
 508    } else {
 509        /* For assert at loop start in nbd_read_reply_entry */
 510        if (reply) {
 511            *reply = s->reply;
 512        }
 513        s->reply.handle = 0;
 514        ret = request_ret;
 515    }
 516
 517    if (s->read_reply_co) {
 518        aio_co_wake(s->read_reply_co);
 519    }
 520
 521    return ret;
 522}
 523
 524typedef struct NBDReplyChunkIter {
 525    int ret;
 526    bool fatal;
 527    Error *err;
 528    bool done, only_structured;
 529} NBDReplyChunkIter;
 530
 531static void nbd_iter_error(NBDReplyChunkIter *iter, bool fatal,
 532                           int ret, Error **local_err)
 533{
 534    assert(ret < 0);
 535
 536    if ((fatal && !iter->fatal) || iter->ret == 0) {
 537        if (iter->ret != 0) {
 538            error_free(iter->err);
 539            iter->err = NULL;
 540        }
 541        iter->fatal = fatal;
 542        iter->ret = ret;
 543        error_propagate(&iter->err, *local_err);
 544    } else {
 545        error_free(*local_err);
 546    }
 547
 548    *local_err = NULL;
 549}
 550
 551/* NBD_FOREACH_REPLY_CHUNK
 552 */
 553#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
 554                                qiov, reply, payload) \
 555    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
 556         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
 557
 558/* nbd_reply_chunk_iter_receive
 559 */
 560static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
 561                                         NBDReplyChunkIter *iter,
 562                                         uint64_t handle,
 563                                         QEMUIOVector *qiov, NBDReply *reply,
 564                                         void **payload)
 565{
 566    int ret;
 567    NBDReply local_reply;
 568    NBDStructuredReplyChunk *chunk;
 569    Error *local_err = NULL;
 570    if (s->quit) {
 571        error_setg(&local_err, "Connection closed");
 572        nbd_iter_error(iter, true, -EIO, &local_err);
 573        goto break_loop;
 574    }
 575
 576    if (iter->done) {
 577        /* Previous iteration was last. */
 578        goto break_loop;
 579    }
 580
 581    if (reply == NULL) {
 582        reply = &local_reply;
 583    }
 584
 585    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
 586                                   qiov, reply, payload, &local_err);
 587    if (ret < 0) {
 588        /* If it is a fatal error s->quit is set by nbd_co_receive_one_chunk */
 589        nbd_iter_error(iter, s->quit, ret, &local_err);
 590    }
 591
 592    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
 593    if (nbd_reply_is_simple(&s->reply) || s->quit) {
 594        goto break_loop;
 595    }
 596
 597    chunk = &reply->structured;
 598    iter->only_structured = true;
 599
 600    if (chunk->type == NBD_REPLY_TYPE_NONE) {
 601        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
 602        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
 603        goto break_loop;
 604    }
 605
 606    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
 607        /* This iteration is last. */
 608        iter->done = true;
 609    }
 610
 611    /* Execute the loop body */
 612    return true;
 613
 614break_loop:
 615    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
 616
 617    qemu_co_mutex_lock(&s->send_mutex);
 618    s->in_flight--;
 619    qemu_co_queue_next(&s->free_sema);
 620    qemu_co_mutex_unlock(&s->send_mutex);
 621
 622    return false;
 623}
 624
 625static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
 626                                      Error **errp)
 627{
 628    NBDReplyChunkIter iter;
 629
 630    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
 631        /* nbd_reply_chunk_iter_receive does all the work */
 632    }
 633
 634    error_propagate(errp, iter.err);
 635    return iter.ret;
 636}
 637
 638static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
 639                                        uint64_t offset, QEMUIOVector *qiov,
 640                                        Error **errp)
 641{
 642    NBDReplyChunkIter iter;
 643    NBDReply reply;
 644    void *payload = NULL;
 645    Error *local_err = NULL;
 646
 647    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
 648                            qiov, &reply, &payload)
 649    {
 650        int ret;
 651        NBDStructuredReplyChunk *chunk = &reply.structured;
 652
 653        assert(nbd_reply_is_structured(&reply));
 654
 655        switch (chunk->type) {
 656        case NBD_REPLY_TYPE_OFFSET_DATA:
 657            /* special cased in nbd_co_receive_one_chunk, data is already
 658             * in qiov */
 659            break;
 660        case NBD_REPLY_TYPE_OFFSET_HOLE:
 661            ret = nbd_parse_offset_hole_payload(&reply.structured, payload,
 662                                                offset, qiov, &local_err);
 663            if (ret < 0) {
 664                s->quit = true;
 665                nbd_iter_error(&iter, true, ret, &local_err);
 666            }
 667            break;
 668        default:
 669            if (!nbd_reply_type_is_error(chunk->type)) {
 670                /* not allowed reply type */
 671                s->quit = true;
 672                error_setg(&local_err,
 673                           "Unexpected reply type: %d (%s) for CMD_READ",
 674                           chunk->type, nbd_reply_type_lookup(chunk->type));
 675                nbd_iter_error(&iter, true, -EINVAL, &local_err);
 676            }
 677        }
 678
 679        g_free(payload);
 680        payload = NULL;
 681    }
 682
 683    error_propagate(errp, iter.err);
 684    return iter.ret;
 685}
 686
 687static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
 688                                            uint64_t handle, uint64_t length,
 689                                            NBDExtent *extent, Error **errp)
 690{
 691    NBDReplyChunkIter iter;
 692    NBDReply reply;
 693    void *payload = NULL;
 694    Error *local_err = NULL;
 695    bool received = false;
 696
 697    assert(!extent->length);
 698    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
 699                            NULL, &reply, &payload)
 700    {
 701        int ret;
 702        NBDStructuredReplyChunk *chunk = &reply.structured;
 703
 704        assert(nbd_reply_is_structured(&reply));
 705
 706        switch (chunk->type) {
 707        case NBD_REPLY_TYPE_BLOCK_STATUS:
 708            if (received) {
 709                s->quit = true;
 710                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
 711                nbd_iter_error(&iter, true, -EINVAL, &local_err);
 712            }
 713            received = true;
 714
 715            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
 716                                                payload, length, extent,
 717                                                &local_err);
 718            if (ret < 0) {
 719                s->quit = true;
 720                nbd_iter_error(&iter, true, ret, &local_err);
 721            }
 722            break;
 723        default:
 724            if (!nbd_reply_type_is_error(chunk->type)) {
 725                s->quit = true;
 726                error_setg(&local_err,
 727                           "Unexpected reply type: %d (%s) "
 728                           "for CMD_BLOCK_STATUS",
 729                           chunk->type, nbd_reply_type_lookup(chunk->type));
 730                nbd_iter_error(&iter, true, -EINVAL, &local_err);
 731            }
 732        }
 733
 734        g_free(payload);
 735        payload = NULL;
 736    }
 737
 738    if (!extent->length && !iter.err) {
 739        error_setg(&iter.err,
 740                   "Server did not reply with any status extents");
 741        if (!iter.ret) {
 742            iter.ret = -EIO;
 743        }
 744    }
 745    error_propagate(errp, iter.err);
 746    return iter.ret;
 747}
 748
 749static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
 750                          QEMUIOVector *write_qiov)
 751{
 752    int ret;
 753    Error *local_err = NULL;
 754    NBDClientSession *client = nbd_get_client_session(bs);
 755
 756    assert(request->type != NBD_CMD_READ);
 757    if (write_qiov) {
 758        assert(request->type == NBD_CMD_WRITE);
 759        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
 760    } else {
 761        assert(request->type != NBD_CMD_WRITE);
 762    }
 763    ret = nbd_co_send_request(bs, request, write_qiov);
 764    if (ret < 0) {
 765        return ret;
 766    }
 767
 768    ret = nbd_co_receive_return_code(client, request->handle, &local_err);
 769    if (local_err) {
 770        error_report_err(local_err);
 771    }
 772    return ret;
 773}
 774
 775int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
 776                         uint64_t bytes, QEMUIOVector *qiov, int flags)
 777{
 778    int ret;
 779    Error *local_err = NULL;
 780    NBDClientSession *client = nbd_get_client_session(bs);
 781    NBDRequest request = {
 782        .type = NBD_CMD_READ,
 783        .from = offset,
 784        .len = bytes,
 785    };
 786
 787    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 788    assert(!flags);
 789
 790    if (!bytes) {
 791        return 0;
 792    }
 793    ret = nbd_co_send_request(bs, &request, NULL);
 794    if (ret < 0) {
 795        return ret;
 796    }
 797
 798    ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
 799                                       &local_err);
 800    if (local_err) {
 801        error_report_err(local_err);
 802    }
 803    return ret;
 804}
 805
 806int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
 807                          uint64_t bytes, QEMUIOVector *qiov, int flags)
 808{
 809    NBDClientSession *client = nbd_get_client_session(bs);
 810    NBDRequest request = {
 811        .type = NBD_CMD_WRITE,
 812        .from = offset,
 813        .len = bytes,
 814    };
 815
 816    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
 817    if (flags & BDRV_REQ_FUA) {
 818        assert(client->info.flags & NBD_FLAG_SEND_FUA);
 819        request.flags |= NBD_CMD_FLAG_FUA;
 820    }
 821
 822    assert(bytes <= NBD_MAX_BUFFER_SIZE);
 823
 824    if (!bytes) {
 825        return 0;
 826    }
 827    return nbd_co_request(bs, &request, qiov);
 828}
 829
 830int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
 831                                int bytes, BdrvRequestFlags flags)
 832{
 833    NBDClientSession *client = nbd_get_client_session(bs);
 834    NBDRequest request = {
 835        .type = NBD_CMD_WRITE_ZEROES,
 836        .from = offset,
 837        .len = bytes,
 838    };
 839
 840    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
 841    if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
 842        return -ENOTSUP;
 843    }
 844
 845    if (flags & BDRV_REQ_FUA) {
 846        assert(client->info.flags & NBD_FLAG_SEND_FUA);
 847        request.flags |= NBD_CMD_FLAG_FUA;
 848    }
 849    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
 850        request.flags |= NBD_CMD_FLAG_NO_HOLE;
 851    }
 852
 853    if (!bytes) {
 854        return 0;
 855    }
 856    return nbd_co_request(bs, &request, NULL);
 857}
 858
 859int nbd_client_co_flush(BlockDriverState *bs)
 860{
 861    NBDClientSession *client = nbd_get_client_session(bs);
 862    NBDRequest request = { .type = NBD_CMD_FLUSH };
 863
 864    if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
 865        return 0;
 866    }
 867
 868    request.from = 0;
 869    request.len = 0;
 870
 871    return nbd_co_request(bs, &request, NULL);
 872}
 873
 874int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
 875{
 876    NBDClientSession *client = nbd_get_client_session(bs);
 877    NBDRequest request = {
 878        .type = NBD_CMD_TRIM,
 879        .from = offset,
 880        .len = bytes,
 881    };
 882
 883    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
 884    if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
 885        return 0;
 886    }
 887
 888    return nbd_co_request(bs, &request, NULL);
 889}
 890
 891int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
 892                                            bool want_zero,
 893                                            int64_t offset, int64_t bytes,
 894                                            int64_t *pnum, int64_t *map,
 895                                            BlockDriverState **file)
 896{
 897    int64_t ret;
 898    NBDExtent extent = { 0 };
 899    NBDClientSession *client = nbd_get_client_session(bs);
 900    Error *local_err = NULL;
 901
 902    NBDRequest request = {
 903        .type = NBD_CMD_BLOCK_STATUS,
 904        .from = offset,
 905        .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
 906                                                bs->bl.request_alignment),
 907                                client->info.max_block), bytes),
 908        .flags = NBD_CMD_FLAG_REQ_ONE,
 909    };
 910
 911    if (!client->info.base_allocation) {
 912        *pnum = bytes;
 913        return BDRV_BLOCK_DATA;
 914    }
 915
 916    ret = nbd_co_send_request(bs, &request, NULL);
 917    if (ret < 0) {
 918        return ret;
 919    }
 920
 921    ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
 922                                           &extent, &local_err);
 923    if (local_err) {
 924        error_report_err(local_err);
 925    }
 926    if (ret < 0) {
 927        return ret;
 928    }
 929
 930    assert(extent.length);
 931    *pnum = extent.length;
 932    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
 933           (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0);
 934}
 935
 936void nbd_client_detach_aio_context(BlockDriverState *bs)
 937{
 938    NBDClientSession *client = nbd_get_client_session(bs);
 939    qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
 940}
 941
 942void nbd_client_attach_aio_context(BlockDriverState *bs,
 943                                   AioContext *new_context)
 944{
 945    NBDClientSession *client = nbd_get_client_session(bs);
 946    qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);
 947    aio_co_schedule(new_context, client->read_reply_co);
 948}
 949
 950void nbd_client_close(BlockDriverState *bs)
 951{
 952    NBDClientSession *client = nbd_get_client_session(bs);
 953    NBDRequest request = { .type = NBD_CMD_DISC };
 954
 955    if (client->ioc == NULL) {
 956        return;
 957    }
 958
 959    nbd_send_request(client->ioc, &request);
 960
 961    nbd_teardown_connection(bs);
 962}
 963
 964int nbd_client_init(BlockDriverState *bs,
 965                    QIOChannelSocket *sioc,
 966                    const char *export,
 967                    QCryptoTLSCreds *tlscreds,
 968                    const char *hostname,
 969                    Error **errp)
 970{
 971    NBDClientSession *client = nbd_get_client_session(bs);
 972    int ret;
 973
 974    /* NBD handshake */
 975    logout("session init %s\n", export);
 976    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);
 977
 978    client->info.request_sizes = true;
 979    client->info.structured_reply = true;
 980    client->info.base_allocation = true;
 981    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), export,
 982                                tlscreds, hostname,
 983                                &client->ioc, &client->info, errp);
 984    if (ret < 0) {
 985        logout("Failed to negotiate with the NBD server\n");
 986        return ret;
 987    }
 988    if (client->info.flags & NBD_FLAG_READ_ONLY &&
 989        !bdrv_is_read_only(bs)) {
 990        error_setg(errp,
 991                   "request for write access conflicts with read-only export");
 992        return -EACCES;
 993    }
 994    if (client->info.flags & NBD_FLAG_SEND_FUA) {
 995        bs->supported_write_flags = BDRV_REQ_FUA;
 996        bs->supported_zero_flags |= BDRV_REQ_FUA;
 997    }
 998    if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
 999        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
1000    }
1001
1002    qemu_co_mutex_init(&client->send_mutex);
1003    qemu_co_queue_init(&client->free_sema);
1004    client->sioc = sioc;
1005    object_ref(OBJECT(client->sioc));
1006
1007    if (!client->ioc) {
1008        client->ioc = QIO_CHANNEL(sioc);
1009        object_ref(OBJECT(client->ioc));
1010    }
1011
1012    /* Now that we're connected, set the socket to be non-blocking and
1013     * kick the reply mechanism.  */
1014    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
1015    client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
1016    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
1017
1018    logout("Established connection with NBD server\n");
1019    return 0;
1020}
1021