qemu/block/sheepdog.c
   1/*
   2 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
   3 *
   4 * This program is free software; you can redistribute it and/or
   5 * modify it under the terms of the GNU General Public License version
   6 * 2 as published by the Free Software Foundation.
   7 *
   8 * You should have received a copy of the GNU General Public License
   9 * along with this program. If not, see <http://www.gnu.org/licenses/>.
  10 *
  11 * Contributions after 2012-01-13 are licensed under the terms of the
  12 * GNU GPL, version 2 or (at your option) any later version.
  13 */
  14
  15#include "qemu-common.h"
  16#include "qemu/uri.h"
  17#include "qemu/error-report.h"
  18#include "qemu/sockets.h"
  19#include "block/block_int.h"
  20#include "qemu/bitops.h"
  21
  22#define SD_PROTO_VER 0x01
  23
  24#define SD_DEFAULT_ADDR "localhost"
  25#define SD_DEFAULT_PORT 7000
  26
  27#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
  28#define SD_OP_READ_OBJ       0x02
  29#define SD_OP_WRITE_OBJ      0x03
  30/* 0x04 is used internally by Sheepdog */
  31#define SD_OP_DISCARD_OBJ    0x05
  32
  33#define SD_OP_NEW_VDI        0x11
  34#define SD_OP_LOCK_VDI       0x12
  35#define SD_OP_RELEASE_VDI    0x13
  36#define SD_OP_GET_VDI_INFO   0x14
  37#define SD_OP_READ_VDIS      0x15
  38#define SD_OP_FLUSH_VDI      0x16
  39#define SD_OP_DEL_VDI        0x17
  40
  41#define SD_FLAG_CMD_WRITE    0x01
  42#define SD_FLAG_CMD_COW      0x02
  43#define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
  44#define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
  45
  46#define SD_RES_SUCCESS       0x00 /* Success */
  47#define SD_RES_UNKNOWN       0x01 /* Unknown error */
  48#define SD_RES_NO_OBJ        0x02 /* No object found */
  49#define SD_RES_EIO           0x03 /* I/O error */
  50#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
  51#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
  52#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
  53#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
  54#define SD_RES_NO_VDI        0x08 /* No vdi found */
  55#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
  56#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
  57#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
  58#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
  59#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
  60#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
   61#define SD_RES_STARTUP       0x0F /* Sheepdog is starting up */
  62#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
  63#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
  64#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
  65#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
  66#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
  67#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
  68#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
  69#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
   70#define SD_RES_JOIN_FAILED   0x18 /* Target node failed to join sheepdog */
   71#define SD_RES_HALT          0x19 /* Sheepdog has stopped serving I/O requests */
  72#define SD_RES_READONLY      0x1A /* Object is read-only */
  73
  74/*
  75 * Object ID rules
  76 *
  77 *  0 - 19 (20 bits): data object space
  78 * 20 - 31 (12 bits): reserved data object space
  79 * 32 - 55 (24 bits): vdi object space
  80 * 56 - 59 ( 4 bits): reserved vdi object space
  81 * 60 - 63 ( 4 bits): object type identifier space
  82 */
  83
  84#define VDI_SPACE_SHIFT   32
  85#define VDI_BIT (UINT64_C(1) << 63)
  86#define VMSTATE_BIT (UINT64_C(1) << 62)
  87#define MAX_DATA_OBJS (UINT64_C(1) << 20)
  88#define MAX_CHILDREN 1024
  89#define SD_MAX_VDI_LEN 256
  90#define SD_MAX_VDI_TAG_LEN 256
  91#define SD_NR_VDIS   (1U << 24)
  92#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
  93#define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
  94
  95#define SD_INODE_SIZE (sizeof(SheepdogInode))
  96#define CURRENT_VDI_ID 0
  97
  98typedef struct SheepdogReq {
  99    uint8_t proto_ver;
 100    uint8_t opcode;
 101    uint16_t flags;
 102    uint32_t epoch;
 103    uint32_t id;
 104    uint32_t data_length;
 105    uint32_t opcode_specific[8];
 106} SheepdogReq;
 107
 108typedef struct SheepdogRsp {
 109    uint8_t proto_ver;
 110    uint8_t opcode;
 111    uint16_t flags;
 112    uint32_t epoch;
 113    uint32_t id;
 114    uint32_t data_length;
 115    uint32_t result;
 116    uint32_t opcode_specific[7];
 117} SheepdogRsp;
 118
 119typedef struct SheepdogObjReq {
 120    uint8_t proto_ver;
 121    uint8_t opcode;
 122    uint16_t flags;
 123    uint32_t epoch;
 124    uint32_t id;
 125    uint32_t data_length;
 126    uint64_t oid;
 127    uint64_t cow_oid;
 128    uint8_t copies;
 129    uint8_t copy_policy;
 130    uint8_t reserved[6];
 131    uint64_t offset;
 132} SheepdogObjReq;
 133
 134typedef struct SheepdogObjRsp {
 135    uint8_t proto_ver;
 136    uint8_t opcode;
 137    uint16_t flags;
 138    uint32_t epoch;
 139    uint32_t id;
 140    uint32_t data_length;
 141    uint32_t result;
 142    uint8_t copies;
 143    uint8_t copy_policy;
 144    uint8_t reserved[2];
 145    uint32_t pad[6];
 146} SheepdogObjRsp;
 147
 148typedef struct SheepdogVdiReq {
 149    uint8_t proto_ver;
 150    uint8_t opcode;
 151    uint16_t flags;
 152    uint32_t epoch;
 153    uint32_t id;
 154    uint32_t data_length;
 155    uint64_t vdi_size;
 156    uint32_t vdi_id;
 157    uint8_t copies;
 158    uint8_t copy_policy;
 159    uint8_t reserved[2];
 160    uint32_t snapid;
 161    uint32_t pad[3];
 162} SheepdogVdiReq;
 163
 164typedef struct SheepdogVdiRsp {
 165    uint8_t proto_ver;
 166    uint8_t opcode;
 167    uint16_t flags;
 168    uint32_t epoch;
 169    uint32_t id;
 170    uint32_t data_length;
 171    uint32_t result;
 172    uint32_t rsvd;
 173    uint32_t vdi_id;
 174    uint32_t pad[5];
 175} SheepdogVdiRsp;
 176
 177typedef struct SheepdogInode {
 178    char name[SD_MAX_VDI_LEN];
 179    char tag[SD_MAX_VDI_TAG_LEN];
 180    uint64_t ctime;
 181    uint64_t snap_ctime;
 182    uint64_t vm_clock_nsec;
 183    uint64_t vdi_size;
 184    uint64_t vm_state_size;
 185    uint16_t copy_policy;
 186    uint8_t nr_copies;
 187    uint8_t block_size_shift;
 188    uint32_t snap_id;
 189    uint32_t vdi_id;
 190    uint32_t parent_vdi_id;
 191    uint32_t child_vdi_id[MAX_CHILDREN];
 192    uint32_t data_vdi_id[MAX_DATA_OBJS];
 193} SheepdogInode;
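     /*
      * Note on data_vdi_id[]: entry `idx' holds the vdi id that owns data
      * object `idx'.  A value of 0 means the object is not allocated yet,
      * and a value different from vdi_id means the object is still shared
      * with a base VDI and must be copied on write (see
      * is_data_obj_writable() below).
      */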
 194
 195/*
 196 * 64 bit FNV-1a non-zero initial basis
 197 */
 198#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
 199
 200/*
 201 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 202 */
 203static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
 204{
 205    unsigned char *bp = buf;
 206    unsigned char *be = bp + len;
 207    while (bp < be) {
 208        hval ^= (uint64_t) *bp++;
 209        hval += (hval << 1) + (hval << 4) + (hval << 5) +
 210            (hval << 7) + (hval << 8) + (hval << 40);
 211    }
 212    return hval;
 213}
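     /*
      * Usage sketch (illustrative only): hashing an arbitrary name buffer
      * with the non-zero basis defined above.
      *
      *   uint64_t hval = fnv_64a_buf(name, strlen(name), FNV1A_64_INIT);
      */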
 214
 215static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
 216{
 217    return inode->vdi_id == inode->data_vdi_id[idx];
 218}
 219
 220static inline bool is_data_obj(uint64_t oid)
 221{
 222    return !(VDI_BIT & oid);
 223}
 224
 225static inline uint64_t data_oid_to_idx(uint64_t oid)
 226{
 227    return oid & (MAX_DATA_OBJS - 1);
 228}
 229
 230static inline uint32_t oid_to_vid(uint64_t oid)
 231{
 232    return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
 233}
 234
 235static inline uint64_t vid_to_vdi_oid(uint32_t vid)
 236{
 237    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
 238}
 239
 240static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
 241{
 242    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 243}
 244
 245static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
 246{
 247    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 248}
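     /*
      * Worked example of the object id layout (values chosen purely for
      * illustration): with vid 0x123456 and data object index 7,
      *
      *   vid_to_vdi_oid(0x123456)     == 0x8012345600000000  (VDI_BIT set)
      *   vid_to_data_oid(0x123456, 7) == 0x0012345600000007
      *   data_oid_to_idx(0x0012345600000007) == 7
      *   oid_to_vid() of either oid   == 0x123456
      */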
 249
 250static inline bool is_snapshot(struct SheepdogInode *inode)
 251{
 252    return !!inode->snap_ctime;
 253}
 254
 255#undef DPRINTF
 256#ifdef DEBUG_SDOG
 257#define DPRINTF(fmt, args...)                                       \
 258    do {                                                            \
 259        fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
 260    } while (0)
 261#else
 262#define DPRINTF(fmt, args...)
 263#endif
 264
 265typedef struct SheepdogAIOCB SheepdogAIOCB;
 266
 267typedef struct AIOReq {
 268    SheepdogAIOCB *aiocb;
 269    unsigned int iov_offset;
 270
 271    uint64_t oid;
 272    uint64_t base_oid;
 273    uint64_t offset;
 274    unsigned int data_len;
 275    uint8_t flags;
 276    uint32_t id;
 277
 278    QLIST_ENTRY(AIOReq) aio_siblings;
 279} AIOReq;
 280
 281enum AIOCBState {
 282    AIOCB_WRITE_UDATA,
 283    AIOCB_READ_UDATA,
 284    AIOCB_FLUSH_CACHE,
 285    AIOCB_DISCARD_OBJ,
 286};
 287
 288struct SheepdogAIOCB {
 289    BlockDriverAIOCB common;
 290
 291    QEMUIOVector *qiov;
 292
 293    int64_t sector_num;
 294    int nb_sectors;
 295
 296    int ret;
 297    enum AIOCBState aiocb_type;
 298
 299    Coroutine *coroutine;
 300    void (*aio_done_func)(SheepdogAIOCB *);
 301
 302    bool cancelable;
 303    bool *finished;
 304    int nr_pending;
 305};
 306
 307typedef struct BDRVSheepdogState {
 308    BlockDriverState *bs;
 309
 310    SheepdogInode inode;
 311
 312    uint32_t min_dirty_data_idx;
 313    uint32_t max_dirty_data_idx;
 314
 315    char name[SD_MAX_VDI_LEN];
 316    bool is_snapshot;
 317    uint32_t cache_flags;
 318    bool discard_supported;
 319
 320    char *host_spec;
 321    bool is_unix;
 322    int fd;
 323
 324    CoMutex lock;
 325    Coroutine *co_send;
 326    Coroutine *co_recv;
 327
 328    uint32_t aioreq_seq_num;
 329
  330    /* Every aio request must be linked to one of these queues. */
 331    QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
 332    QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
 333    QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
 334} BDRVSheepdogState;
 335
 336static const char * sd_strerror(int err)
 337{
 338    int i;
 339
 340    static const struct {
 341        int err;
 342        const char *desc;
 343    } errors[] = {
 344        {SD_RES_SUCCESS, "Success"},
 345        {SD_RES_UNKNOWN, "Unknown error"},
 346        {SD_RES_NO_OBJ, "No object found"},
 347        {SD_RES_EIO, "I/O error"},
 348        {SD_RES_VDI_EXIST, "VDI exists already"},
 349        {SD_RES_INVALID_PARMS, "Invalid parameters"},
 350        {SD_RES_SYSTEM_ERROR, "System error"},
 351        {SD_RES_VDI_LOCKED, "VDI is already locked"},
 352        {SD_RES_NO_VDI, "No vdi found"},
 353        {SD_RES_NO_BASE_VDI, "No base VDI found"},
  354        {SD_RES_VDI_READ, "Failed to read the requested VDI"},
 355        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
 356        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
 357        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
 358        {SD_RES_NO_TAG, "Failed to find the requested tag"},
 359        {SD_RES_STARTUP, "The system is still booting"},
 360        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
 361        {SD_RES_SHUTDOWN, "The system is shutting down"},
 362        {SD_RES_NO_MEM, "Out of memory on the server"},
 363        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
 364        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
 365        {SD_RES_NO_SPACE, "Server has no space for new objects"},
 366        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
 367        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
  368        {SD_RES_JOIN_FAILED, "Target node failed to join sheepdog"},
  369        {SD_RES_HALT, "Sheepdog has stopped serving I/O requests"},
 370        {SD_RES_READONLY, "Object is read-only"},
 371    };
 372
 373    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
 374        if (errors[i].err == err) {
 375            return errors[i].desc;
 376        }
 377    }
 378
 379    return "Invalid error code";
 380}
 381
 382/*
 383 * Sheepdog I/O handling:
 384 *
 385 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
 386 *    link the requests to the inflight_list in the
   387 *    BDRVSheepdogState.  The function exits without waiting for
   388 *    the responses to arrive.
 389 *
 390 * 2. We receive the response in aio_read_response, the fd handler to
 391 *    the sheepdog connection.  If metadata update is needed, we send
 392 *    the write request to the vdi object in sd_write_done, the write
 393 *    completion function.  We switch back to sd_co_readv/writev after
 394 *    all the requests belonging to the AIOCB are finished.
 395 */
 396
 397static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
 398                                    uint64_t oid, unsigned int data_len,
 399                                    uint64_t offset, uint8_t flags,
 400                                    uint64_t base_oid, unsigned int iov_offset)
 401{
 402    AIOReq *aio_req;
 403
 404    aio_req = g_malloc(sizeof(*aio_req));
 405    aio_req->aiocb = acb;
 406    aio_req->iov_offset = iov_offset;
 407    aio_req->oid = oid;
 408    aio_req->base_oid = base_oid;
 409    aio_req->offset = offset;
 410    aio_req->data_len = data_len;
 411    aio_req->flags = flags;
 412    aio_req->id = s->aioreq_seq_num++;
 413
 414    acb->nr_pending++;
 415    return aio_req;
 416}
 417
 418static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
 419{
 420    SheepdogAIOCB *acb = aio_req->aiocb;
 421
 422    acb->cancelable = false;
 423    QLIST_REMOVE(aio_req, aio_siblings);
 424    g_free(aio_req);
 425
 426    acb->nr_pending--;
 427}
 428
 429static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
 430{
 431    qemu_coroutine_enter(acb->coroutine, NULL);
 432    if (acb->finished) {
 433        *acb->finished = true;
 434    }
 435    qemu_aio_release(acb);
 436}
 437
 438/*
 439 * Check whether the specified acb can be canceled
 440 *
   441 * We can cancel the aio only when every request belonging to the acb is:
   442 *  - Not being processed by the sheepdog server.
   443 *  - Not linked to the inflight queue.
 444 */
 445static bool sd_acb_cancelable(const SheepdogAIOCB *acb)
 446{
 447    BDRVSheepdogState *s = acb->common.bs->opaque;
 448    AIOReq *aioreq;
 449
 450    if (!acb->cancelable) {
 451        return false;
 452    }
 453
 454    QLIST_FOREACH(aioreq, &s->inflight_aio_head, aio_siblings) {
 455        if (aioreq->aiocb == acb) {
 456            return false;
 457        }
 458    }
 459
 460    return true;
 461}
 462
 463static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
 464{
 465    SheepdogAIOCB *acb = (SheepdogAIOCB *)blockacb;
 466    BDRVSheepdogState *s = acb->common.bs->opaque;
 467    AIOReq *aioreq, *next;
 468    bool finished = false;
 469
 470    acb->finished = &finished;
 471    while (!finished) {
 472        if (sd_acb_cancelable(acb)) {
 473            /* Remove outstanding requests from pending and failed queues.  */
 474            QLIST_FOREACH_SAFE(aioreq, &s->pending_aio_head, aio_siblings,
 475                               next) {
 476                if (aioreq->aiocb == acb) {
 477                    free_aio_req(s, aioreq);
 478                }
 479            }
 480            QLIST_FOREACH_SAFE(aioreq, &s->failed_aio_head, aio_siblings,
 481                               next) {
 482                if (aioreq->aiocb == acb) {
 483                    free_aio_req(s, aioreq);
 484                }
 485            }
 486
 487            assert(acb->nr_pending == 0);
 488            sd_finish_aiocb(acb);
 489            return;
 490        }
 491        qemu_aio_wait();
 492    }
 493}
 494
 495static const AIOCBInfo sd_aiocb_info = {
 496    .aiocb_size = sizeof(SheepdogAIOCB),
 497    .cancel = sd_aio_cancel,
 498};
 499
 500static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
 501                                   int64_t sector_num, int nb_sectors)
 502{
 503    SheepdogAIOCB *acb;
 504
 505    acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);
 506
 507    acb->qiov = qiov;
 508
 509    acb->sector_num = sector_num;
 510    acb->nb_sectors = nb_sectors;
 511
 512    acb->aio_done_func = NULL;
 513    acb->cancelable = true;
 514    acb->finished = NULL;
 515    acb->coroutine = qemu_coroutine_self();
 516    acb->ret = 0;
 517    acb->nr_pending = 0;
 518    return acb;
 519}
 520
 521static int connect_to_sdog(BDRVSheepdogState *s)
 522{
 523    int fd;
 524    Error *err = NULL;
 525
 526    if (s->is_unix) {
 527        fd = unix_connect(s->host_spec, &err);
 528    } else {
 529        fd = inet_connect(s->host_spec, &err);
 530
 531        if (err == NULL) {
 532            int ret = socket_set_nodelay(fd);
 533            if (ret < 0) {
 534                error_report("%s", strerror(errno));
 535            }
 536        }
 537    }
 538
 539    if (err != NULL) {
 540        qerror_report_err(err);
 541        error_free(err);
 542    } else {
 543        qemu_set_nonblock(fd);
 544    }
 545
 546    return fd;
 547}
 548
 549static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
 550                                    unsigned int *wlen)
 551{
 552    int ret;
 553
 554    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
 555    if (ret != sizeof(*hdr)) {
 556        error_report("failed to send a req, %s", strerror(errno));
 557        return ret;
 558    }
 559
 560    ret = qemu_co_send(sockfd, data, *wlen);
 561    if (ret != *wlen) {
 562        error_report("failed to send a req, %s", strerror(errno));
 563    }
 564
 565    return ret;
 566}
 567
 568static void restart_co_req(void *opaque)
 569{
 570    Coroutine *co = opaque;
 571
 572    qemu_coroutine_enter(co, NULL);
 573}
 574
 575typedef struct SheepdogReqCo {
 576    int sockfd;
 577    SheepdogReq *hdr;
 578    void *data;
 579    unsigned int *wlen;
 580    unsigned int *rlen;
 581    int ret;
 582    bool finished;
 583} SheepdogReqCo;
 584
 585static coroutine_fn void do_co_req(void *opaque)
 586{
 587    int ret;
 588    Coroutine *co;
 589    SheepdogReqCo *srco = opaque;
 590    int sockfd = srco->sockfd;
 591    SheepdogReq *hdr = srco->hdr;
 592    void *data = srco->data;
 593    unsigned int *wlen = srco->wlen;
 594    unsigned int *rlen = srco->rlen;
 595
 596    co = qemu_coroutine_self();
 597    qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, co);
 598
 599    ret = send_co_req(sockfd, hdr, data, wlen);
 600    if (ret < 0) {
 601        goto out;
 602    }
 603
 604    qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, co);
 605
 606    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
 607    if (ret != sizeof(*hdr)) {
 608        error_report("failed to get a rsp, %s", strerror(errno));
 609        ret = -errno;
 610        goto out;
 611    }
 612
 613    if (*rlen > hdr->data_length) {
 614        *rlen = hdr->data_length;
 615    }
 616
 617    if (*rlen) {
 618        ret = qemu_co_recv(sockfd, data, *rlen);
 619        if (ret != *rlen) {
 620            error_report("failed to get the data, %s", strerror(errno));
 621            ret = -errno;
 622            goto out;
 623        }
 624    }
 625    ret = 0;
 626out:
 627    /* there is at most one request for this sockfd, so it is safe to
 628     * set each handler to NULL. */
 629    qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL);
 630
 631    srco->ret = ret;
 632    srco->finished = true;
 633}
 634
 635static int do_req(int sockfd, SheepdogReq *hdr, void *data,
 636                  unsigned int *wlen, unsigned int *rlen)
 637{
 638    Coroutine *co;
 639    SheepdogReqCo srco = {
 640        .sockfd = sockfd,
 641        .hdr = hdr,
 642        .data = data,
 643        .wlen = wlen,
 644        .rlen = rlen,
 645        .ret = 0,
 646        .finished = false,
 647    };
 648
 649    if (qemu_in_coroutine()) {
 650        do_co_req(&srco);
 651    } else {
 652        co = qemu_coroutine_create(do_co_req);
 653        qemu_coroutine_enter(co, &srco);
 654        while (!srco.finished) {
 655            qemu_aio_wait();
 656        }
 657    }
 658
 659    return srco.ret;
 660}
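     /*
      * Usage sketch for the synchronous request path above (see
      * find_vdi_name() below for a real caller): fill in a request header,
      * then
      *
      *   ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
      *
      * A non-zero return indicates a transport failure; on success the
      * server's result code must still be checked against SD_RES_*.
      */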
 661
 662static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 663                           struct iovec *iov, int niov, bool create,
 664                           enum AIOCBState aiocb_type);
 665static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
 666static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
 667static int get_sheep_fd(BDRVSheepdogState *s);
 668static void co_write_request(void *opaque);
 669
 670static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
 671{
 672    AIOReq *aio_req;
 673
 674    QLIST_FOREACH(aio_req, &s->pending_aio_head, aio_siblings) {
 675        if (aio_req->oid == oid) {
 676            return aio_req;
 677        }
 678    }
 679
 680    return NULL;
 681}
 682
 683/*
   684 * This function searches for pending requests to the object `oid' and
   685 * sends them.
 686 */
 687static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
 688{
 689    AIOReq *aio_req;
 690    SheepdogAIOCB *acb;
 691
 692    while ((aio_req = find_pending_req(s, oid)) != NULL) {
 693        acb = aio_req->aiocb;
 694        /* move aio_req from pending list to inflight one */
 695        QLIST_REMOVE(aio_req, aio_siblings);
 696        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
 697        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, false,
 698                        acb->aiocb_type);
 699    }
 700}
 701
 702static coroutine_fn void reconnect_to_sdog(void *opaque)
 703{
 704    BDRVSheepdogState *s = opaque;
 705    AIOReq *aio_req, *next;
 706
 707    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
 708    close(s->fd);
 709    s->fd = -1;
 710
 711    /* Wait for outstanding write requests to be completed. */
 712    while (s->co_send != NULL) {
 713        co_write_request(opaque);
 714    }
 715
  716    /* Try to reconnect to the sheepdog server every second. */
 717    while (s->fd < 0) {
 718        s->fd = get_sheep_fd(s);
 719        if (s->fd < 0) {
 720            DPRINTF("Wait for connection to be established\n");
 721            co_aio_sleep_ns(bdrv_get_aio_context(s->bs), QEMU_CLOCK_REALTIME,
 722                            1000000000ULL);
 723        }
 724    };
 725
 726    /*
   727     * Now we have to resend all the requests in the inflight queue.  However,
 728     * resend_aioreq() can yield and newly created requests can be added to the
 729     * inflight queue before the coroutine is resumed.  To avoid mixing them, we
 730     * have to move all the inflight requests to the failed queue before
 731     * resend_aioreq() is called.
 732     */
 733    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
 734        QLIST_REMOVE(aio_req, aio_siblings);
 735        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
 736    }
 737
 738    /* Resend all the failed aio requests. */
 739    while (!QLIST_EMPTY(&s->failed_aio_head)) {
 740        aio_req = QLIST_FIRST(&s->failed_aio_head);
 741        QLIST_REMOVE(aio_req, aio_siblings);
 742        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
 743        resend_aioreq(s, aio_req);
 744    }
 745}
 746
 747/*
 748 * Receive responses of the I/O requests.
 749 *
 750 * This function is registered as a fd handler, and called from the
 751 * main loop when s->fd is ready for reading responses.
 752 */
 753static void coroutine_fn aio_read_response(void *opaque)
 754{
 755    SheepdogObjRsp rsp;
 756    BDRVSheepdogState *s = opaque;
 757    int fd = s->fd;
 758    int ret;
 759    AIOReq *aio_req = NULL;
 760    SheepdogAIOCB *acb;
 761    uint64_t idx;
 762
 763    /* read a header */
 764    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
 765    if (ret != sizeof(rsp)) {
 766        error_report("failed to get the header, %s", strerror(errno));
 767        goto err;
 768    }
 769
 770    /* find the right aio_req from the inflight aio list */
 771    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
 772        if (aio_req->id == rsp.id) {
 773            break;
 774        }
 775    }
 776    if (!aio_req) {
 777        error_report("cannot find aio_req %x", rsp.id);
 778        goto err;
 779    }
 780
 781    acb = aio_req->aiocb;
 782
 783    switch (acb->aiocb_type) {
 784    case AIOCB_WRITE_UDATA:
 785        /* this coroutine context is no longer suitable for co_recv
 786         * because we may send data to update vdi objects */
 787        s->co_recv = NULL;
 788        if (!is_data_obj(aio_req->oid)) {
 789            break;
 790        }
 791        idx = data_oid_to_idx(aio_req->oid);
 792
 793        if (s->inode.data_vdi_id[idx] != s->inode.vdi_id) {
 794            /*
  795             * If the object is a newly created one, we need to update
  796             * the vdi object (metadata object).  min_dirty_data_idx and
  797             * max_dirty_data_idx are updated so that the range between
  798             * them includes the updated index.
 799             */
 800            if (rsp.result == SD_RES_SUCCESS) {
 801                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
 802                s->max_dirty_data_idx = MAX(idx, s->max_dirty_data_idx);
 803                s->min_dirty_data_idx = MIN(idx, s->min_dirty_data_idx);
 804            }
 805            /*
 806             * Some requests may be blocked because simultaneous
 807             * create requests are not allowed, so we search the
 808             * pending requests here.
 809             */
 810            send_pending_req(s, aio_req->oid);
 811        }
 812        break;
 813    case AIOCB_READ_UDATA:
 814        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
 815                            aio_req->iov_offset, rsp.data_length);
 816        if (ret != rsp.data_length) {
 817            error_report("failed to get the data, %s", strerror(errno));
 818            goto err;
 819        }
 820        break;
 821    case AIOCB_FLUSH_CACHE:
 822        if (rsp.result == SD_RES_INVALID_PARMS) {
 823            DPRINTF("disable cache since the server doesn't support it\n");
 824            s->cache_flags = SD_FLAG_CMD_DIRECT;
 825            rsp.result = SD_RES_SUCCESS;
 826        }
 827        break;
 828    case AIOCB_DISCARD_OBJ:
 829        switch (rsp.result) {
 830        case SD_RES_INVALID_PARMS:
 831            error_report("sheep(%s) doesn't support discard command",
 832                         s->host_spec);
 833            rsp.result = SD_RES_SUCCESS;
 834            s->discard_supported = false;
 835            break;
 836        case SD_RES_SUCCESS:
 837            idx = data_oid_to_idx(aio_req->oid);
 838            s->inode.data_vdi_id[idx] = 0;
 839            break;
 840        default:
 841            break;
 842        }
 843    }
 844
 845    switch (rsp.result) {
 846    case SD_RES_SUCCESS:
 847        break;
 848    case SD_RES_READONLY:
 849        if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
 850            ret = reload_inode(s, 0, "");
 851            if (ret < 0) {
 852                goto err;
 853            }
 854        }
 855        if (is_data_obj(aio_req->oid)) {
 856            aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
 857                                           data_oid_to_idx(aio_req->oid));
 858        } else {
 859            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
 860        }
 861        resend_aioreq(s, aio_req);
 862        goto out;
 863    default:
 864        acb->ret = -EIO;
 865        error_report("%s", sd_strerror(rsp.result));
 866        break;
 867    }
 868
 869    free_aio_req(s, aio_req);
 870    if (!acb->nr_pending) {
 871        /*
 872         * We've finished all requests which belong to the AIOCB, so
 873         * we can switch back to sd_co_readv/writev now.
 874         */
 875        acb->aio_done_func(acb);
 876    }
 877out:
 878    s->co_recv = NULL;
 879    return;
 880err:
 881    s->co_recv = NULL;
 882    reconnect_to_sdog(opaque);
 883}
 884
 885static void co_read_response(void *opaque)
 886{
 887    BDRVSheepdogState *s = opaque;
 888
 889    if (!s->co_recv) {
 890        s->co_recv = qemu_coroutine_create(aio_read_response);
 891    }
 892
 893    qemu_coroutine_enter(s->co_recv, opaque);
 894}
 895
 896static void co_write_request(void *opaque)
 897{
 898    BDRVSheepdogState *s = opaque;
 899
 900    qemu_coroutine_enter(s->co_send, NULL);
 901}
 902
 903/*
   904 * Return a socket descriptor to read/write objects.
   905 *
   906 * We cannot use this descriptor for other operations because
   907 * the block driver may be waiting for a response from the server.
 908 */
 909static int get_sheep_fd(BDRVSheepdogState *s)
 910{
 911    int fd;
 912
 913    fd = connect_to_sdog(s);
 914    if (fd < 0) {
 915        return fd;
 916    }
 917
 918    qemu_aio_set_fd_handler(fd, co_read_response, NULL, s);
 919    return fd;
 920}
 921
 922static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
 923                        char *vdi, uint32_t *snapid, char *tag)
 924{
 925    URI *uri;
 926    QueryParams *qp = NULL;
 927    int ret = 0;
 928
 929    uri = uri_parse(filename);
 930    if (!uri) {
 931        return -EINVAL;
 932    }
 933
 934    /* transport */
 935    if (!strcmp(uri->scheme, "sheepdog")) {
 936        s->is_unix = false;
 937    } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
 938        s->is_unix = false;
 939    } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
 940        s->is_unix = true;
 941    } else {
 942        ret = -EINVAL;
 943        goto out;
 944    }
 945
 946    if (uri->path == NULL || !strcmp(uri->path, "/")) {
 947        ret = -EINVAL;
 948        goto out;
 949    }
 950    pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);
 951
 952    qp = query_params_parse(uri->query);
 953    if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
 954        ret = -EINVAL;
 955        goto out;
 956    }
 957
 958    if (s->is_unix) {
 959        /* sheepdog+unix:///vdiname?socket=path */
 960        if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
 961            ret = -EINVAL;
 962            goto out;
 963        }
 964        s->host_spec = g_strdup(qp->p[0].value);
 965    } else {
 966        /* sheepdog[+tcp]://[host:port]/vdiname */
 967        s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
 968                                       uri->port ?: SD_DEFAULT_PORT);
 969    }
 970
  971    /* snapshot id or tag */
 972    if (uri->fragment) {
 973        *snapid = strtoul(uri->fragment, NULL, 10);
 974        if (*snapid == 0) {
 975            pstrcpy(tag, SD_MAX_VDI_TAG_LEN, uri->fragment);
 976        }
 977    } else {
 978        *snapid = CURRENT_VDI_ID; /* search current vdi */
 979    }
 980
 981out:
 982    if (qp) {
 983        query_params_free(qp);
 984    }
 985    uri_free(uri);
 986    return ret;
 987}
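     /*
      * Examples (host names and paths are illustrative only):
      *
      *   sheepdog+tcp://example.org:7000/alice#5
      *     -> host_spec "example.org:7000", vdi "alice", *snapid 5
      *   sheepdog+unix:///alice?socket=/var/run/sheepdog.sock#frozen
      *     -> host_spec "/var/run/sheepdog.sock", vdi "alice",
      *        *snapid 0, tag "frozen" (the fragment is not a number)
      */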
 988
 989/*
 990 * Parse a filename (old syntax)
 991 *
 992 * filename must be one of the following formats:
 993 *   1. [vdiname]
 994 *   2. [vdiname]:[snapid]
 995 *   3. [vdiname]:[tag]
 996 *   4. [hostname]:[port]:[vdiname]
 997 *   5. [hostname]:[port]:[vdiname]:[snapid]
 998 *   6. [hostname]:[port]:[vdiname]:[tag]
 999 *
 1000 * You can boot from the snapshot images by specifying `snapid' or
 1001 * `tag'.
1002 *
1003 * You can run VMs outside the Sheepdog cluster by specifying
1004 * `hostname' and `port' (experimental).
1005 */
1006static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
1007                         char *vdi, uint32_t *snapid, char *tag)
1008{
1009    char *p, *q, *uri;
1010    const char *host_spec, *vdi_spec;
1011    int nr_sep, ret;
1012
1013    strstart(filename, "sheepdog:", (const char **)&filename);
1014    p = q = g_strdup(filename);
1015
1016    /* count the number of separators */
1017    nr_sep = 0;
1018    while (*p) {
1019        if (*p == ':') {
1020            nr_sep++;
1021        }
1022        p++;
1023    }
1024    p = q;
1025
1026    /* use the first two tokens as host_spec. */
1027    if (nr_sep >= 2) {
1028        host_spec = p;
1029        p = strchr(p, ':');
1030        p++;
1031        p = strchr(p, ':');
1032        *p++ = '\0';
1033    } else {
1034        host_spec = "";
1035    }
1036
1037    vdi_spec = p;
1038
1039    p = strchr(vdi_spec, ':');
1040    if (p) {
1041        *p++ = '#';
1042    }
1043
1044    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1045
1046    ret = sd_parse_uri(s, uri, vdi, snapid, tag);
1047
1048    g_free(q);
1049    g_free(uri);
1050
1051    return ret;
1052}
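     /*
      * For illustration, the old syntax "sheepdog:fido:7000:alice:1" is
      * rewritten above to "sheepdog://fido:7000/alice#1" before being
      * handed to sd_parse_uri() (the host name is made up).
      */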
1053
1054static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1055                         uint32_t snapid, const char *tag, uint32_t *vid,
1056                         bool lock)
1057{
1058    int ret, fd;
1059    SheepdogVdiReq hdr;
1060    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1061    unsigned int wlen, rlen = 0;
1062    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
1063
1064    fd = connect_to_sdog(s);
1065    if (fd < 0) {
1066        return fd;
1067    }
1068
1069    /* This pair of strncpy calls ensures that the buffer is zero-filled,
1070     * which is desirable since we'll soon be sending those bytes, and
1071     * don't want the send_req to read uninitialized data.
1072     */
1073    strncpy(buf, filename, SD_MAX_VDI_LEN);
1074    strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1075
1076    memset(&hdr, 0, sizeof(hdr));
1077    if (lock) {
1078        hdr.opcode = SD_OP_LOCK_VDI;
1079    } else {
1080        hdr.opcode = SD_OP_GET_VDI_INFO;
1081    }
1082    wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1083    hdr.proto_ver = SD_PROTO_VER;
1084    hdr.data_length = wlen;
1085    hdr.snapid = snapid;
1086    hdr.flags = SD_FLAG_CMD_WRITE;
1087
1088    ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1089    if (ret) {
1090        goto out;
1091    }
1092
1093    if (rsp->result != SD_RES_SUCCESS) {
1094        error_report("cannot get vdi info, %s, %s %d %s",
1095                     sd_strerror(rsp->result), filename, snapid, tag);
1096        if (rsp->result == SD_RES_NO_VDI) {
1097            ret = -ENOENT;
1098        } else {
1099            ret = -EIO;
1100        }
1101        goto out;
1102    }
1103    *vid = rsp->vdi_id;
1104
1105    ret = 0;
1106out:
1107    closesocket(fd);
1108    return ret;
1109}
1110
1111static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
1112                           struct iovec *iov, int niov, bool create,
1113                           enum AIOCBState aiocb_type)
1114{
1115    int nr_copies = s->inode.nr_copies;
1116    SheepdogObjReq hdr;
1117    unsigned int wlen = 0;
1118    int ret;
1119    uint64_t oid = aio_req->oid;
1120    unsigned int datalen = aio_req->data_len;
1121    uint64_t offset = aio_req->offset;
1122    uint8_t flags = aio_req->flags;
1123    uint64_t old_oid = aio_req->base_oid;
1124
1125    if (!nr_copies) {
1126        error_report("bug");
1127    }
1128
1129    memset(&hdr, 0, sizeof(hdr));
1130
1131    switch (aiocb_type) {
1132    case AIOCB_FLUSH_CACHE:
1133        hdr.opcode = SD_OP_FLUSH_VDI;
1134        break;
1135    case AIOCB_READ_UDATA:
1136        hdr.opcode = SD_OP_READ_OBJ;
1137        hdr.flags = flags;
1138        break;
1139    case AIOCB_WRITE_UDATA:
1140        if (create) {
1141            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1142        } else {
1143            hdr.opcode = SD_OP_WRITE_OBJ;
1144        }
1145        wlen = datalen;
1146        hdr.flags = SD_FLAG_CMD_WRITE | flags;
1147        break;
1148    case AIOCB_DISCARD_OBJ:
1149        hdr.opcode = SD_OP_DISCARD_OBJ;
1150        break;
1151    }
1152
1153    if (s->cache_flags) {
1154        hdr.flags |= s->cache_flags;
1155    }
1156
1157    hdr.oid = oid;
1158    hdr.cow_oid = old_oid;
1159    hdr.copies = s->inode.nr_copies;
1160
1161    hdr.data_length = datalen;
1162    hdr.offset = offset;
1163
1164    hdr.id = aio_req->id;
1165
1166    qemu_co_mutex_lock(&s->lock);
1167    s->co_send = qemu_coroutine_self();
1168    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request, s);
1169    socket_set_cork(s->fd, 1);
1170
1171    /* send a header */
1172    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
1173    if (ret != sizeof(hdr)) {
1174        error_report("failed to send a req, %s", strerror(errno));
1175        goto out;
1176    }
1177
1178    if (wlen) {
1179        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
1180        if (ret != wlen) {
 1181            error_report("failed to send data, %s", strerror(errno));
1182        }
1183    }
1184out:
1185    socket_set_cork(s->fd, 0);
1186    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, s);
1187    s->co_send = NULL;
1188    qemu_co_mutex_unlock(&s->lock);
1189}
1190
1191static int read_write_object(int fd, char *buf, uint64_t oid, uint8_t copies,
1192                             unsigned int datalen, uint64_t offset,
1193                             bool write, bool create, uint32_t cache_flags)
1194{
1195    SheepdogObjReq hdr;
1196    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1197    unsigned int wlen, rlen;
1198    int ret;
1199
1200    memset(&hdr, 0, sizeof(hdr));
1201
1202    if (write) {
1203        wlen = datalen;
1204        rlen = 0;
1205        hdr.flags = SD_FLAG_CMD_WRITE;
1206        if (create) {
1207            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1208        } else {
1209            hdr.opcode = SD_OP_WRITE_OBJ;
1210        }
1211    } else {
1212        wlen = 0;
1213        rlen = datalen;
1214        hdr.opcode = SD_OP_READ_OBJ;
1215    }
1216
1217    hdr.flags |= cache_flags;
1218
1219    hdr.oid = oid;
1220    hdr.data_length = datalen;
1221    hdr.offset = offset;
1222    hdr.copies = copies;
1223
1224    ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1225    if (ret) {
1226        error_report("failed to send a request to the sheep");
1227        return ret;
1228    }
1229
1230    switch (rsp->result) {
1231    case SD_RES_SUCCESS:
1232        return 0;
1233    default:
1234        error_report("%s", sd_strerror(rsp->result));
1235        return -EIO;
1236    }
1237}
1238
1239static int read_object(int fd, char *buf, uint64_t oid, uint8_t copies,
1240                       unsigned int datalen, uint64_t offset,
1241                       uint32_t cache_flags)
1242{
1243    return read_write_object(fd, buf, oid, copies, datalen, offset, false,
1244                             false, cache_flags);
1245}
1246
1247static int write_object(int fd, char *buf, uint64_t oid, uint8_t copies,
1248                        unsigned int datalen, uint64_t offset, bool create,
1249                        uint32_t cache_flags)
1250{
1251    return read_write_object(fd, buf, oid, copies, datalen, offset, true,
1252                             create, cache_flags);
1253}
1254
1255/* update inode with the latest state */
1256static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1257{
1258    SheepdogInode *inode;
1259    int ret = 0, fd;
1260    uint32_t vid = 0;
1261
1262    fd = connect_to_sdog(s);
1263    if (fd < 0) {
1264        return -EIO;
1265    }
1266
1267    inode = g_malloc(sizeof(s->inode));
1268
1269    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false);
1270    if (ret) {
1271        goto out;
1272    }
1273
1274    ret = read_object(fd, (char *)inode, vid_to_vdi_oid(vid),
1275                      s->inode.nr_copies, sizeof(*inode), 0, s->cache_flags);
1276    if (ret < 0) {
1277        goto out;
1278    }
1279
1280    if (inode->vdi_id != s->inode.vdi_id) {
1281        memcpy(&s->inode, inode, sizeof(s->inode));
1282    }
1283
1284out:
1285    g_free(inode);
1286    closesocket(fd);
1287
1288    return ret;
1289}
1290
1291/* Return true if the specified request is linked to the pending list. */
1292static bool check_simultaneous_create(BDRVSheepdogState *s, AIOReq *aio_req)
1293{
1294    AIOReq *areq;
1295    QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
1296        if (areq != aio_req && areq->oid == aio_req->oid) {
1297            /*
1298             * Sheepdog cannot handle simultaneous create requests to the same
1299             * object, so we cannot send the request until the previous request
1300             * finishes.
1301             */
1302            DPRINTF("simultaneous create to %" PRIx64 "\n", aio_req->oid);
1303            aio_req->flags = 0;
1304            aio_req->base_oid = 0;
1305            QLIST_REMOVE(aio_req, aio_siblings);
1306            QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req, aio_siblings);
1307            return true;
1308        }
1309    }
1310
1311    return false;
1312}
1313
1314static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1315{
1316    SheepdogAIOCB *acb = aio_req->aiocb;
1317    bool create = false;
1318
1319    /* check whether this request becomes a CoW one */
1320    if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1321        int idx = data_oid_to_idx(aio_req->oid);
1322
1323        if (is_data_obj_writable(&s->inode, idx)) {
1324            goto out;
1325        }
1326
1327        if (check_simultaneous_create(s, aio_req)) {
1328            return;
1329        }
1330
1331        if (s->inode.data_vdi_id[idx]) {
1332            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1333            aio_req->flags |= SD_FLAG_CMD_COW;
1334        }
1335        create = true;
1336    }
1337out:
1338    if (is_data_obj(aio_req->oid)) {
1339        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, create,
1340                        acb->aiocb_type);
1341    } else {
1342        struct iovec iov;
1343        iov.iov_base = &s->inode;
1344        iov.iov_len = sizeof(s->inode);
1345        add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
1346    }
1347}
1348
1349/* TODO Convert to fine grained options */
1350static QemuOptsList runtime_opts = {
1351    .name = "sheepdog",
1352    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1353    .desc = {
1354        {
1355            .name = "filename",
1356            .type = QEMU_OPT_STRING,
1357            .help = "URL to the sheepdog image",
1358        },
1359        { /* end of list */ }
1360    },
1361};
1362
1363static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1364                   Error **errp)
1365{
1366    int ret, fd;
1367    uint32_t vid = 0;
1368    BDRVSheepdogState *s = bs->opaque;
1369    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1370    uint32_t snapid;
1371    char *buf = NULL;
1372    QemuOpts *opts;
1373    Error *local_err = NULL;
1374    const char *filename;
1375
1376    s->bs = bs;
1377
1378    opts = qemu_opts_create_nofail(&runtime_opts);
1379    qemu_opts_absorb_qdict(opts, options, &local_err);
1380    if (error_is_set(&local_err)) {
1381        qerror_report_err(local_err);
1382        error_free(local_err);
1383        ret = -EINVAL;
1384        goto out;
1385    }
1386
1387    filename = qemu_opt_get(opts, "filename");
1388
1389    QLIST_INIT(&s->inflight_aio_head);
1390    QLIST_INIT(&s->pending_aio_head);
1391    QLIST_INIT(&s->failed_aio_head);
1392    s->fd = -1;
1393
1394    memset(vdi, 0, sizeof(vdi));
1395    memset(tag, 0, sizeof(tag));
1396
1397    if (strstr(filename, "://")) {
1398        ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1399    } else {
1400        ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1401    }
1402    if (ret < 0) {
1403        goto out;
1404    }
1405    s->fd = get_sheep_fd(s);
1406    if (s->fd < 0) {
1407        ret = s->fd;
1408        goto out;
1409    }
1410
1411    ret = find_vdi_name(s, vdi, snapid, tag, &vid, true);
1412    if (ret) {
1413        goto out;
1414    }
1415
1416    /*
1417     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1418     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1419     */
1420    s->cache_flags = SD_FLAG_CMD_CACHE;
1421    if (flags & BDRV_O_NOCACHE) {
1422        s->cache_flags = SD_FLAG_CMD_DIRECT;
1423    }
1424    s->discard_supported = true;
1425
1426    if (snapid || tag[0] != '\0') {
1427        DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1428        s->is_snapshot = true;
1429    }
1430
1431    fd = connect_to_sdog(s);
1432    if (fd < 0) {
1433        ret = fd;
1434        goto out;
1435    }
1436
1437    buf = g_malloc(SD_INODE_SIZE);
1438    ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0,
1439                      s->cache_flags);
1440
1441    closesocket(fd);
1442
1443    if (ret) {
1444        goto out;
1445    }
1446
1447    memcpy(&s->inode, buf, sizeof(s->inode));
1448    s->min_dirty_data_idx = UINT32_MAX;
1449    s->max_dirty_data_idx = 0;
1450
1451    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1452    pstrcpy(s->name, sizeof(s->name), vdi);
1453    qemu_co_mutex_init(&s->lock);
1454    qemu_opts_del(opts);
1455    g_free(buf);
1456    return 0;
1457out:
1458    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
1459    if (s->fd >= 0) {
1460        closesocket(s->fd);
1461    }
1462    qemu_opts_del(opts);
1463    g_free(buf);
1464    return ret;
1465}
1466
1467static int do_sd_create(BDRVSheepdogState *s, char *filename, int64_t vdi_size,
1468                        uint32_t base_vid, uint32_t *vdi_id, int snapshot,
1469                        uint8_t copy_policy)
1470{
1471    SheepdogVdiReq hdr;
1472    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1473    int fd, ret;
1474    unsigned int wlen, rlen = 0;
1475    char buf[SD_MAX_VDI_LEN];
1476
1477    fd = connect_to_sdog(s);
1478    if (fd < 0) {
1479        return fd;
1480    }
1481
1482    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1483     * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1484     */
1485    memset(buf, 0, sizeof(buf));
1486    pstrcpy(buf, sizeof(buf), filename);
1487
1488    memset(&hdr, 0, sizeof(hdr));
1489    hdr.opcode = SD_OP_NEW_VDI;
1490    hdr.vdi_id = base_vid;
1491
1492    wlen = SD_MAX_VDI_LEN;
1493
1494    hdr.flags = SD_FLAG_CMD_WRITE;
1495    hdr.snapid = snapshot;
1496
1497    hdr.data_length = wlen;
1498    hdr.vdi_size = vdi_size;
1499    hdr.copy_policy = copy_policy;
1500
1501    ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1502
1503    closesocket(fd);
1504
1505    if (ret) {
1506        return ret;
1507    }
1508
1509    if (rsp->result != SD_RES_SUCCESS) {
1510        error_report("%s, %s", sd_strerror(rsp->result), filename);
1511        return -EIO;
1512    }
1513
1514    if (vdi_id) {
1515        *vdi_id = rsp->vdi_id;
1516    }
1517
1518    return 0;
1519}
1520
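     /*
      * Preallocate the image by reading back and rewriting every
      * SD_DATA_OBJ_SIZE (4 MB) chunk, which forces the server to allocate
      * all of the data objects up front.
      */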
1521static int sd_prealloc(const char *filename)
1522{
1523    BlockDriverState *bs = NULL;
1524    uint32_t idx, max_idx;
1525    int64_t vdi_size;
1526    void *buf = g_malloc0(SD_DATA_OBJ_SIZE);
1527    Error *local_err = NULL;
1528    int ret;
1529
1530    ret = bdrv_file_open(&bs, filename, NULL, BDRV_O_RDWR, &local_err);
1531    if (ret < 0) {
1532        qerror_report_err(local_err);
1533        error_free(local_err);
1534        goto out;
1535    }
1536
1537    vdi_size = bdrv_getlength(bs);
1538    if (vdi_size < 0) {
1539        ret = vdi_size;
1540        goto out;
1541    }
1542    max_idx = DIV_ROUND_UP(vdi_size, SD_DATA_OBJ_SIZE);
1543
1544    for (idx = 0; idx < max_idx; idx++) {
1545        /*
 1546         * The created image can be a cloned image, so we need to read
 1547         * the data from the source image.
1548         */
1549        ret = bdrv_pread(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1550        if (ret < 0) {
1551            goto out;
1552        }
1553        ret = bdrv_pwrite(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1554        if (ret < 0) {
1555            goto out;
1556        }
1557    }
1558out:
1559    if (bs) {
1560        bdrv_unref(bs);
1561    }
1562    g_free(buf);
1563
1564    return ret;
1565}
1566
1567static int sd_create(const char *filename, QEMUOptionParameter *options,
1568                     Error **errp)
1569{
1570    int ret = 0;
1571    uint32_t vid = 0, base_vid = 0;
1572    int64_t vdi_size = 0;
1573    char *backing_file = NULL;
1574    BDRVSheepdogState *s;
1575    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1576    uint32_t snapid;
1577    bool prealloc = false;
1578    Error *local_err = NULL;
1579
1580    s = g_malloc0(sizeof(BDRVSheepdogState));
1581
1582    memset(vdi, 0, sizeof(vdi));
1583    memset(tag, 0, sizeof(tag));
1584    if (strstr(filename, "://")) {
1585        ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1586    } else {
1587        ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1588    }
1589    if (ret < 0) {
1590        goto out;
1591    }
1592
1593    while (options && options->name) {
1594        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
1595            vdi_size = options->value.n;
1596        } else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
1597            backing_file = options->value.s;
1598        } else if (!strcmp(options->name, BLOCK_OPT_PREALLOC)) {
1599            if (!options->value.s || !strcmp(options->value.s, "off")) {
1600                prealloc = false;
1601            } else if (!strcmp(options->value.s, "full")) {
1602                prealloc = true;
1603            } else {
1604                error_report("Invalid preallocation mode: '%s'",
1605                             options->value.s);
1606                ret = -EINVAL;
1607                goto out;
1608            }
1609        }
1610        options++;
1611    }
1612
1613    if (vdi_size > SD_MAX_VDI_SIZE) {
1614        error_report("too big image size");
1615        ret = -EINVAL;
1616        goto out;
1617    }
1618
1619    if (backing_file) {
1620        BlockDriverState *bs;
1621        BDRVSheepdogState *s;
1622        BlockDriver *drv;
1623
1624        /* Currently, only Sheepdog backing image is supported. */
1625        drv = bdrv_find_protocol(backing_file, true);
1626        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
1627            error_report("backing_file must be a sheepdog image");
1628            ret = -EINVAL;
1629            goto out;
1630        }
1631
1632        ret = bdrv_file_open(&bs, backing_file, NULL, 0, &local_err);
1633        if (ret < 0) {
1634            qerror_report_err(local_err);
1635            error_free(local_err);
1636            goto out;
1637        }
1638
1639        s = bs->opaque;
1640
1641        if (!is_snapshot(&s->inode)) {
1642            error_report("cannot clone from a non snapshot vdi");
1643            bdrv_unref(bs);
1644            ret = -EINVAL;
1645            goto out;
1646        }
1647
1648        base_vid = s->inode.vdi_id;
1649        bdrv_unref(bs);
1650    }
1651
1652    /* TODO: allow users to specify copy number */
1653    ret = do_sd_create(s, vdi, vdi_size, base_vid, &vid, 0, 0);
1654    if (!prealloc || ret) {
1655        goto out;
1656    }
1657
1658    ret = sd_prealloc(filename);
1659out:
1660    g_free(s);
1661    return ret;
1662}
1663
1664static void sd_close(BlockDriverState *bs)
1665{
1666    BDRVSheepdogState *s = bs->opaque;
1667    SheepdogVdiReq hdr;
1668    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1669    unsigned int wlen, rlen = 0;
1670    int fd, ret;
1671
1672    DPRINTF("%s\n", s->name);
1673
1674    fd = connect_to_sdog(s);
1675    if (fd < 0) {
1676        return;
1677    }
1678
1679    memset(&hdr, 0, sizeof(hdr));
1680
1681    hdr.opcode = SD_OP_RELEASE_VDI;
1682    hdr.vdi_id = s->inode.vdi_id;
1683    wlen = strlen(s->name) + 1;
1684    hdr.data_length = wlen;
1685    hdr.flags = SD_FLAG_CMD_WRITE;
1686
1687    ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);
1688
1689    closesocket(fd);
1690
1691    if (!ret && rsp->result != SD_RES_SUCCESS &&
1692        rsp->result != SD_RES_VDI_NOT_LOCKED) {
1693        error_report("%s, %s", sd_strerror(rsp->result), s->name);
1694    }
1695
1696    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
1697    closesocket(s->fd);
1698    g_free(s->host_spec);
1699}
1700
1701static int64_t sd_getlength(BlockDriverState *bs)
1702{
1703    BDRVSheepdogState *s = bs->opaque;
1704
1705    return s->inode.vdi_size;
1706}
1707
1708static int sd_truncate(BlockDriverState *bs, int64_t offset)
1709{
1710    BDRVSheepdogState *s = bs->opaque;
1711    int ret, fd;
1712    unsigned int datalen;
1713
1714    if (offset < s->inode.vdi_size) {
1715        error_report("shrinking is not supported");
1716        return -EINVAL;
1717    } else if (offset > SD_MAX_VDI_SIZE) {
1718        error_report("too big image size");
1719        return -EINVAL;
1720    }
1721
1722    fd = connect_to_sdog(s);
1723    if (fd < 0) {
1724        return fd;
1725    }
1726
 1727    /* we don't need to update the entire object */
1728    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
1729    s->inode.vdi_size = offset;
1730    ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
1731                       s->inode.nr_copies, datalen, 0, false, s->cache_flags);
1732    close(fd);
1733
1734    if (ret < 0) {
1735        error_report("failed to update an inode.");
1736    }
1737
1738    return ret;
1739}
1740
1741/*
1742 * This function is called after writing data objects.  If we need to
1743 * update metadata, this sends a write request to the vdi object.
1744 * Otherwise, this switches back to sd_co_readv/writev.
1745 */
1746static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
1747{
1748    BDRVSheepdogState *s = acb->common.bs->opaque;
1749    struct iovec iov;
1750    AIOReq *aio_req;
1751    uint32_t offset, data_len, mn, mx;
1752
1753    mn = s->min_dirty_data_idx;
1754    mx = s->max_dirty_data_idx;
1755    if (mn <= mx) {
1756        /* we need to update the vdi object. */
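             /*
              * Only the dirty slice of data_vdi_id[] is written back:
              * `offset' is the byte offset of entry `mn' within the inode
              * object and `data_len' covers entries mn..mx (4 bytes each).
              */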
1757        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
1758            mn * sizeof(s->inode.data_vdi_id[0]);
1759        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
1760
1761        s->min_dirty_data_idx = UINT32_MAX;
1762        s->max_dirty_data_idx = 0;
1763
1764        iov.iov_base = &s->inode;
1765        iov.iov_len = sizeof(s->inode);
1766        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
1767                                data_len, offset, 0, 0, offset);
1768        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1769        add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
1770
1771        acb->aio_done_func = sd_finish_aiocb;
1772        acb->aiocb_type = AIOCB_WRITE_UDATA;
1773        return;
1774    }
1775
1776    sd_finish_aiocb(acb);
1777}
1778
1779/* Delete current working VDI on the snapshot chain */
1780static bool sd_delete(BDRVSheepdogState *s)
1781{
1782    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
1783    SheepdogVdiReq hdr = {
1784        .opcode = SD_OP_DEL_VDI,
1785        .vdi_id = s->inode.vdi_id,
1786        .data_length = wlen,
1787        .flags = SD_FLAG_CMD_WRITE,
1788    };
1789    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1790    int fd, ret;
1791
1792    fd = connect_to_sdog(s);
1793    if (fd < 0) {
1794        return false;
1795    }
1796
1797    ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);
1798    closesocket(fd);
1799    if (ret) {
1800        return false;
1801    }
1802    switch (rsp->result) {
1803    case SD_RES_NO_VDI:
1804        error_report("%s was already deleted", s->name);
1805        /* fall through */
1806    case SD_RES_SUCCESS:
1807        break;
1808    default:
1809        error_report("%s, %s", sd_strerror(rsp->result), s->name);
1810        return false;
1811    }
1812
1813    return true;
1814}
1815
1816/*
1817 * Create a writable VDI from a snapshot
1818 */
1819static int sd_create_branch(BDRVSheepdogState *s)
1820{
1821    int ret, fd;
1822    uint32_t vid;
1823    char *buf;
1824    bool deleted;
1825
1826    DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
1827
1828    buf = g_malloc(SD_INODE_SIZE);
1829
1830    /*
1831     * Even if the deletion fails, we will just create an extra snapshot
1832     * based on the working VDI that was supposed to be deleted, so there
1833     * is no need to bail out on failure.
1834     */
1835    deleted = sd_delete(s);
1836    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &vid,
1837                       !deleted, s->inode.copy_policy);
1838    if (ret) {
1839        goto out;
1840    }
1841
1842    DPRINTF("%" PRIx32 " is created.\n", vid);
1843
1844    fd = connect_to_sdog(s);
1845    if (fd < 0) {
1846        ret = fd;
1847        goto out;
1848    }
1849
1850    ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
1851                      SD_INODE_SIZE, 0, s->cache_flags);
1852
1853    closesocket(fd);
1854
1855    if (ret < 0) {
1856        goto out;
1857    }
1858
1859    memcpy(&s->inode, buf, sizeof(s->inode));
1860
1861    s->is_snapshot = false;
1862    ret = 0;
1863    DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
1864
1865out:
1866    g_free(buf);
1867
1868    return ret;
1869}
1870
1871/*
1872 * Send I/O requests to the server.
1873 *
1874 * This function sends requests to the server, links the requests to
1875 * the inflight_aio_head list in BDRVSheepdogState, and exits without
1876 * waiting for the responses.  The responses are received in the
1877 * `aio_read_response' function, which is called from the main loop as
1878 * an fd handler.
1879 *
1880 * Returns 1 when we need to wait for a response, 0 when no request was
1881 * sent, and -errno in error cases.
1882 */
1883static int coroutine_fn sd_co_rw_vector(void *p)
1884{
1885    SheepdogAIOCB *acb = p;
1886    int ret = 0;
1887    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
1888    unsigned long idx = acb->sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE;
1889    uint64_t oid;
1890    uint64_t offset = (acb->sector_num * BDRV_SECTOR_SIZE) % SD_DATA_OBJ_SIZE;
1891    BDRVSheepdogState *s = acb->common.bs->opaque;
1892    SheepdogInode *inode = &s->inode;
1893    AIOReq *aio_req;
1894
1895    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
1896        /*
1897         * If we opened a snapshot VDI, Sheepdog creates the writable
1898         * VDI when the first write operation is issued.
1899         */
1900        ret = sd_create_branch(s);
1901        if (ret) {
1902            acb->ret = -EIO;
1903            goto out;
1904        }
1905    }
1906
1907    /*
1908     * Make sure we don't free the aiocb before we are done with all requests.
1909     * This additional reference is dropped at the end of this function.
1910     */
1911    acb->nr_pending++;
1912
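    /*
     * Split the request along SD_DATA_OBJ_SIZE boundaries and issue one
     * aio request per data object; `idx' is the index of the current data
     * object and `offset' the starting offset within it.
     */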
1913    while (done != total) {
1914        uint8_t flags = 0;
1915        uint64_t old_oid = 0;
1916        bool create = false;
1917
1918        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
1919
1920        len = MIN(total - done, SD_DATA_OBJ_SIZE - offset);
1921
1922        switch (acb->aiocb_type) {
1923        case AIOCB_READ_UDATA:
1924            if (!inode->data_vdi_id[idx]) {
1925                qemu_iovec_memset(acb->qiov, done, 0, len);
1926                goto done;
1927            }
1928            break;
1929        case AIOCB_WRITE_UDATA:
1930            if (!inode->data_vdi_id[idx]) {
1931                create = true;
1932            } else if (!is_data_obj_writable(inode, idx)) {
1933                /* Copy-On-Write */
1934                create = true;
1935                old_oid = oid;
1936                flags = SD_FLAG_CMD_COW;
1937            }
1938            break;
1939        case AIOCB_DISCARD_OBJ:
1940            /*
1941             * We discard the object only when the whole object is 1) allocated
1942             * and 2) fully covered by this request. Otherwise, simply skip it.
1943             */
1944            if (len != SD_DATA_OBJ_SIZE || inode->data_vdi_id[idx] == 0) {
1945                goto done;
1946            }
1947            break;
1948        default:
1949            break;
1950        }
1951
1952        if (create) {
1953            DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
1954                    inode->vdi_id, oid,
1955                    vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
1956            oid = vid_to_data_oid(inode->vdi_id, idx);
1957            DPRINTF("new oid %" PRIx64 "\n", oid);
1958        }
1959
1960        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
1961        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1962
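        /*
         * If another in-flight request is already creating the same object,
         * check_simultaneous_create() queues this request instead of sending
         * it now, so it can be issued once the earlier create completes.
         */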
1963        if (create) {
1964            if (check_simultaneous_create(s, aio_req)) {
1965                goto done;
1966            }
1967        }
1968
1969        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov, create,
1970                        acb->aiocb_type);
1971    done:
1972        offset = 0;
1973        idx++;
1974        done += len;
1975    }
1976out:
1977    if (!--acb->nr_pending) {
1978        return acb->ret;
1979    }
1980    return 1;
1981}
1982
1983static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
1984                        int nb_sectors, QEMUIOVector *qiov)
1985{
1986    SheepdogAIOCB *acb;
1987    int ret;
1988
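    /*
     * On a growable device, a write beyond the current end of the disk
     * first grows the VDI via sd_truncate().
     */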
1989    if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
1990        ret = sd_truncate(bs, (sector_num + nb_sectors) * BDRV_SECTOR_SIZE);
1991        if (ret < 0) {
1992            return ret;
1993        }
1994        bs->total_sectors = sector_num + nb_sectors;
1995    }
1996
1997    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
1998    acb->aio_done_func = sd_write_done;
1999    acb->aiocb_type = AIOCB_WRITE_UDATA;
2000
2001    ret = sd_co_rw_vector(acb);
2002    if (ret <= 0) {
2003        qemu_aio_release(acb);
2004        return ret;
2005    }
2006
2007    qemu_coroutine_yield();
2008
2009    return acb->ret;
2010}
2011
2012static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2013                       int nb_sectors, QEMUIOVector *qiov)
2014{
2015    SheepdogAIOCB *acb;
2016    int ret;
2017
2018    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
2019    acb->aiocb_type = AIOCB_READ_UDATA;
2020    acb->aio_done_func = sd_finish_aiocb;
2021
2022    ret = sd_co_rw_vector(acb);
2023    if (ret <= 0) {
2024        qemu_aio_release(acb);
2025        return ret;
2026    }
2027
2028    qemu_coroutine_yield();
2029
2030    return acb->ret;
2031}
2032
2033static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2034{
2035    BDRVSheepdogState *s = bs->opaque;
2036    SheepdogAIOCB *acb;
2037    AIOReq *aio_req;
2038
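    /*
     * A flush request is only needed when the server-side writeback cache
     * (SD_FLAG_CMD_CACHE) is in use; otherwise there is nothing to flush.
     */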
2039    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2040        return 0;
2041    }
2042
2043    acb = sd_aio_setup(bs, NULL, 0, 0);
2044    acb->aiocb_type = AIOCB_FLUSH_CACHE;
2045    acb->aio_done_func = sd_finish_aiocb;
2046
2047    aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2048                            0, 0, 0, 0, 0);
2049    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
2050    add_aio_request(s, aio_req, NULL, 0, false, acb->aiocb_type);
2051
2052    qemu_coroutine_yield();
2053    return acb->ret;
2054}
2055
2056static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2057{
2058    BDRVSheepdogState *s = bs->opaque;
2059    int ret, fd;
2060    uint32_t new_vid;
2061    SheepdogInode *inode;
2062    unsigned int datalen;
2063
2064    DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
2065            "is_snapshot %d\n", sn_info->name, sn_info->id_str,
2066            s->name, sn_info->vm_state_size, s->is_snapshot);
2067
2068    if (s->is_snapshot) {
2069        error_report("You can't create a snapshot of a snapshot VDI, "
2070                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2071
2072        return -EINVAL;
2073    }
2074
2075    DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
2076
2077    s->inode.vm_state_size = sn_info->vm_state_size;
2078    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2079    /* It appears that inode.tag does not require a NUL terminator,
2080     * which means this use of strncpy is ok.
2081     */
2082    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2083    /* we don't need to update the entire object */
2084    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2085
2086    /* refresh inode. */
2087    fd = connect_to_sdog(s);
2088    if (fd < 0) {
2089        ret = fd;
2090        goto cleanup;
2091    }
2092
2093    ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
2094                       s->inode.nr_copies, datalen, 0, false, s->cache_flags);
2095    if (ret < 0) {
2096        error_report("failed to write snapshot's inode.");
2097        goto cleanup;
2098    }
2099
2100    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &new_vid,
2101                       1, s->inode.copy_policy);
2102    if (ret < 0) {
2103        error_report("failed to create inode for snapshot. %s",
2104                     strerror(errno));
2105        goto cleanup;
2106    }
2107
2108    inode = (SheepdogInode *)g_malloc(datalen);
2109
2110    ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid),
2111                      s->inode.nr_copies, datalen, 0, s->cache_flags);
2112
2113    if (ret < 0) {
2114        error_report("failed to read new inode info. %s", strerror(errno));
2115        goto cleanup;
2116    }
2117
2118    memcpy(&s->inode, inode, datalen);
2119    DPRINTF("s->inode: name %s snap_id %x oid %x\n",
2120            s->inode.name, s->inode.snap_id, s->inode.vdi_id);
2121
2122cleanup:
2123    closesocket(fd);
2124    return ret;
2125}
2126
2127/*
2128 * We implement the rollback (loadvm) operation to the specified snapshot by
2129 * 1) switching to the snapshot,
2130 * 2) relying on sd_create_branch to delete the working VDI, and
2131 * 3) creating a new working VDI based on the specified snapshot.
2132 */
2133static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2134{
2135    BDRVSheepdogState *s = bs->opaque;
2136    BDRVSheepdogState *old_s;
2137    char tag[SD_MAX_VDI_TAG_LEN];
2138    uint32_t snapid = 0;
2139    int ret = 0;
2140
2141    old_s = g_malloc(sizeof(BDRVSheepdogState));
2142
2143    memcpy(old_s, s, sizeof(BDRVSheepdogState));
2144
2145    snapid = strtoul(snapshot_id, NULL, 10);
2146    if (snapid) {
2147        tag[0] = 0;
2148    } else {
2149        pstrcpy(tag, sizeof(tag), snapshot_id);
2150    }
2151
2152    ret = reload_inode(s, snapid, tag);
2153    if (ret) {
2154        goto out;
2155    }
2156
2157    ret = sd_create_branch(s);
2158    if (ret) {
2159        goto out;
2160    }
2161
2162    g_free(old_s);
2163
2164    return 0;
2165out:
2166    /* recover bdrv_sd_state */
2167    memcpy(s, old_s, sizeof(BDRVSheepdogState));
2168    g_free(old_s);
2169
2170    error_report("failed to open. recover old bdrv_sd_state.");
2171
2172    return ret;
2173}
2174
2175static int sd_snapshot_delete(BlockDriverState *bs,
2176                              const char *snapshot_id,
2177                              const char *name,
2178                              Error **errp)
2179{
2180    /* FIXME: Delete specified snapshot id.  */
2181    return 0;
2182}
2183
2184static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2185{
2186    BDRVSheepdogState *s = bs->opaque;
2187    SheepdogReq req;
2188    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2189    QEMUSnapshotInfo *sn_tab = NULL;
2190    unsigned wlen, rlen;
2191    int found = 0;
2192    static SheepdogInode inode;
2193    unsigned long *vdi_inuse;
2194    unsigned int start_nr;
2195    uint64_t hval;
2196    uint32_t vid;
2197
2198    vdi_inuse = g_malloc(max);
2199
2200    fd = connect_to_sdog(s);
2201    if (fd < 0) {
2202        ret = fd;
2203        goto out;
2204    }
2205
2206    rlen = max;
2207    wlen = 0;
2208
2209    memset(&req, 0, sizeof(req));
2210
2211    req.opcode = SD_OP_READ_VDIS;
2212    req.data_length = max;
2213
2214    ret = do_req(fd, (SheepdogReq *)&req, vdi_inuse, &wlen, &rlen);
2215
2216    closesocket(fd);
2217    if (ret) {
2218        goto out;
2219    }
2220
2221    sn_tab = g_malloc0(nr * sizeof(*sn_tab));
2222
2223    /* calculate a vdi id with the hash function */
2224    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2225    start_nr = hval & (SD_NR_VDIS - 1);
2226
2227    fd = connect_to_sdog(s);
2228    if (fd < 0) {
2229        ret = fd;
2230        goto out;
2231    }
2232
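    /*
     * VDIs that share a name are expected to occupy consecutive slots
     * starting at the name hash, so probe the in-use bitmap linearly from
     * start_nr and stop at the first unused slot.
     */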
2233    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2234        if (!test_bit(vid, vdi_inuse)) {
2235            break;
2236        }
2237
2238        /* we don't need to read the entire object */
2239        ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(vid),
2240                          0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
2241                          s->cache_flags);
2242
2243        if (ret) {
2244            continue;
2245        }
2246
2247        if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
2248            sn_tab[found].date_sec = inode.snap_ctime >> 32;
2249            sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
2250            sn_tab[found].vm_state_size = inode.vm_state_size;
2251            sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
2252
2253            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str), "%u",
2254                     inode.snap_id);
2255            pstrcpy(sn_tab[found].name,
2256                    MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
2257                    inode.tag);
2258            found++;
2259        }
2260    }
2261
2262    closesocket(fd);
2263out:
2264    *psn_tab = sn_tab;
2265
2266    g_free(vdi_inuse);
2267
2268    if (ret < 0) {
2269        return ret;
2270    }
2271
2272    return found;
2273}
2274
2275static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
2276                                int64_t pos, int size, int load)
2277{
2278    bool create;
2279    int fd, ret = 0, remaining = size;
2280    unsigned int data_len;
2281    uint64_t vmstate_oid;
2282    uint64_t offset;
2283    uint32_t vdi_index;
2284    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
2285
2286    fd = connect_to_sdog(s);
2287    if (fd < 0) {
2288        return fd;
2289    }
2290
2291    while (remaining) {
2292        vdi_index = pos / SD_DATA_OBJ_SIZE;
2293        offset = pos % SD_DATA_OBJ_SIZE;
2294
2295        data_len = MIN(remaining, SD_DATA_OBJ_SIZE - offset);
2296
2297        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
2298
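        /*
         * A vmstate object is created when writing starts at its beginning;
         * later chunks update the already-created object.
         */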
2299        create = (offset == 0);
2300        if (load) {
2301            ret = read_object(fd, (char *)data, vmstate_oid,
2302                              s->inode.nr_copies, data_len, offset,
2303                              s->cache_flags);
2304        } else {
2305            ret = write_object(fd, (char *)data, vmstate_oid,
2306                               s->inode.nr_copies, data_len, offset, create,
2307                               s->cache_flags);
2308        }
2309
2310        if (ret < 0) {
2311            error_report("failed to %s vmstate %s",
                             load ? "load" : "save", strerror(errno));
2312            goto cleanup;
2313        }
2314
2315        pos += data_len;
2316        data += data_len;
2317        remaining -= data_len;
2318    }
2319    ret = size;
2320cleanup:
2321    closesocket(fd);
2322    return ret;
2323}
2324
2325static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
2326                           int64_t pos)
2327{
2328    BDRVSheepdogState *s = bs->opaque;
2329    void *buf;
2330    int ret;
2331
2332    buf = qemu_blockalign(bs, qiov->size);
2333    qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
2334    ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
2335    qemu_vfree(buf);
2336
2337    return ret;
2338}
2339
2340static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
2341                           int64_t pos, int size)
2342{
2343    BDRVSheepdogState *s = bs->opaque;
2344
2345    return do_load_save_vmstate(s, data, pos, size, 1);
2346}
2347
2348
2349static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
2350                                      int nb_sectors)
2351{
2352    SheepdogAIOCB *acb;
2353    QEMUIOVector dummy;
2354    BDRVSheepdogState *s = bs->opaque;
2355    int ret;
2356
2357    if (!s->discard_supported) {
2358        return 0;
2359    }
2360
2361    acb = sd_aio_setup(bs, &dummy, sector_num, nb_sectors);
2362    acb->aiocb_type = AIOCB_DISCARD_OBJ;
2363    acb->aio_done_func = sd_finish_aiocb;
2364
2365    ret = sd_co_rw_vector(acb);
2366    if (ret <= 0) {
2367        qemu_aio_release(acb);
2368        return ret;
2369    }
2370
2371    qemu_coroutine_yield();
2372
2373    return acb->ret;
2374}
2375
2376static coroutine_fn int64_t
2377sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2378                       int *pnum)
2379{
2380    BDRVSheepdogState *s = bs->opaque;
2381    SheepdogInode *inode = &s->inode;
2382    unsigned long start = sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE,
2383                  end = DIV_ROUND_UP((sector_num + nb_sectors) *
2384                                     BDRV_SECTOR_SIZE, SD_DATA_OBJ_SIZE);
2385    unsigned long idx;
2386    int64_t ret = BDRV_BLOCK_DATA;
2387
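    /*
     * Scan data_vdi_id[]: if the first object is allocated, count the run
     * of allocated objects and report BDRV_BLOCK_DATA; otherwise count the
     * run of unallocated objects and report them as unallocated (ret = 0).
     */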
2388    for (idx = start; idx < end; idx++) {
2389        if (inode->data_vdi_id[idx] == 0) {
2390            break;
2391        }
2392    }
2393    if (idx == start) {
2394        /* Get the longest length of unallocated sectors */
2395        ret = 0;
2396        for (idx = start + 1; idx < end; idx++) {
2397            if (inode->data_vdi_id[idx] != 0) {
2398                break;
2399            }
2400        }
2401    }
2402
2403    *pnum = (idx - start) * SD_DATA_OBJ_SIZE / BDRV_SECTOR_SIZE;
2404    if (*pnum > nb_sectors) {
2405        *pnum = nb_sectors;
2406    }
2407    return ret;
2408}
2409
2410static QEMUOptionParameter sd_create_options[] = {
2411    {
2412        .name = BLOCK_OPT_SIZE,
2413        .type = OPT_SIZE,
2414        .help = "Virtual disk size"
2415    },
2416    {
2417        .name = BLOCK_OPT_BACKING_FILE,
2418        .type = OPT_STRING,
2419        .help = "File name of a base image"
2420    },
2421    {
2422        .name = BLOCK_OPT_PREALLOC,
2423        .type = OPT_STRING,
2424        .help = "Preallocation mode (allowed values: off, full)"
2425    },
2426    { NULL }
2427};
2428
2429static BlockDriver bdrv_sheepdog = {
2430    .format_name    = "sheepdog",
2431    .protocol_name  = "sheepdog",
2432    .instance_size  = sizeof(BDRVSheepdogState),
2433    .bdrv_needs_filename = true,
2434    .bdrv_file_open = sd_open,
2435    .bdrv_close     = sd_close,
2436    .bdrv_create    = sd_create,
2437    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2438    .bdrv_getlength = sd_getlength,
2439    .bdrv_truncate  = sd_truncate,
2440
2441    .bdrv_co_readv  = sd_co_readv,
2442    .bdrv_co_writev = sd_co_writev,
2443    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2444    .bdrv_co_discard = sd_co_discard,
2445    .bdrv_co_get_block_status = sd_co_get_block_status,
2446
2447    .bdrv_snapshot_create   = sd_snapshot_create,
2448    .bdrv_snapshot_goto     = sd_snapshot_goto,
2449    .bdrv_snapshot_delete   = sd_snapshot_delete,
2450    .bdrv_snapshot_list     = sd_snapshot_list,
2451
2452    .bdrv_save_vmstate  = sd_save_vmstate,
2453    .bdrv_load_vmstate  = sd_load_vmstate,
2454
2455    .create_options = sd_create_options,
2456};
2457
2458static BlockDriver bdrv_sheepdog_tcp = {
2459    .format_name    = "sheepdog",
2460    .protocol_name  = "sheepdog+tcp",
2461    .instance_size  = sizeof(BDRVSheepdogState),
2462    .bdrv_needs_filename = true,
2463    .bdrv_file_open = sd_open,
2464    .bdrv_close     = sd_close,
2465    .bdrv_create    = sd_create,
2466    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2467    .bdrv_getlength = sd_getlength,
2468    .bdrv_truncate  = sd_truncate,
2469
2470    .bdrv_co_readv  = sd_co_readv,
2471    .bdrv_co_writev = sd_co_writev,
2472    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2473    .bdrv_co_discard = sd_co_discard,
2474    .bdrv_co_get_block_status = sd_co_get_block_status,
2475
2476    .bdrv_snapshot_create   = sd_snapshot_create,
2477    .bdrv_snapshot_goto     = sd_snapshot_goto,
2478    .bdrv_snapshot_delete   = sd_snapshot_delete,
2479    .bdrv_snapshot_list     = sd_snapshot_list,
2480
2481    .bdrv_save_vmstate  = sd_save_vmstate,
2482    .bdrv_load_vmstate  = sd_load_vmstate,
2483
2484    .create_options = sd_create_options,
2485};
2486
2487static BlockDriver bdrv_sheepdog_unix = {
2488    .format_name    = "sheepdog",
2489    .protocol_name  = "sheepdog+unix",
2490    .instance_size  = sizeof(BDRVSheepdogState),
2491    .bdrv_needs_filename = true,
2492    .bdrv_file_open = sd_open,
2493    .bdrv_close     = sd_close,
2494    .bdrv_create    = sd_create,
2495    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2496    .bdrv_getlength = sd_getlength,
2497    .bdrv_truncate  = sd_truncate,
2498
2499    .bdrv_co_readv  = sd_co_readv,
2500    .bdrv_co_writev = sd_co_writev,
2501    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2502    .bdrv_co_discard = sd_co_discard,
2503    .bdrv_co_get_block_status = sd_co_get_block_status,
2504
2505    .bdrv_snapshot_create   = sd_snapshot_create,
2506    .bdrv_snapshot_goto     = sd_snapshot_goto,
2507    .bdrv_snapshot_delete   = sd_snapshot_delete,
2508    .bdrv_snapshot_list     = sd_snapshot_list,
2509
2510    .bdrv_save_vmstate  = sd_save_vmstate,
2511    .bdrv_load_vmstate  = sd_load_vmstate,
2512
2513    .create_options = sd_create_options,
2514};
2515
2516static void bdrv_sheepdog_init(void)
2517{
2518    bdrv_register(&bdrv_sheepdog);
2519    bdrv_register(&bdrv_sheepdog_tcp);
2520    bdrv_register(&bdrv_sheepdog_unix);
2521}
2522block_init(bdrv_sheepdog_init);
2523