qemu/block/nbd-client.c
/*
 * QEMU Block driver for NBD
 *
 * Copyright (C) 2016 Red Hat, Inc.
 * Copyright (C) 2008 Bull S.A.S.
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
 *
 * Some parts:
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"

#include "trace.h"
#include "qapi/error.h"
#include "nbd-client.h"

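/*
 * The 64-bit handle sent on the wire is the request's slot index in
 * s->requests[] XORed with a per-session pointer value.  XOR is its own
 * inverse, so the two macros below undo each other; a handle this session
 * never issued will normally decode to an index >= MAX_NBD_REQUESTS,
 * which nbd_connection_entry() treats as a fatal protocol error.
 */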
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ (uint64_t)(intptr_t)(bs))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ (uint64_t)(intptr_t)(bs))

static void nbd_recv_coroutines_wake_all(NBDClientSession *s)
{
    int i;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        NBDClientRequest *req = &s->requests[i];

        if (req->coroutine && req->receiving) {
            aio_co_wake(req->coroutine);
        }
    }
}

static void nbd_teardown_connection(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    assert(client->ioc);

    /* finish any pending coroutines */
    qio_channel_shutdown(client->ioc,
                         QIO_CHANNEL_SHUTDOWN_BOTH,
                         NULL);
    BDRV_POLL_WHILE(bs, client->connection_co);

    nbd_client_detach_aio_context(bs);
    object_unref(OBJECT(client->sioc));
    client->sioc = NULL;
    object_unref(OBJECT(client->ioc));
    client->ioc = NULL;
}

static coroutine_fn void nbd_connection_entry(void *opaque)
{
    NBDClientSession *s = opaque;
    uint64_t i;
    int ret = 0;
    Error *local_err = NULL;

    while (!s->quit) {
        /*
         * The NBD client can only really be considered idle when it has
         * yielded from qio_channel_readv_all_eof(), waiting for data. This is
         * the point where the additional scheduled coroutine entry happens
         * after nbd_client_attach_aio_context().
         *
         * Therefore we keep an additional in_flight reference all the time and
         * only drop it temporarily here.
         */
        assert(s->reply.handle == 0);
        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);

        if (local_err) {
            trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
            error_free(local_err);
        }
        if (ret <= 0) {
            break;
        }

        /* There's no need for a mutex on the receive side, because the
         * handler acts as a synchronization point and ensures that only
         * one coroutine is called until the reply finishes.
         */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        if (i >= MAX_NBD_REQUESTS ||
            !s->requests[i].coroutine ||
            !s->requests[i].receiving ||
            (nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
        {
            break;
        }

        /* We're woken up again by the request itself.  Note that there
         * is no race between yielding and reentering connection_co.  This
         * is because:
         *
         * - if the request runs on the same AioContext, it is only
         *   entered after we yield
         *
         * - if the request runs on a different AioContext, reentering
         *   connection_co happens through a bottom half, which can only
         *   run after we yield.
         */
        aio_co_wake(s->requests[i].coroutine);
        qemu_coroutine_yield();
    }

    s->quit = true;
    nbd_recv_coroutines_wake_all(s);
    bdrv_dec_in_flight(s->bs);

    s->connection_co = NULL;
    aio_wait_kick();
}

static int nbd_co_send_request(BlockDriverState *bs,
                               NBDRequest *request,
                               QEMUIOVector *qiov)
{
    NBDClientSession *s = nbd_get_client_session(bs);
    int rc, i;

    qemu_co_mutex_lock(&s->send_mutex);
    while (s->in_flight == MAX_NBD_REQUESTS) {
        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
    }
    s->in_flight++;

    for (i = 0; i < MAX_NBD_REQUESTS; i++) {
        if (s->requests[i].coroutine == NULL) {
            break;
        }
    }

    g_assert(qemu_in_coroutine());
    assert(i < MAX_NBD_REQUESTS);

    s->requests[i].coroutine = qemu_coroutine_self();
    s->requests[i].offset = request->from;
    s->requests[i].receiving = false;

    request->handle = INDEX_TO_HANDLE(s, i);

    if (s->quit) {
        rc = -EIO;
        goto err;
    }
    assert(s->ioc);

    if (qiov) {
        qio_channel_set_cork(s->ioc, true);
        rc = nbd_send_request(s->ioc, request);
        if (rc >= 0 && !s->quit) {
            if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                       NULL) < 0) {
                rc = -EIO;
            }
        } else if (rc >= 0) {
            rc = -EIO;
        }
        qio_channel_set_cork(s->ioc, false);
    } else {
        rc = nbd_send_request(s->ioc, request);
    }

err:
    if (rc < 0) {
        s->quit = true;
        s->requests[i].coroutine = NULL;
        s->in_flight--;
        qemu_co_queue_next(&s->free_sema);
    }
    qemu_co_mutex_unlock(&s->send_mutex);
    return rc;
}

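/*
 * Integer fields in structured reply payloads are big-endian on the wire.
 * Each payload_advance*() helper below loads one such field from a buffered
 * payload and steps the cursor past it, so a parser can simply call them in
 * field order, e.g.:
 *
 *     offset = payload_advance64(&payload);
 *     hole_size = payload_advance32(&payload);
 */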
static inline uint16_t payload_advance16(uint8_t **payload)
{
    *payload += 2;
    return lduw_be_p(*payload - 2);
}

static inline uint32_t payload_advance32(uint8_t **payload)
{
    *payload += 4;
    return ldl_be_p(*payload - 4);
}

static inline uint64_t payload_advance64(uint8_t **payload)
{
    *payload += 8;
    return ldq_be_p(*payload - 8);
}

static int nbd_parse_offset_hole_payload(NBDClientSession *client,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_offset,
                                         QEMUIOVector *qiov, Error **errp)
{
    uint64_t offset;
    uint32_t hole_size;

    if (chunk->length != sizeof(offset) + sizeof(hole_size)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_HOLE");
        return -EINVAL;
    }

    offset = payload_advance64(&payload);
    hole_size = payload_advance32(&payload);

    if (!hole_size || offset < orig_offset || hole_size > qiov->size ||
        offset > orig_offset + qiov->size - hole_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (client->info.min_block &&
        !QEMU_IS_ALIGNED(hole_size, client->info.min_block)) {
        trace_nbd_structured_read_compliance("hole");
    }

    qemu_iovec_memset(qiov, offset - orig_offset, 0, hole_size);

    return 0;
}

/* nbd_parse_blockstatus_payload
 * Based on our request, we expect only one extent in reply, for the
 * base:allocation context.
 */
static int nbd_parse_blockstatus_payload(NBDClientSession *client,
                                         NBDStructuredReplyChunk *chunk,
                                         uint8_t *payload, uint64_t orig_length,
                                         NBDExtent *extent, Error **errp)
{
    uint32_t context_id;

    /* The server succeeded, so it must have sent [at least] one extent */
    if (chunk->length < sizeof(context_id) + sizeof(*extent)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS");
        return -EINVAL;
    }

    context_id = payload_advance32(&payload);
    if (client->info.context_id != context_id) {
        error_setg(errp, "Protocol error: unexpected context id %d for "
                         "NBD_REPLY_TYPE_BLOCK_STATUS, when negotiated context "
                         "id is %d", context_id,
                         client->info.context_id);
        return -EINVAL;
    }

    extent->length = payload_advance32(&payload);
    extent->flags = payload_advance32(&payload);

    if (extent->length == 0) {
        error_setg(errp, "Protocol error: server sent status chunk with "
                   "zero length");
        return -EINVAL;
    }

    /*
     * A server sending unaligned block status is in violation of the
     * protocol, but as qemu-nbd 3.1 is such a server (at least for
     * POSIX files that are not a multiple of 512 bytes, since qemu
     * rounds files up to 512-byte multiples but lseek(SEEK_HOLE)
     * still sees an implicit hole beyond the real EOF), it's nicer to
     * work around the misbehaving server. If the request included
     * more than the final unaligned block, truncate it back to an
     * aligned result; if the request was only the final block, round
     * up to the full block and change the status to fully-allocated
     * (always a safe status, even if it loses information).
     */
    if (client->info.min_block && !QEMU_IS_ALIGNED(extent->length,
                                                   client->info.min_block)) {
        trace_nbd_parse_blockstatus_compliance("extent length is unaligned");
        if (extent->length > client->info.min_block) {
            extent->length = QEMU_ALIGN_DOWN(extent->length,
                                             client->info.min_block);
        } else {
            extent->length = client->info.min_block;
            extent->flags = 0;
        }
    }

    /*
     * We used NBD_CMD_FLAG_REQ_ONE, so the server should not have
     * sent us any more than one extent, nor should it have included
     * status beyond our request in that extent. However, it's easy
     * enough to ignore the server's noncompliance without killing the
     * connection; just ignore trailing extents, and clamp things to
     * the length of our request.
     */
    if (chunk->length > sizeof(context_id) + sizeof(*extent)) {
        trace_nbd_parse_blockstatus_compliance("more than one extent");
    }
    if (extent->length > orig_length) {
        extent->length = orig_length;
        trace_nbd_parse_blockstatus_compliance("extent length too large");
    }

    return 0;
}

/* nbd_parse_error_payload
 * On success, *request_ret is set to the negative errno carried by the
 * error chunk; @errp is only set for protocol errors (a malformed chunk).
 */
static int nbd_parse_error_payload(NBDStructuredReplyChunk *chunk,
                                   uint8_t *payload, int *request_ret,
                                   Error **errp)
{
    uint32_t error;
    uint16_t message_size;

    assert(chunk->type & (1 << 15));

    if (chunk->length < sizeof(error) + sizeof(message_size)) {
        error_setg(errp,
                   "Protocol error: invalid payload for structured error");
        return -EINVAL;
    }

    error = nbd_errno_to_system_errno(payload_advance32(&payload));
    if (error == 0) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with error = 0");
        return -EINVAL;
    }

    *request_ret = -error;
    message_size = payload_advance16(&payload);

    if (message_size > chunk->length - sizeof(error) - sizeof(message_size)) {
        error_setg(errp, "Protocol error: server sent structured error chunk "
                         "with incorrect message size");
        return -EINVAL;
    }

    /* TODO: Add a trace point to mention the server complaint */

    /* TODO handle ERROR_OFFSET */

    return 0;
}

static int nbd_co_receive_offset_data_payload(NBDClientSession *s,
                                              uint64_t orig_offset,
                                              QEMUIOVector *qiov, Error **errp)
{
    QEMUIOVector sub_qiov;
    uint64_t offset;
    size_t data_size;
    int ret;
    NBDStructuredReplyChunk *chunk = &s->reply.structured;

    assert(nbd_reply_is_structured(&s->reply));

    /* The NBD spec requires at least one byte of payload */
    if (chunk->length <= sizeof(offset)) {
        error_setg(errp, "Protocol error: invalid payload for "
                         "NBD_REPLY_TYPE_OFFSET_DATA");
        return -EINVAL;
    }

    if (nbd_read64(s->ioc, &offset, "OFFSET_DATA offset", errp) < 0) {
        return -EIO;
    }

    data_size = chunk->length - sizeof(offset);
    assert(data_size);
    if (offset < orig_offset || data_size > qiov->size ||
        offset > orig_offset + qiov->size - data_size) {
        error_setg(errp, "Protocol error: server sent chunk exceeding requested"
                         " region");
        return -EINVAL;
    }
    if (s->info.min_block && !QEMU_IS_ALIGNED(data_size, s->info.min_block)) {
        trace_nbd_structured_read_compliance("data");
    }

    qemu_iovec_init(&sub_qiov, qiov->niov);
    qemu_iovec_concat(&sub_qiov, qiov, offset - orig_offset, data_size);
    ret = qio_channel_readv_all(s->ioc, sub_qiov.iov, sub_qiov.niov, errp);
    qemu_iovec_destroy(&sub_qiov);

    return ret < 0 ? -EIO : 0;
}

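/*
 * Upper bound on how much chunk payload we are willing to buffer in one
 * allocation.  Only control payloads (structured errors, block status) are
 * read via nbd_co_receive_structured_payload(); read data is written
 * directly into the request's qiov, so it is not subject to this limit.
 */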
#define NBD_MAX_MALLOC_PAYLOAD 1000
/* nbd_co_receive_structured_payload
 * Buffer the payload of the current structured reply chunk.  On success,
 * *payload is either left untouched (zero-length chunk) or points to a
 * g_malloc'ed buffer of s->reply.structured.length bytes that the caller
 * must g_free().
 */
static coroutine_fn int nbd_co_receive_structured_payload(
        NBDClientSession *s, void **payload, Error **errp)
{
    int ret;
    uint32_t len;

    assert(nbd_reply_is_structured(&s->reply));

    len = s->reply.structured.length;

    if (len == 0) {
        return 0;
    }

    if (payload == NULL) {
        error_setg(errp, "Unexpected structured payload");
        return -EINVAL;
    }

    if (len > NBD_MAX_MALLOC_PAYLOAD) {
        error_setg(errp, "Payload too large");
        return -EINVAL;
    }

    *payload = g_new(char, len);
    ret = nbd_read(s->ioc, *payload, len, "structured payload", errp);
    if (ret < 0) {
        g_free(*payload);
        *payload = NULL;
        return ret;
    }

    return 0;
}

/* nbd_co_do_receive_one_chunk
 * for simple reply:
 *   set request_ret to received reply error
 *   if qiov is not NULL: read payload to @qiov
 * for structured reply chunk:
 *   if error chunk: read payload, set @request_ret, do not set @payload
 *   else if offset_data chunk: read payload data to @qiov, do not set @payload
 *   else: read payload to @payload
 *
 * If function fails, @errp contains corresponding error message, and the
 * connection with the server is suspect.  If it returns 0, then the
 * transaction succeeded (although @request_ret may be a negative errno
 * corresponding to the server's error reply), and errp is unchanged.
 */
static coroutine_fn int nbd_co_do_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, void **payload, Error **errp)
{
    int ret;
    int i = HANDLE_TO_INDEX(s, handle);
    void *local_payload = NULL;
    NBDStructuredReplyChunk *chunk;

    if (payload) {
        *payload = NULL;
    }
    *request_ret = 0;

    /* Wait until we're woken up by nbd_connection_entry.  */
    s->requests[i].receiving = true;
    qemu_coroutine_yield();
    s->requests[i].receiving = false;
    if (s->quit) {
        error_setg(errp, "Connection closed");
        return -EIO;
    }
    assert(s->ioc);

    assert(s->reply.handle == handle);

    if (nbd_reply_is_simple(&s->reply)) {
        if (only_structured) {
            error_setg(errp, "Protocol error: simple reply when structured "
                             "reply chunk was expected");
            return -EINVAL;
        }

        *request_ret = -nbd_errno_to_system_errno(s->reply.simple.error);
        if (*request_ret < 0 || !qiov) {
            return 0;
        }

        return qio_channel_readv_all(s->ioc, qiov->iov, qiov->niov,
                                     errp) < 0 ? -EIO : 0;
    }

    /* handle structured reply chunk */
    assert(s->info.structured_reply);
    chunk = &s->reply.structured;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        if (!(chunk->flags & NBD_REPLY_FLAG_DONE)) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk without"
                       " NBD_REPLY_FLAG_DONE flag set");
            return -EINVAL;
        }
        if (chunk->length) {
            error_setg(errp, "Protocol error: NBD_REPLY_TYPE_NONE chunk with"
                       " nonzero length");
            return -EINVAL;
        }
        return 0;
    }

    if (chunk->type == NBD_REPLY_TYPE_OFFSET_DATA) {
        if (!qiov) {
            error_setg(errp, "Unexpected NBD_REPLY_TYPE_OFFSET_DATA chunk");
            return -EINVAL;
        }

        return nbd_co_receive_offset_data_payload(s, s->requests[i].offset,
                                                  qiov, errp);
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        payload = &local_payload;
    }

    ret = nbd_co_receive_structured_payload(s, payload, errp);
    if (ret < 0) {
        return ret;
    }

    if (nbd_reply_type_is_error(chunk->type)) {
        ret = nbd_parse_error_payload(chunk, local_payload, request_ret, errp);
        g_free(local_payload);
        return ret;
    }

    return 0;
}

/* nbd_co_receive_one_chunk
 * Read one reply chunk, wake up connection_co and set s->quit if needed.
 * The return value is a fatal (channel) error code; the server's own error
 * for the request, if any, is reported through *request_ret instead.
 */
static coroutine_fn int nbd_co_receive_one_chunk(
        NBDClientSession *s, uint64_t handle, bool only_structured,
        int *request_ret, QEMUIOVector *qiov, NBDReply *reply, void **payload,
        Error **errp)
{
    int ret = nbd_co_do_receive_one_chunk(s, handle, only_structured,
                                          request_ret, qiov, payload, errp);

    if (ret < 0) {
        s->quit = true;
    } else {
        /* For assert at loop start in nbd_connection_entry */
        if (reply) {
            *reply = s->reply;
        }
        s->reply.handle = 0;
    }

    if (s->connection_co) {
        aio_co_wake(s->connection_co);
    }

    return ret;
}

typedef struct NBDReplyChunkIter {
    int ret;
    int request_ret;
    Error *err;
    bool done, only_structured;
} NBDReplyChunkIter;

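/* nbd_iter_channel_error
 * Record a fatal channel error in @iter; only the first such error is kept,
 * later ones are freed.
 */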
static void nbd_iter_channel_error(NBDReplyChunkIter *iter,
                                   int ret, Error **local_err)
{
    assert(ret < 0);

    if (!iter->ret) {
        iter->ret = ret;
        error_propagate(&iter->err, *local_err);
    } else {
        error_free(*local_err);
    }

    *local_err = NULL;
}

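/* nbd_iter_request_error
 * Record the server's per-request error in @iter; again, the first error
 * wins.
 */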
static void nbd_iter_request_error(NBDReplyChunkIter *iter, int ret)
{
    assert(ret < 0);

    if (!iter->request_ret) {
        iter->request_ret = ret;
    }
}

/* NBD_FOREACH_REPLY_CHUNK
 * Iterate over all chunks of the reply for @handle.  Channel and request
 * errors do not abort the iteration; they are accumulated in @iter so that
 * the reply is always drained and the request slot released.
 */
#define NBD_FOREACH_REPLY_CHUNK(s, iter, handle, structured, \
                                qiov, reply, payload) \
    for (iter = (NBDReplyChunkIter) { .only_structured = structured }; \
         nbd_reply_chunk_iter_receive(s, &iter, handle, qiov, reply, payload);)
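/*
 * Typical use (cf. nbd_co_receive_return_code() below):
 *
 *     NBDReplyChunkIter iter;
 *
 *     NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
 *         ...body runs once per content-bearing structured chunk...
 *     }
 *
 * Afterwards iter.ret holds the channel status, iter.request_ret the
 * server's error for the request, and iter.err any error message.
 */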

/* nbd_reply_chunk_iter_receive
 * Receive the next chunk of the reply for @handle and fold it into @iter.
 * Returns true when the loop body should run for this chunk; returns false
 * once the reply is complete, at which point the request slot and the
 * in-flight counter have already been released.
 */
static bool nbd_reply_chunk_iter_receive(NBDClientSession *s,
                                         NBDReplyChunkIter *iter,
                                         uint64_t handle,
                                         QEMUIOVector *qiov, NBDReply *reply,
                                         void **payload)
{
    int ret, request_ret;
    NBDReply local_reply;
    NBDStructuredReplyChunk *chunk;
    Error *local_err = NULL;

    if (s->quit) {
        error_setg(&local_err, "Connection closed");
        nbd_iter_channel_error(iter, -EIO, &local_err);
        goto break_loop;
    }

    if (iter->done) {
        /* Previous iteration was last. */
        goto break_loop;
    }

    if (reply == NULL) {
        reply = &local_reply;
    }

    ret = nbd_co_receive_one_chunk(s, handle, iter->only_structured,
                                   &request_ret, qiov, reply, payload,
                                   &local_err);
    if (ret < 0) {
        nbd_iter_channel_error(iter, ret, &local_err);
    } else if (request_ret < 0) {
        nbd_iter_request_error(iter, request_ret);
    }

    /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
    if (nbd_reply_is_simple(reply) || s->quit) {
        goto break_loop;
    }

    chunk = &reply->structured;
    iter->only_structured = true;

    if (chunk->type == NBD_REPLY_TYPE_NONE) {
        /* NBD_REPLY_FLAG_DONE is already checked in nbd_co_receive_one_chunk */
        assert(chunk->flags & NBD_REPLY_FLAG_DONE);
        goto break_loop;
    }

    if (chunk->flags & NBD_REPLY_FLAG_DONE) {
        /* This iteration is last. */
        iter->done = true;
    }

    /* Execute the loop body */
    return true;

break_loop:
    s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;

    qemu_co_mutex_lock(&s->send_mutex);
    s->in_flight--;
    qemu_co_queue_next(&s->free_sema);
    qemu_co_mutex_unlock(&s->send_mutex);

    return false;
}

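/* nbd_co_receive_return_code
 * Drain the reply for a request whose payload we do not care about,
 * reporting the channel status as the return value and the server's errno
 * through *request_ret.
 */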
static int nbd_co_receive_return_code(NBDClientSession *s, uint64_t handle,
                                      int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, NULL, NULL) {
        /* nbd_reply_chunk_iter_receive does all the work */
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_cmdread_reply(NBDClientSession *s, uint64_t handle,
                                        uint64_t offset, QEMUIOVector *qiov,
                                        int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;

    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, s->info.structured_reply,
                            qiov, &reply, &payload)
    {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_OFFSET_DATA:
            /* special cased in nbd_co_receive_one_chunk, data is already
             * in qiov */
            break;
        case NBD_REPLY_TYPE_OFFSET_HOLE:
            ret = nbd_parse_offset_hole_payload(s, &reply.structured, payload,
                                                offset, qiov, &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                /* not allowed reply type */
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) for CMD_READ",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

static int nbd_co_receive_blockstatus_reply(NBDClientSession *s,
                                            uint64_t handle, uint64_t length,
                                            NBDExtent *extent,
                                            int *request_ret, Error **errp)
{
    NBDReplyChunkIter iter;
    NBDReply reply;
    void *payload = NULL;
    Error *local_err = NULL;
    bool received = false;

    assert(!extent->length);
    NBD_FOREACH_REPLY_CHUNK(s, iter, handle, false, NULL, &reply, &payload) {
        int ret;
        NBDStructuredReplyChunk *chunk = &reply.structured;

        assert(nbd_reply_is_structured(&reply));

        switch (chunk->type) {
        case NBD_REPLY_TYPE_BLOCK_STATUS:
            if (received) {
                s->quit = true;
                error_setg(&local_err, "Several BLOCK_STATUS chunks in reply");
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
            received = true;

            ret = nbd_parse_blockstatus_payload(s, &reply.structured,
                                                payload, length, extent,
                                                &local_err);
            if (ret < 0) {
                s->quit = true;
                nbd_iter_channel_error(&iter, ret, &local_err);
            }
            break;
        default:
            if (!nbd_reply_type_is_error(chunk->type)) {
                s->quit = true;
                error_setg(&local_err,
                           "Unexpected reply type: %d (%s) "
                           "for CMD_BLOCK_STATUS",
                           chunk->type, nbd_reply_type_lookup(chunk->type));
                nbd_iter_channel_error(&iter, -EINVAL, &local_err);
            }
        }

        g_free(payload);
        payload = NULL;
    }

    if (!extent->length && !iter.request_ret) {
        error_setg(&local_err, "Server did not reply with any status extents");
        nbd_iter_channel_error(&iter, -EIO, &local_err);
    }

    error_propagate(errp, iter.err);
    *request_ret = iter.request_ret;
    return iter.ret;
}

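/* nbd_co_request
 * Send @request (with @write_qiov as payload for NBD_CMD_WRITE) and wait
 * for the complete reply.  Returns a negative transport error if the
 * connection failed, otherwise the server's status for this request
 * (0 on success).
 */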
static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
                          QEMUIOVector *write_qiov)
{
    int ret, request_ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);

    assert(request->type != NBD_CMD_READ);
    if (write_qiov) {
        assert(request->type == NBD_CMD_WRITE);
        assert(request->len == iov_size(write_qiov->iov, write_qiov->niov));
    } else {
        assert(request->type != NBD_CMD_WRITE);
    }
    ret = nbd_co_send_request(bs, request, write_qiov);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_return_code(client, request->handle,
                                     &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request->from, request->len, request->handle,
                                  request->flags, request->type,
                                  nbd_cmd_lookup(request->type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    return ret ? ret : request_ret;
}

int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
                         uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    int ret, request_ret;
    Error *local_err = NULL;
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_READ,
        .from = offset,
        .len = bytes,
    };

    assert(bytes <= NBD_MAX_BUFFER_SIZE);
    assert(!flags);

    if (!bytes) {
        return 0;
    }
    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the read exceeds the server's
     * advertised size because the block layer rounded size up, then
     * truncate the request to the server and tail-pad with zero.
     */
    if (offset >= client->info.size) {
        assert(bytes < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, 0, 0, bytes);
        return 0;
    }
    if (offset + bytes > client->info.size) {
        uint64_t slop = offset + bytes - client->info.size;

        assert(slop < BDRV_SECTOR_SIZE);
        qemu_iovec_memset(qiov, bytes - slop, 0, slop);
        request.len -= slop;
    }

    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_cmdread_reply(client, request.handle, offset, qiov,
                                       &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                  request.flags, request.type,
                                  nbd_cmd_lookup(request.type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    return ret ? ret : request_ret;
}

int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
                          uint64_t bytes, QEMUIOVector *qiov, int flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }

    assert(bytes <= NBD_MAX_BUFFER_SIZE);

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, qiov);
}

int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
                                int bytes, BdrvRequestFlags flags)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_WRITE_ZEROES,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES)) {
        return -ENOTSUP;
    }

    if (flags & BDRV_REQ_FUA) {
        assert(client->info.flags & NBD_FLAG_SEND_FUA);
        request.flags |= NBD_CMD_FLAG_FUA;
    }
    if (!(flags & BDRV_REQ_MAY_UNMAP)) {
        request.flags |= NBD_CMD_FLAG_NO_HOLE;
    }

    if (!bytes) {
        return 0;
    }
    return nbd_co_request(bs, &request, NULL);
}

int nbd_client_co_flush(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_FLUSH };

    if (!(client->info.flags & NBD_FLAG_SEND_FLUSH)) {
        return 0;
    }

    request.from = 0;
    request.len = 0;

    return nbd_co_request(bs, &request, NULL);
}

int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int bytes)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = {
        .type = NBD_CMD_TRIM,
        .from = offset,
        .len = bytes,
    };

    assert(!(client->info.flags & NBD_FLAG_READ_ONLY));
    if (!(client->info.flags & NBD_FLAG_SEND_TRIM) || !bytes) {
        return 0;
    }

    return nbd_co_request(bs, &request, NULL);
}

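/* nbd_client_co_block_status
 * Query base:allocation status for at most one request-sized range.  The
 * request carries NBD_CMD_FLAG_REQ_ONE, so a compliant server returns a
 * single extent and the reply parser only has to fill in one NBDExtent.
 */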
int coroutine_fn nbd_client_co_block_status(BlockDriverState *bs,
                                            bool want_zero,
                                            int64_t offset, int64_t bytes,
                                            int64_t *pnum, int64_t *map,
                                            BlockDriverState **file)
{
    int ret, request_ret;
    NBDExtent extent = { 0 };
    NBDClientSession *client = nbd_get_client_session(bs);
    Error *local_err = NULL;

    NBDRequest request = {
        .type = NBD_CMD_BLOCK_STATUS,
        .from = offset,
        .len = MIN(MIN_NON_ZERO(QEMU_ALIGN_DOWN(INT_MAX,
                                                bs->bl.request_alignment),
                                client->info.max_block),
                   MIN(bytes, client->info.size - offset)),
        .flags = NBD_CMD_FLAG_REQ_ONE,
    };

    if (!client->info.base_allocation) {
        *pnum = bytes;
        *map = offset;
        *file = bs;
        return BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID;
    }

    /*
     * Work around the fact that the block layer doesn't do
     * byte-accurate sizing yet - if the status request exceeds the
     * server's advertised size because the block layer rounded size
     * up, we truncated the request to the server (above), or are
     * called on just the hole.
     */
    if (offset >= client->info.size) {
        *pnum = bytes;
        assert(bytes < BDRV_SECTOR_SIZE);
        /* Intentionally don't report offset_valid for the hole */
        return BDRV_BLOCK_ZERO;
    }

    if (client->info.min_block) {
        assert(QEMU_IS_ALIGNED(request.len, client->info.min_block));
    }
    ret = nbd_co_send_request(bs, &request, NULL);
    if (ret < 0) {
        return ret;
    }

    ret = nbd_co_receive_blockstatus_reply(client, request.handle, bytes,
                                           &extent, &request_ret, &local_err);
    if (local_err) {
        trace_nbd_co_request_fail(request.from, request.len, request.handle,
                                  request.flags, request.type,
                                  nbd_cmd_lookup(request.type),
                                  ret, error_get_pretty(local_err));
        error_free(local_err);
    }
    if (ret < 0 || request_ret < 0) {
        return ret ? ret : request_ret;
    }

    assert(extent.length);
    *pnum = extent.length;
    *map = offset;
    *file = bs;
    return (extent.flags & NBD_STATE_HOLE ? 0 : BDRV_BLOCK_DATA) |
        (extent.flags & NBD_STATE_ZERO ? BDRV_BLOCK_ZERO : 0) |
        BDRV_BLOCK_OFFSET_VALID;
}

void nbd_client_detach_aio_context(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_detach_aio_context(QIO_CHANNEL(client->ioc));
}

static void nbd_client_attach_aio_context_bh(void *opaque)
{
    BlockDriverState *bs = opaque;
    NBDClientSession *client = nbd_get_client_session(bs);

    /* The node is still drained, so we know the coroutine has yielded in
     * nbd_read_eof(), the only place where bs->in_flight can reach 0, or it is
     * entered for the first time. Both places are safe for entering the
     * coroutine. */
    qemu_aio_coroutine_enter(bs->aio_context, client->connection_co);
    bdrv_dec_in_flight(bs);
}

void nbd_client_attach_aio_context(BlockDriverState *bs,
                                   AioContext *new_context)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    qio_channel_attach_aio_context(QIO_CHANNEL(client->ioc), new_context);

    bdrv_inc_in_flight(bs);

    /* Need to wait here for the BH to run because the BH must run while the
     * node is still drained. */
    aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
}

void nbd_client_close(BlockDriverState *bs)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    NBDRequest request = { .type = NBD_CMD_DISC };

    assert(client->ioc);

    nbd_send_request(client->ioc, &request);

    nbd_teardown_connection(bs);
}

static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
                                                  Error **errp)
{
    QIOChannelSocket *sioc;
    Error *local_err = NULL;

    sioc = qio_channel_socket_new();
    qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");

    qio_channel_socket_connect_sync(sioc, saddr, &local_err);
    if (local_err) {
        object_unref(OBJECT(sioc));
        error_propagate(errp, local_err);
        return NULL;
    }

    qio_channel_set_delay(QIO_CHANNEL(sioc), false);

    return sioc;
}

static int nbd_client_connect(BlockDriverState *bs,
                              SocketAddress *saddr,
                              const char *export,
                              QCryptoTLSCreds *tlscreds,
                              const char *hostname,
                              const char *x_dirty_bitmap,
                              Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);
    int ret;

    /*
     * establish TCP connection, return error if it fails
     * TODO: Configurable retry-until-timeout behaviour.
     */
    QIOChannelSocket *sioc = nbd_establish_connection(saddr, errp);

    if (!sioc) {
        return -ECONNREFUSED;
    }

    /* NBD handshake */
    logout("session init %s\n", export);
    qio_channel_set_blocking(QIO_CHANNEL(sioc), true, NULL);

    client->info.request_sizes = true;
    client->info.structured_reply = true;
    client->info.base_allocation = true;
    client->info.x_dirty_bitmap = g_strdup(x_dirty_bitmap);
    client->info.name = g_strdup(export ?: "");
    ret = nbd_receive_negotiate(QIO_CHANNEL(sioc), tlscreds, hostname,
                                &client->ioc, &client->info, errp);
    g_free(client->info.x_dirty_bitmap);
    g_free(client->info.name);
    if (ret < 0) {
        logout("Failed to negotiate with the NBD server\n");
        object_unref(OBJECT(sioc));
        return ret;
    }
    if (x_dirty_bitmap && !client->info.base_allocation) {
        error_setg(errp, "requested x-dirty-bitmap %s not found",
                   x_dirty_bitmap);
        ret = -EINVAL;
        goto fail;
    }
    if (client->info.flags & NBD_FLAG_READ_ONLY) {
        ret = bdrv_apply_auto_read_only(bs, "NBD export is read-only", errp);
        if (ret < 0) {
            goto fail;
        }
    }
    if (client->info.flags & NBD_FLAG_SEND_FUA) {
        bs->supported_write_flags = BDRV_REQ_FUA;
        bs->supported_zero_flags |= BDRV_REQ_FUA;
    }
    if (client->info.flags & NBD_FLAG_SEND_WRITE_ZEROES) {
        bs->supported_zero_flags |= BDRV_REQ_MAY_UNMAP;
    }

    client->sioc = sioc;

    if (!client->ioc) {
        client->ioc = QIO_CHANNEL(sioc);
        object_ref(OBJECT(client->ioc));
    }

    /* Now that we're connected, set the socket to be non-blocking and
     * kick the reply mechanism.  */
    qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
    client->connection_co = qemu_coroutine_create(nbd_connection_entry, client);
    bdrv_inc_in_flight(bs);
    nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));

    logout("Established connection with NBD server\n");
    return 0;

 fail:
    /*
     * We have connected, but must fail for other reasons. The
     * connection is still blocking; send NBD_CMD_DISC as a courtesy
     * to the server.
     */
    {
        NBDRequest request = { .type = NBD_CMD_DISC };

        nbd_send_request(client->ioc ?: QIO_CHANNEL(sioc), &request);

        object_unref(OBJECT(sioc));

        return ret;
    }
}

int nbd_client_init(BlockDriverState *bs,
                    SocketAddress *saddr,
                    const char *export,
                    QCryptoTLSCreds *tlscreds,
                    const char *hostname,
                    const char *x_dirty_bitmap,
                    Error **errp)
{
    NBDClientSession *client = nbd_get_client_session(bs);

    client->bs = bs;
    qemu_co_mutex_init(&client->send_mutex);
    qemu_co_queue_init(&client->free_sema);

    return nbd_client_connect(bs, saddr, export, tlscreds, hostname,
                              x_dirty_bitmap, errp);
}