qemu/block/sheepdog.c
<<
>>
Prefs
   1/*
   2 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
   3 *
   4 * This program is free software; you can redistribute it and/or
   5 * modify it under the terms of the GNU General Public License version
   6 * 2 as published by the Free Software Foundation.
   7 *
   8 * You should have received a copy of the GNU General Public License
   9 * along with this program. If not, see <http://www.gnu.org/licenses/>.
  10 *
  11 * Contributions after 2012-01-13 are licensed under the terms of the
  12 * GNU GPL, version 2 or (at your option) any later version.
  13 */
  14
  15#include "qemu/osdep.h"
  16#include "qapi-visit.h"
  17#include "qapi/error.h"
  18#include "qapi/qmp/qdict.h"
  19#include "qapi/qobject-input-visitor.h"
  20#include "qemu/uri.h"
  21#include "qemu/error-report.h"
  22#include "qemu/sockets.h"
  23#include "block/block_int.h"
  24#include "sysemu/block-backend.h"
  25#include "qemu/bitops.h"
  26#include "qemu/cutils.h"
  27
  28#define SD_PROTO_VER 0x01
  29
  30#define SD_DEFAULT_ADDR "localhost"
  31#define SD_DEFAULT_PORT 7000
  32
  33#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
  34#define SD_OP_READ_OBJ       0x02
  35#define SD_OP_WRITE_OBJ      0x03
  36/* 0x04 is used internally by Sheepdog */
  37
  38#define SD_OP_NEW_VDI        0x11
  39#define SD_OP_LOCK_VDI       0x12
  40#define SD_OP_RELEASE_VDI    0x13
  41#define SD_OP_GET_VDI_INFO   0x14
  42#define SD_OP_READ_VDIS      0x15
  43#define SD_OP_FLUSH_VDI      0x16
  44#define SD_OP_DEL_VDI        0x17
  45#define SD_OP_GET_CLUSTER_DEFAULT   0x18
  46
  47#define SD_FLAG_CMD_WRITE    0x01
  48#define SD_FLAG_CMD_COW      0x02
  49#define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
  50#define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
  51
  52#define SD_RES_SUCCESS       0x00 /* Success */
  53#define SD_RES_UNKNOWN       0x01 /* Unknown error */
  54#define SD_RES_NO_OBJ        0x02 /* No object found */
  55#define SD_RES_EIO           0x03 /* I/O error */
  56#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
  57#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
  58#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
  59#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
  60#define SD_RES_NO_VDI        0x08 /* No vdi found */
  61#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
  62#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
  63#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
  64#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
  65#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
  66#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
  67#define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
  68#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
  69#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
  70#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
  71#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
  72#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
  73#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
  74#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
  75#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
  76#define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
  77#define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
  78#define SD_RES_READONLY      0x1A /* Object is read-only */
  79
  80/*
  81 * Object ID rules
  82 *
  83 *  0 - 19 (20 bits): data object space
  84 * 20 - 31 (12 bits): reserved data object space
  85 * 32 - 55 (24 bits): vdi object space
  86 * 56 - 59 ( 4 bits): reserved vdi object space
  87 * 60 - 63 ( 4 bits): object type identifier space
  88 */
  89
  90#define VDI_SPACE_SHIFT   32
  91#define VDI_BIT (UINT64_C(1) << 63)
  92#define VMSTATE_BIT (UINT64_C(1) << 62)
  93#define MAX_DATA_OBJS (UINT64_C(1) << 20)
  94#define MAX_CHILDREN 1024
  95#define SD_MAX_VDI_LEN 256
  96#define SD_MAX_VDI_TAG_LEN 256
  97#define SD_NR_VDIS   (1U << 24)
  98#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
  99#define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
 100#define SD_DEFAULT_BLOCK_SIZE_SHIFT 22
 101/*
 102 * For erasure coding, we use at most SD_EC_MAX_STRIP for data strips and
 103 * (SD_EC_MAX_STRIP - 1) for parity strips
 104 *
 105 * SD_MAX_COPIES is sum of number of data strips and parity strips.
 106 */
 107#define SD_EC_MAX_STRIP 16
 108#define SD_MAX_COPIES (SD_EC_MAX_STRIP * 2 - 1)
 109
 110#define SD_INODE_SIZE (sizeof(SheepdogInode))
 111#define CURRENT_VDI_ID 0
 112
 113#define LOCK_TYPE_NORMAL 0
 114#define LOCK_TYPE_SHARED 1      /* for iSCSI multipath */
 115
 116typedef struct SheepdogReq {
 117    uint8_t proto_ver;
 118    uint8_t opcode;
 119    uint16_t flags;
 120    uint32_t epoch;
 121    uint32_t id;
 122    uint32_t data_length;
 123    uint32_t opcode_specific[8];
 124} SheepdogReq;
 125
 126typedef struct SheepdogRsp {
 127    uint8_t proto_ver;
 128    uint8_t opcode;
 129    uint16_t flags;
 130    uint32_t epoch;
 131    uint32_t id;
 132    uint32_t data_length;
 133    uint32_t result;
 134    uint32_t opcode_specific[7];
 135} SheepdogRsp;
 136
 137typedef struct SheepdogObjReq {
 138    uint8_t proto_ver;
 139    uint8_t opcode;
 140    uint16_t flags;
 141    uint32_t epoch;
 142    uint32_t id;
 143    uint32_t data_length;
 144    uint64_t oid;
 145    uint64_t cow_oid;
 146    uint8_t copies;
 147    uint8_t copy_policy;
 148    uint8_t reserved[6];
 149    uint64_t offset;
 150} SheepdogObjReq;
 151
 152typedef struct SheepdogObjRsp {
 153    uint8_t proto_ver;
 154    uint8_t opcode;
 155    uint16_t flags;
 156    uint32_t epoch;
 157    uint32_t id;
 158    uint32_t data_length;
 159    uint32_t result;
 160    uint8_t copies;
 161    uint8_t copy_policy;
 162    uint8_t reserved[2];
 163    uint32_t pad[6];
 164} SheepdogObjRsp;
 165
 166typedef struct SheepdogVdiReq {
 167    uint8_t proto_ver;
 168    uint8_t opcode;
 169    uint16_t flags;
 170    uint32_t epoch;
 171    uint32_t id;
 172    uint32_t data_length;
 173    uint64_t vdi_size;
 174    uint32_t base_vdi_id;
 175    uint8_t copies;
 176    uint8_t copy_policy;
 177    uint8_t store_policy;
 178    uint8_t block_size_shift;
 179    uint32_t snapid;
 180    uint32_t type;
 181    uint32_t pad[2];
 182} SheepdogVdiReq;
 183
 184typedef struct SheepdogVdiRsp {
 185    uint8_t proto_ver;
 186    uint8_t opcode;
 187    uint16_t flags;
 188    uint32_t epoch;
 189    uint32_t id;
 190    uint32_t data_length;
 191    uint32_t result;
 192    uint32_t rsvd;
 193    uint32_t vdi_id;
 194    uint32_t pad[5];
 195} SheepdogVdiRsp;
 196
 197typedef struct SheepdogClusterRsp {
 198    uint8_t proto_ver;
 199    uint8_t opcode;
 200    uint16_t flags;
 201    uint32_t epoch;
 202    uint32_t id;
 203    uint32_t data_length;
 204    uint32_t result;
 205    uint8_t nr_copies;
 206    uint8_t copy_policy;
 207    uint8_t block_size_shift;
 208    uint8_t __pad1;
 209    uint32_t __pad2[6];
 210} SheepdogClusterRsp;
 211
 212typedef struct SheepdogInode {
 213    char name[SD_MAX_VDI_LEN];
 214    char tag[SD_MAX_VDI_TAG_LEN];
 215    uint64_t ctime;
 216    uint64_t snap_ctime;
 217    uint64_t vm_clock_nsec;
 218    uint64_t vdi_size;
 219    uint64_t vm_state_size;
 220    uint16_t copy_policy;
 221    uint8_t nr_copies;
 222    uint8_t block_size_shift;
 223    uint32_t snap_id;
 224    uint32_t vdi_id;
 225    uint32_t parent_vdi_id;
 226    uint32_t child_vdi_id[MAX_CHILDREN];
 227    uint32_t data_vdi_id[MAX_DATA_OBJS];
 228} SheepdogInode;
 229
 230#define SD_INODE_HEADER_SIZE offsetof(SheepdogInode, data_vdi_id)
 231
 232/*
 233 * 64 bit FNV-1a non-zero initial basis
 234 */
 235#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
 236
 237/*
 238 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 239 */
 240static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
 241{
 242    unsigned char *bp = buf;
 243    unsigned char *be = bp + len;
 244    while (bp < be) {
 245        hval ^= (uint64_t) *bp++;
 246        hval += (hval << 1) + (hval << 4) + (hval << 5) +
 247            (hval << 7) + (hval << 8) + (hval << 40);
 248    }
 249    return hval;
 250}
 251
 252static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
 253{
 254    return inode->vdi_id == inode->data_vdi_id[idx];
 255}
 256
 257static inline bool is_data_obj(uint64_t oid)
 258{
 259    return !(VDI_BIT & oid);
 260}
 261
 262static inline uint64_t data_oid_to_idx(uint64_t oid)
 263{
 264    return oid & (MAX_DATA_OBJS - 1);
 265}
 266
 267static inline uint32_t oid_to_vid(uint64_t oid)
 268{
 269    return (oid & ~VDI_BIT) >> VDI_SPACE_SHIFT;
 270}
 271
 272static inline uint64_t vid_to_vdi_oid(uint32_t vid)
 273{
 274    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
 275}
 276
 277static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
 278{
 279    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 280}
 281
 282static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
 283{
 284    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
 285}
 286
 287static inline bool is_snapshot(struct SheepdogInode *inode)
 288{
 289    return !!inode->snap_ctime;
 290}
 291
 292static inline size_t count_data_objs(const struct SheepdogInode *inode)
 293{
 294    return DIV_ROUND_UP(inode->vdi_size,
 295                        (1UL << inode->block_size_shift));
 296}
 297
 298#undef DPRINTF
 299#ifdef DEBUG_SDOG
 300#define DEBUG_SDOG_PRINT 1
 301#else
 302#define DEBUG_SDOG_PRINT 0
 303#endif
 304#define DPRINTF(fmt, args...)                                           \
 305    do {                                                                \
 306        if (DEBUG_SDOG_PRINT) {                                         \
 307            fprintf(stderr, "%s %d: " fmt, __func__, __LINE__, ##args); \
 308        }                                                               \
 309    } while (0)
 310
 311typedef struct SheepdogAIOCB SheepdogAIOCB;
 312typedef struct BDRVSheepdogState BDRVSheepdogState;
 313
 314typedef struct AIOReq {
 315    SheepdogAIOCB *aiocb;
 316    unsigned int iov_offset;
 317
 318    uint64_t oid;
 319    uint64_t base_oid;
 320    uint64_t offset;
 321    unsigned int data_len;
 322    uint8_t flags;
 323    uint32_t id;
 324    bool create;
 325
 326    QLIST_ENTRY(AIOReq) aio_siblings;
 327} AIOReq;
 328
 329enum AIOCBState {
 330    AIOCB_WRITE_UDATA,
 331    AIOCB_READ_UDATA,
 332    AIOCB_FLUSH_CACHE,
 333    AIOCB_DISCARD_OBJ,
 334};
 335
 336#define AIOCBOverlapping(x, y)                                 \
 337    (!(x->max_affect_data_idx < y->min_affect_data_idx          \
 338       || y->max_affect_data_idx < x->min_affect_data_idx))
 339
 340struct SheepdogAIOCB {
 341    BDRVSheepdogState *s;
 342
 343    QEMUIOVector *qiov;
 344
 345    int64_t sector_num;
 346    int nb_sectors;
 347
 348    int ret;
 349    enum AIOCBState aiocb_type;
 350
 351    Coroutine *coroutine;
 352    int nr_pending;
 353
 354    uint32_t min_affect_data_idx;
 355    uint32_t max_affect_data_idx;
 356
 357    /*
 358     * The difference between affect_data_idx and dirty_data_idx:
 359     * affect_data_idx represents range of index of all request types.
 360     * dirty_data_idx represents range of index updated by COW requests.
 361     * dirty_data_idx is used for updating an inode object.
 362     */
 363    uint32_t min_dirty_data_idx;
 364    uint32_t max_dirty_data_idx;
 365
 366    QLIST_ENTRY(SheepdogAIOCB) aiocb_siblings;
 367};
 368
 369struct BDRVSheepdogState {
 370    BlockDriverState *bs;
 371    AioContext *aio_context;
 372
 373    SheepdogInode inode;
 374
 375    char name[SD_MAX_VDI_LEN];
 376    bool is_snapshot;
 377    uint32_t cache_flags;
 378    bool discard_supported;
 379
 380    SocketAddress *addr;
 381    int fd;
 382
 383    CoMutex lock;
 384    Coroutine *co_send;
 385    Coroutine *co_recv;
 386
 387    uint32_t aioreq_seq_num;
 388
 389    /* Every aio request must be linked to either of these queues. */
 390    QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
 391    QLIST_HEAD(failed_aio_head, AIOReq) failed_aio_head;
 392
 393    CoMutex queue_lock;
 394    CoQueue overlapping_queue;
 395    QLIST_HEAD(inflight_aiocb_head, SheepdogAIOCB) inflight_aiocb_head;
 396};
 397
 398typedef struct BDRVSheepdogReopenState {
 399    int fd;
 400    int cache_flags;
 401} BDRVSheepdogReopenState;
 402
 403static const char * sd_strerror(int err)
 404{
 405    int i;
 406
 407    static const struct {
 408        int err;
 409        const char *desc;
 410    } errors[] = {
 411        {SD_RES_SUCCESS, "Success"},
 412        {SD_RES_UNKNOWN, "Unknown error"},
 413        {SD_RES_NO_OBJ, "No object found"},
 414        {SD_RES_EIO, "I/O error"},
 415        {SD_RES_VDI_EXIST, "VDI exists already"},
 416        {SD_RES_INVALID_PARMS, "Invalid parameters"},
 417        {SD_RES_SYSTEM_ERROR, "System error"},
 418        {SD_RES_VDI_LOCKED, "VDI is already locked"},
 419        {SD_RES_NO_VDI, "No vdi found"},
 420        {SD_RES_NO_BASE_VDI, "No base VDI found"},
 421        {SD_RES_VDI_READ, "Failed read the requested VDI"},
 422        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
 423        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
 424        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
 425        {SD_RES_NO_TAG, "Failed to find the requested tag"},
 426        {SD_RES_STARTUP, "The system is still booting"},
 427        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
 428        {SD_RES_SHUTDOWN, "The system is shutting down"},
 429        {SD_RES_NO_MEM, "Out of memory on the server"},
 430        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
 431        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
 432        {SD_RES_NO_SPACE, "Server has no space for new objects"},
 433        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
 434        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
 435        {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
 436        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
 437        {SD_RES_READONLY, "Object is read-only"},
 438    };
 439
 440    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
 441        if (errors[i].err == err) {
 442            return errors[i].desc;
 443        }
 444    }
 445
 446    return "Invalid error code";
 447}
 448
/*
 * Sheepdog I/O handling:
 *
 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
 *    link the requests to the inflight_list in the
 *    BDRVSheepdogState.  The function yields while waiting to
 *    receive the response.
 *
 * 2. We receive the response in aio_read_response, the fd handler for
 *    the sheepdog connection.  We switch back to sd_co_readv/sd_co_writev
 *    after all the requests belonging to the AIOCB are finished.  If
 *    needed, sd_co_writev will send further requests for the vdi object.
 */
 462
 463static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
 464                                    uint64_t oid, unsigned int data_len,
 465                                    uint64_t offset, uint8_t flags, bool create,
 466                                    uint64_t base_oid, unsigned int iov_offset)
 467{
 468    AIOReq *aio_req;
 469
 470    aio_req = g_malloc(sizeof(*aio_req));
 471    aio_req->aiocb = acb;
 472    aio_req->iov_offset = iov_offset;
 473    aio_req->oid = oid;
 474    aio_req->base_oid = base_oid;
 475    aio_req->offset = offset;
 476    aio_req->data_len = data_len;
 477    aio_req->flags = flags;
 478    aio_req->id = s->aioreq_seq_num++;
 479    aio_req->create = create;
 480
 481    acb->nr_pending++;
 482    return aio_req;
 483}
 484
 485static void wait_for_overlapping_aiocb(BDRVSheepdogState *s, SheepdogAIOCB *acb)
 486{
 487    SheepdogAIOCB *cb;
 488
 489retry:
 490    QLIST_FOREACH(cb, &s->inflight_aiocb_head, aiocb_siblings) {
 491        if (AIOCBOverlapping(acb, cb)) {
 492            qemu_co_queue_wait(&s->overlapping_queue, &s->queue_lock);
 493            goto retry;
 494        }
 495    }
 496}
 497
 498static void sd_aio_setup(SheepdogAIOCB *acb, BDRVSheepdogState *s,
 499                         QEMUIOVector *qiov, int64_t sector_num, int nb_sectors,
 500                         int type)
 501{
 502    uint32_t object_size;
 503
 504    object_size = (UINT32_C(1) << s->inode.block_size_shift);
 505
 506    acb->s = s;
 507
 508    acb->qiov = qiov;
 509
 510    acb->sector_num = sector_num;
 511    acb->nb_sectors = nb_sectors;
 512
 513    acb->coroutine = qemu_coroutine_self();
 514    acb->ret = 0;
 515    acb->nr_pending = 0;
 516
 517    acb->min_affect_data_idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
 518    acb->max_affect_data_idx = (acb->sector_num * BDRV_SECTOR_SIZE +
 519                              acb->nb_sectors * BDRV_SECTOR_SIZE) / object_size;
 520
 521    acb->min_dirty_data_idx = UINT32_MAX;
 522    acb->max_dirty_data_idx = 0;
 523    acb->aiocb_type = type;
 524
 525    if (type == AIOCB_FLUSH_CACHE) {
 526        return;
 527    }
 528
 529    qemu_co_mutex_lock(&s->queue_lock);
 530    wait_for_overlapping_aiocb(s, acb);
 531    QLIST_INSERT_HEAD(&s->inflight_aiocb_head, acb, aiocb_siblings);
 532    qemu_co_mutex_unlock(&s->queue_lock);
 533}
 534
 535static SocketAddress *sd_socket_address(const char *path,
 536                                        const char *host, const char *port)
 537{
 538    SocketAddress *addr = g_new0(SocketAddress, 1);
 539
 540    if (path) {
 541        addr->type = SOCKET_ADDRESS_TYPE_UNIX;
 542        addr->u.q_unix.path = g_strdup(path);
 543    } else {
 544        addr->type = SOCKET_ADDRESS_TYPE_INET;
 545        addr->u.inet.host = g_strdup(host ?: SD_DEFAULT_ADDR);
 546        addr->u.inet.port = g_strdup(port ?: stringify(SD_DEFAULT_PORT));
 547    }
 548
 549    return addr;
 550}
 551
 552static SocketAddress *sd_server_config(QDict *options, Error **errp)
 553{
 554    QDict *server = NULL;
 555    QObject *crumpled_server = NULL;
 556    Visitor *iv = NULL;
 557    SocketAddress *saddr = NULL;
 558    Error *local_err = NULL;
 559
 560    qdict_extract_subqdict(options, &server, "server.");
 561
 562    crumpled_server = qdict_crumple(server, errp);
 563    if (!crumpled_server) {
 564        goto done;
 565    }
 566
 567    /*
 568     * FIXME .numeric, .to, .ipv4 or .ipv6 don't work with -drive
 569     * server.type=inet.  .to doesn't matter, it's ignored anyway.
 570     * That's because when @options come from -blockdev or
 571     * blockdev_add, members are typed according to the QAPI schema,
 572     * but when they come from -drive, they're all QString.  The
 573     * visitor expects the former.
 574     */
 575    iv = qobject_input_visitor_new(crumpled_server);
 576    visit_type_SocketAddress(iv, NULL, &saddr, &local_err);
 577    if (local_err) {
 578        error_propagate(errp, local_err);
 579        goto done;
 580    }
 581
 582done:
 583    visit_free(iv);
 584    qobject_decref(crumpled_server);
 585    QDECREF(server);
 586    return saddr;
 587}
 588
 589/* Return -EIO in case of error, file descriptor on success */
 590static int connect_to_sdog(BDRVSheepdogState *s, Error **errp)
 591{
 592    int fd;
 593
 594    fd = socket_connect(s->addr, errp);
 595
 596    if (s->addr->type == SOCKET_ADDRESS_TYPE_INET && fd >= 0) {
 597        int ret = socket_set_nodelay(fd);
 598        if (ret < 0) {
 599            error_report("%s", strerror(errno));
 600        }
 601    }
 602
 603    if (fd >= 0) {
 604        qemu_set_nonblock(fd);
 605    } else {
 606        fd = -EIO;
 607    }
 608
 609    return fd;
 610}
 611
 612/* Return 0 on success and -errno in case of error */
 613static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
 614                                    unsigned int *wlen)
 615{
 616    int ret;
 617
 618    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
 619    if (ret != sizeof(*hdr)) {
 620        error_report("failed to send a req, %s", strerror(errno));
 621        return -errno;
 622    }
 623
 624    ret = qemu_co_send(sockfd, data, *wlen);
 625    if (ret != *wlen) {
 626        error_report("failed to send a req, %s", strerror(errno));
 627        return -errno;
 628    }
 629
 630    return ret;
 631}
 632
 633typedef struct SheepdogReqCo {
 634    int sockfd;
 635    BlockDriverState *bs;
 636    AioContext *aio_context;
 637    SheepdogReq *hdr;
 638    void *data;
 639    unsigned int *wlen;
 640    unsigned int *rlen;
 641    int ret;
 642    bool finished;
 643    Coroutine *co;
 644} SheepdogReqCo;
 645
 646static void restart_co_req(void *opaque)
 647{
 648    SheepdogReqCo *srco = opaque;
 649
 650    aio_co_wake(srco->co);
 651}
 652
 653static coroutine_fn void do_co_req(void *opaque)
 654{
 655    int ret;
 656    SheepdogReqCo *srco = opaque;
 657    int sockfd = srco->sockfd;
 658    SheepdogReq *hdr = srco->hdr;
 659    void *data = srco->data;
 660    unsigned int *wlen = srco->wlen;
 661    unsigned int *rlen = srco->rlen;
 662
 663    srco->co = qemu_coroutine_self();
 664    aio_set_fd_handler(srco->aio_context, sockfd, false,
 665                       NULL, restart_co_req, NULL, srco);
 666
 667    ret = send_co_req(sockfd, hdr, data, wlen);
 668    if (ret < 0) {
 669        goto out;
 670    }
 671
 672    aio_set_fd_handler(srco->aio_context, sockfd, false,
 673                       restart_co_req, NULL, NULL, srco);
 674
 675    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
 676    if (ret != sizeof(*hdr)) {
 677        error_report("failed to get a rsp, %s", strerror(errno));
 678        ret = -errno;
 679        goto out;
 680    }
 681
 682    if (*rlen > hdr->data_length) {
 683        *rlen = hdr->data_length;
 684    }
 685
 686    if (*rlen) {
 687        ret = qemu_co_recv(sockfd, data, *rlen);
 688        if (ret != *rlen) {
 689            error_report("failed to get the data, %s", strerror(errno));
 690            ret = -errno;
 691            goto out;
 692        }
 693    }
 694    ret = 0;
 695out:
 696    /* there is at most one request for this sockfd, so it is safe to
 697     * set each handler to NULL. */
 698    aio_set_fd_handler(srco->aio_context, sockfd, false,
 699                       NULL, NULL, NULL, NULL);
 700
 701    srco->co = NULL;
 702    srco->ret = ret;
 703    /* Set srco->finished before reading bs->wakeup.  */
 704    atomic_mb_set(&srco->finished, true);
 705    if (srco->bs) {
 706        bdrv_wakeup(srco->bs);
 707    }
 708}
 709
 710/*
 711 * Send the request to the sheep in a synchronous manner.
 712 *
 713 * Return 0 on success, -errno in case of error.
 714 */
 715static int do_req(int sockfd, BlockDriverState *bs, SheepdogReq *hdr,
 716                  void *data, unsigned int *wlen, unsigned int *rlen)
 717{
 718    Coroutine *co;
 719    SheepdogReqCo srco = {
 720        .sockfd = sockfd,
 721        .aio_context = bs ? bdrv_get_aio_context(bs) : qemu_get_aio_context(),
 722        .bs = bs,
 723        .hdr = hdr,
 724        .data = data,
 725        .wlen = wlen,
 726        .rlen = rlen,
 727        .ret = 0,
 728        .finished = false,
 729    };
 730
 731    if (qemu_in_coroutine()) {
 732        do_co_req(&srco);
 733    } else {
 734        co = qemu_coroutine_create(do_co_req, &srco);
 735        if (bs) {
 736            bdrv_coroutine_enter(bs, co);
 737            BDRV_POLL_WHILE(bs, !srco.finished);
 738        } else {
 739            qemu_coroutine_enter(co);
 740            while (!srco.finished) {
 741                aio_poll(qemu_get_aio_context(), true);
 742            }
 743        }
 744    }
 745
 746    return srco.ret;
 747}
 748
 749static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
 750                                         struct iovec *iov, int niov,
 751                                         enum AIOCBState aiocb_type);
 752static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
 753static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag);
 754static int get_sheep_fd(BDRVSheepdogState *s, Error **errp);
 755static void co_write_request(void *opaque);
 756
 757static coroutine_fn void reconnect_to_sdog(void *opaque)
 758{
 759    BDRVSheepdogState *s = opaque;
 760    AIOReq *aio_req, *next;
 761
 762    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
 763                       NULL, NULL, NULL);
 764    close(s->fd);
 765    s->fd = -1;
 766
 767    /* Wait for outstanding write requests to be completed. */
 768    while (s->co_send != NULL) {
 769        co_write_request(opaque);
 770    }
 771
 772    /* Try to reconnect the sheepdog server every one second. */
 773    while (s->fd < 0) {
 774        Error *local_err = NULL;
 775        s->fd = get_sheep_fd(s, &local_err);
 776        if (s->fd < 0) {
 777            DPRINTF("Wait for connection to be established\n");
 778            error_report_err(local_err);
 779            co_aio_sleep_ns(bdrv_get_aio_context(s->bs), QEMU_CLOCK_REALTIME,
 780                            1000000000ULL);
 781        }
 782    };
 783
 784    /*
 785     * Now we have to resend all the request in the inflight queue.  However,
 786     * resend_aioreq() can yield and newly created requests can be added to the
 787     * inflight queue before the coroutine is resumed.  To avoid mixing them, we
 788     * have to move all the inflight requests to the failed queue before
 789     * resend_aioreq() is called.
 790     */
 791    qemu_co_mutex_lock(&s->queue_lock);
 792    QLIST_FOREACH_SAFE(aio_req, &s->inflight_aio_head, aio_siblings, next) {
 793        QLIST_REMOVE(aio_req, aio_siblings);
 794        QLIST_INSERT_HEAD(&s->failed_aio_head, aio_req, aio_siblings);
 795    }
 796
 797    /* Resend all the failed aio requests. */
 798    while (!QLIST_EMPTY(&s->failed_aio_head)) {
 799        aio_req = QLIST_FIRST(&s->failed_aio_head);
 800        QLIST_REMOVE(aio_req, aio_siblings);
 801        qemu_co_mutex_unlock(&s->queue_lock);
 802        resend_aioreq(s, aio_req);
 803        qemu_co_mutex_lock(&s->queue_lock);
 804    }
 805    qemu_co_mutex_unlock(&s->queue_lock);
 806}
 807
 808/*
 809 * Receive responses of the I/O requests.
 810 *
 811 * This function is registered as a fd handler, and called from the
 812 * main loop when s->fd is ready for reading responses.
 813 */
 814static void coroutine_fn aio_read_response(void *opaque)
 815{
 816    SheepdogObjRsp rsp;
 817    BDRVSheepdogState *s = opaque;
 818    int fd = s->fd;
 819    int ret;
 820    AIOReq *aio_req = NULL;
 821    SheepdogAIOCB *acb;
 822    uint64_t idx;
 823
 824    /* read a header */
 825    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
 826    if (ret != sizeof(rsp)) {
 827        error_report("failed to get the header, %s", strerror(errno));
 828        goto err;
 829    }
 830
 831    /* find the right aio_req from the inflight aio list */
 832    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
 833        if (aio_req->id == rsp.id) {
 834            break;
 835        }
 836    }
 837    if (!aio_req) {
 838        error_report("cannot find aio_req %x", rsp.id);
 839        goto err;
 840    }
 841
 842    acb = aio_req->aiocb;
 843
 844    switch (acb->aiocb_type) {
 845    case AIOCB_WRITE_UDATA:
 846        if (!is_data_obj(aio_req->oid)) {
 847            break;
 848        }
 849        idx = data_oid_to_idx(aio_req->oid);
 850
 851        if (aio_req->create) {
 852            /*
 853             * If the object is newly created one, we need to update
 854             * the vdi object (metadata object).  min_dirty_data_idx
 855             * and max_dirty_data_idx are changed to include updated
 856             * index between them.
 857             */
 858            if (rsp.result == SD_RES_SUCCESS) {
 859                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
 860                acb->max_dirty_data_idx = MAX(idx, acb->max_dirty_data_idx);
 861                acb->min_dirty_data_idx = MIN(idx, acb->min_dirty_data_idx);
 862            }
 863        }
 864        break;
 865    case AIOCB_READ_UDATA:
 866        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
 867                            aio_req->iov_offset, rsp.data_length);
 868        if (ret != rsp.data_length) {
 869            error_report("failed to get the data, %s", strerror(errno));
 870            goto err;
 871        }
 872        break;
 873    case AIOCB_FLUSH_CACHE:
 874        if (rsp.result == SD_RES_INVALID_PARMS) {
 875            DPRINTF("disable cache since the server doesn't support it\n");
 876            s->cache_flags = SD_FLAG_CMD_DIRECT;
 877            rsp.result = SD_RES_SUCCESS;
 878        }
 879        break;
 880    case AIOCB_DISCARD_OBJ:
 881        switch (rsp.result) {
 882        case SD_RES_INVALID_PARMS:
 883            error_report("server doesn't support discard command");
 884            rsp.result = SD_RES_SUCCESS;
 885            s->discard_supported = false;
 886            break;
 887        default:
 888            break;
 889        }
 890    }
 891
 892    /* No more data for this aio_req (reload_inode below uses its own file
 893     * descriptor handler which doesn't use co_recv).
 894    */
 895    s->co_recv = NULL;
 896
 897    qemu_co_mutex_lock(&s->queue_lock);
 898    QLIST_REMOVE(aio_req, aio_siblings);
 899    qemu_co_mutex_unlock(&s->queue_lock);
 900
 901    switch (rsp.result) {
 902    case SD_RES_SUCCESS:
 903        break;
 904    case SD_RES_READONLY:
 905        if (s->inode.vdi_id == oid_to_vid(aio_req->oid)) {
 906            ret = reload_inode(s, 0, "");
 907            if (ret < 0) {
 908                goto err;
 909            }
 910        }
 911        if (is_data_obj(aio_req->oid)) {
 912            aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
 913                                           data_oid_to_idx(aio_req->oid));
 914        } else {
 915            aio_req->oid = vid_to_vdi_oid(s->inode.vdi_id);
 916        }
 917        resend_aioreq(s, aio_req);
 918        return;
 919    default:
 920        acb->ret = -EIO;
 921        error_report("%s", sd_strerror(rsp.result));
 922        break;
 923    }
 924
 925    g_free(aio_req);
 926
 927    if (!--acb->nr_pending) {
 928        /*
 929         * We've finished all requests which belong to the AIOCB, so
 930         * we can switch back to sd_co_readv/writev now.
 931         */
 932        aio_co_wake(acb->coroutine);
 933    }
 934
 935    return;
 936
 937err:
 938    reconnect_to_sdog(opaque);
 939}
 940
 941static void co_read_response(void *opaque)
 942{
 943    BDRVSheepdogState *s = opaque;
 944
 945    if (!s->co_recv) {
 946        s->co_recv = qemu_coroutine_create(aio_read_response, opaque);
 947    }
 948
 949    aio_co_enter(s->aio_context, s->co_recv);
 950}
 951
 952static void co_write_request(void *opaque)
 953{
 954    BDRVSheepdogState *s = opaque;
 955
 956    aio_co_wake(s->co_send);
 957}
 958
 959/*
 960 * Return a socket descriptor to read/write objects.
 961 *
 962 * We cannot use this descriptor for other operations because
 963 * the block driver may be on waiting response from the server.
 964 */
 965static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
 966{
 967    int fd;
 968
 969    fd = connect_to_sdog(s, errp);
 970    if (fd < 0) {
 971        return fd;
 972    }
 973
 974    aio_set_fd_handler(s->aio_context, fd, false,
 975                       co_read_response, NULL, NULL, s);
 976    return fd;
 977}
 978
 979/*
 980 * Parse numeric snapshot ID in @str
 981 * If @str can't be parsed as number, return false.
 982 * Else, if the number is zero or too large, set *@snapid to zero and
 983 * return true.
 984 * Else, set *@snapid to the number and return true.
 985 */
 986static bool sd_parse_snapid(const char *str, uint32_t *snapid)
 987{
 988    unsigned long ul;
 989    int ret;
 990
 991    ret = qemu_strtoul(str, NULL, 10, &ul);
 992    if (ret == -ERANGE) {
 993        ul = ret = 0;
 994    }
 995    if (ret) {
 996        return false;
 997    }
 998    if (ul > UINT32_MAX) {
 999        ul = 0;
1000    }
1001
1002    *snapid = ul;
1003    return true;
1004}
1005
1006static bool sd_parse_snapid_or_tag(const char *str,
1007                                   uint32_t *snapid, char tag[])
1008{
1009    if (!sd_parse_snapid(str, snapid)) {
1010        *snapid = 0;
1011        if (g_strlcpy(tag, str, SD_MAX_VDI_TAG_LEN) >= SD_MAX_VDI_TAG_LEN) {
1012            return false;
1013        }
1014    } else if (!*snapid) {
1015        return false;
1016    } else {
1017        tag[0] = 0;
1018    }
1019    return true;
1020}
1021
/*
 * Connection and VDI parameters extracted from a filename by
 * sd_parse_uri() / parse_vdiname().
 *
 * @path and @host are not copies: they point into @qp and @uri
 * respectively, so they are only valid until sd_config_done() is called.
 */
typedef struct {
    const char *path;           /* non-null iff transport is unix */
    const char *host;           /* valid when transport is tcp */
    int port;                   /* valid when transport is tcp */
    char vdi[SD_MAX_VDI_LEN];   /* VDI name */
    char tag[SD_MAX_VDI_TAG_LEN]; /* snapshot tag, "" if none was given */
    uint32_t snap_id;           /* snapshot ID, CURRENT_VDI_ID if none */
    /* Remainder is only for sd_config_done() */
    URI *uri;
    QueryParams *qp;
} SheepdogConfig;
1033
1034static void sd_config_done(SheepdogConfig *cfg)
1035{
1036    if (cfg->qp) {
1037        query_params_free(cfg->qp);
1038    }
1039    uri_free(cfg->uri);
1040}
1041
1042static void sd_parse_uri(SheepdogConfig *cfg, const char *filename,
1043                         Error **errp)
1044{
1045    Error *err = NULL;
1046    QueryParams *qp = NULL;
1047    bool is_unix;
1048    URI *uri;
1049
1050    memset(cfg, 0, sizeof(*cfg));
1051
1052    cfg->uri = uri = uri_parse(filename);
1053    if (!uri) {
1054        error_setg(&err, "invalid URI");
1055        goto out;
1056    }
1057
1058    /* transport */
1059    if (!g_strcmp0(uri->scheme, "sheepdog")) {
1060        is_unix = false;
1061    } else if (!g_strcmp0(uri->scheme, "sheepdog+tcp")) {
1062        is_unix = false;
1063    } else if (!g_strcmp0(uri->scheme, "sheepdog+unix")) {
1064        is_unix = true;
1065    } else {
1066        error_setg(&err, "URI scheme must be 'sheepdog', 'sheepdog+tcp',"
1067                   " or 'sheepdog+unix'");
1068        goto out;
1069    }
1070
1071    if (uri->path == NULL || !strcmp(uri->path, "/")) {
1072        error_setg(&err, "missing file path in URI");
1073        goto out;
1074    }
1075    if (g_strlcpy(cfg->vdi, uri->path + 1, SD_MAX_VDI_LEN)
1076        >= SD_MAX_VDI_LEN) {
1077        error_setg(&err, "VDI name is too long");
1078        goto out;
1079    }
1080
1081    cfg->qp = qp = query_params_parse(uri->query);
1082
1083    if (is_unix) {
1084        /* sheepdog+unix:///vdiname?socket=path */
1085        if (uri->server || uri->port) {
1086            error_setg(&err, "URI scheme %s doesn't accept a server address",
1087                       uri->scheme);
1088            goto out;
1089        }
1090        if (!qp->n) {
1091            error_setg(&err,
1092                       "URI scheme %s requires query parameter 'socket'",
1093                       uri->scheme);
1094            goto out;
1095        }
1096        if (qp->n != 1 || strcmp(qp->p[0].name, "socket")) {
1097            error_setg(&err, "unexpected query parameters");
1098            goto out;
1099        }
1100        cfg->path = qp->p[0].value;
1101    } else {
1102        /* sheepdog[+tcp]://[host:port]/vdiname */
1103        if (qp->n) {
1104            error_setg(&err, "unexpected query parameters");
1105            goto out;
1106        }
1107        cfg->host = uri->server;
1108        cfg->port = uri->port;
1109    }
1110
1111    /* snapshot tag */
1112    if (uri->fragment) {
1113        if (!sd_parse_snapid_or_tag(uri->fragment,
1114                                    &cfg->snap_id, cfg->tag)) {
1115            error_setg(&err, "'%s' is not a valid snapshot ID",
1116                       uri->fragment);
1117            goto out;
1118        }
1119    } else {
1120        cfg->snap_id = CURRENT_VDI_ID; /* search current vdi */
1121    }
1122
1123out:
1124    if (err) {
1125        error_propagate(errp, err);
1126        sd_config_done(cfg);
1127    }
1128}
1129
1130/*
1131 * Parse a filename (old syntax)
1132 *
1133 * filename must be one of the following formats:
1134 *   1. [vdiname]
1135 *   2. [vdiname]:[snapid]
1136 *   3. [vdiname]:[tag]
1137 *   4. [hostname]:[port]:[vdiname]
1138 *   5. [hostname]:[port]:[vdiname]:[snapid]
1139 *   6. [hostname]:[port]:[vdiname]:[tag]
1140 *
1141 * You can boot from the snapshot images by specifying `snapid` or
1142 * `tag'.
1143 *
1144 * You can run VMs outside the Sheepdog cluster by specifying
1145 * `hostname' and `port' (experimental).
1146 */
1147static void parse_vdiname(SheepdogConfig *cfg, const char *filename,
1148                          Error **errp)
1149{
1150    Error *err = NULL;
1151    char *p, *q, *uri;
1152    const char *host_spec, *vdi_spec;
1153    int nr_sep;
1154
1155    strstart(filename, "sheepdog:", &filename);
1156    p = q = g_strdup(filename);
1157
1158    /* count the number of separators */
1159    nr_sep = 0;
1160    while (*p) {
1161        if (*p == ':') {
1162            nr_sep++;
1163        }
1164        p++;
1165    }
1166    p = q;
1167
1168    /* use the first two tokens as host_spec. */
1169    if (nr_sep >= 2) {
1170        host_spec = p;
1171        p = strchr(p, ':');
1172        p++;
1173        p = strchr(p, ':');
1174        *p++ = '\0';
1175    } else {
1176        host_spec = "";
1177    }
1178
1179    vdi_spec = p;
1180
1181    p = strchr(vdi_spec, ':');
1182    if (p) {
1183        *p++ = '#';
1184    }
1185
1186    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
1187
1188    /*
1189     * FIXME We to escape URI meta-characters, e.g. "x?y=z"
1190     * produces "sheepdog://x?y=z".  Because of that ...
1191     */
1192    sd_parse_uri(cfg, uri, &err);
1193    if (err) {
1194        /*
1195         * ... this can fail, but the error message is misleading.
1196         * Replace it by the traditional useless one until the
1197         * escaping is fixed.
1198         */
1199        error_free(err);
1200        error_setg(errp, "Can't parse filename");
1201    }
1202
1203    g_free(q);
1204    g_free(uri);
1205}
1206
1207static void sd_parse_filename(const char *filename, QDict *options,
1208                              Error **errp)
1209{
1210    Error *err = NULL;
1211    SheepdogConfig cfg;
1212    char buf[32];
1213
1214    if (strstr(filename, "://")) {
1215        sd_parse_uri(&cfg, filename, &err);
1216    } else {
1217        parse_vdiname(&cfg, filename, &err);
1218    }
1219    if (err) {
1220        error_propagate(errp, err);
1221        return;
1222    }
1223
1224    if (cfg.path) {
1225        qdict_set_default_str(options, "server.path", cfg.path);
1226        qdict_set_default_str(options, "server.type", "unix");
1227    } else {
1228        qdict_set_default_str(options, "server.type", "inet");
1229        qdict_set_default_str(options, "server.host",
1230                              cfg.host ?: SD_DEFAULT_ADDR);
1231        snprintf(buf, sizeof(buf), "%d", cfg.port ?: SD_DEFAULT_PORT);
1232        qdict_set_default_str(options, "server.port", buf);
1233    }
1234    qdict_set_default_str(options, "vdi", cfg.vdi);
1235    qdict_set_default_str(options, "tag", cfg.tag);
1236    if (cfg.snap_id) {
1237        snprintf(buf, sizeof(buf), "%d", cfg.snap_id);
1238        qdict_set_default_str(options, "snap-id", buf);
1239    }
1240
1241    sd_config_done(&cfg);
1242}
1243
1244static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
1245                         uint32_t snapid, const char *tag, uint32_t *vid,
1246                         bool lock, Error **errp)
1247{
1248    int ret, fd;
1249    SheepdogVdiReq hdr;
1250    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1251    unsigned int wlen, rlen = 0;
1252    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
1253
1254    fd = connect_to_sdog(s, errp);
1255    if (fd < 0) {
1256        return fd;
1257    }
1258
1259    /* This pair of strncpy calls ensures that the buffer is zero-filled,
1260     * which is desirable since we'll soon be sending those bytes, and
1261     * don't want the send_req to read uninitialized data.
1262     */
1263    strncpy(buf, filename, SD_MAX_VDI_LEN);
1264    strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);
1265
1266    memset(&hdr, 0, sizeof(hdr));
1267    if (lock) {
1268        hdr.opcode = SD_OP_LOCK_VDI;
1269        hdr.type = LOCK_TYPE_NORMAL;
1270    } else {
1271        hdr.opcode = SD_OP_GET_VDI_INFO;
1272    }
1273    wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
1274    hdr.proto_ver = SD_PROTO_VER;
1275    hdr.data_length = wlen;
1276    hdr.snapid = snapid;
1277    hdr.flags = SD_FLAG_CMD_WRITE;
1278
1279    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1280    if (ret) {
1281        error_setg_errno(errp, -ret, "cannot get vdi info");
1282        goto out;
1283    }
1284
1285    if (rsp->result != SD_RES_SUCCESS) {
1286        error_setg(errp, "cannot get vdi info, %s, %s %" PRIu32 " %s",
1287                   sd_strerror(rsp->result), filename, snapid, tag);
1288        if (rsp->result == SD_RES_NO_VDI) {
1289            ret = -ENOENT;
1290        } else if (rsp->result == SD_RES_VDI_LOCKED) {
1291            ret = -EBUSY;
1292        } else {
1293            ret = -EIO;
1294        }
1295        goto out;
1296    }
1297    *vid = rsp->vdi_id;
1298
1299    ret = 0;
1300out:
1301    closesocket(fd);
1302    return ret;
1303}
1304
/*
 * Queue @aio_req as in-flight and send it to the sheep over s->fd.
 *
 * @iov/@niov: data to send for request types that carry a payload
 * @aiocb_type: selects the opcode and flags of the request header
 *
 * Nothing is returned: send failures are only reported through
 * error_report().
 */
static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                                         struct iovec *iov, int niov,
                                         enum AIOCBState aiocb_type)
{
    int nr_copies = s->inode.nr_copies;
    SheepdogObjReq hdr;
    unsigned int wlen = 0;
    int ret;
    uint64_t oid = aio_req->oid;
    unsigned int datalen = aio_req->data_len;
    uint64_t offset = aio_req->offset;
    uint8_t flags = aio_req->flags;
    uint64_t old_oid = aio_req->base_oid;
    bool create = aio_req->create;

    /* track the request as in-flight before anything is sent */
    qemu_co_mutex_lock(&s->queue_lock);
    QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
    qemu_co_mutex_unlock(&s->queue_lock);

    /* only reported; the request is still sent */
    if (!nr_copies) {
        error_report("bug");
    }

    memset(&hdr, 0, sizeof(hdr));

    switch (aiocb_type) {
    case AIOCB_FLUSH_CACHE:
        hdr.opcode = SD_OP_FLUSH_VDI;
        break;
    case AIOCB_READ_UDATA:
        hdr.opcode = SD_OP_READ_OBJ;
        hdr.flags = flags;
        break;
    case AIOCB_WRITE_UDATA:
        if (create) {
            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
        } else {
            hdr.opcode = SD_OP_WRITE_OBJ;
        }
        wlen = datalen;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        break;
    case AIOCB_DISCARD_OBJ:
        /*
         * A discard is sent as a write to the VDI object: the entry for
         * this data object in the cached inode is cleared, and
         * sizeof(uint32_t) bytes are written at that entry's offset
         * within the inode.
         */
        hdr.opcode = SD_OP_WRITE_OBJ;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        s->inode.data_vdi_id[data_oid_to_idx(oid)] = 0;
        offset = offsetof(SheepdogInode,
                          data_vdi_id[data_oid_to_idx(oid)]);
        oid = vid_to_vdi_oid(s->inode.vdi_id);
        wlen = datalen = sizeof(uint32_t);
        break;
    }

    if (s->cache_flags) {
        hdr.flags |= s->cache_flags;
    }

    hdr.oid = oid;
    hdr.cow_oid = old_oid;
    hdr.copies = s->inode.nr_copies;

    hdr.data_length = datalen;
    hdr.offset = offset;

    hdr.id = aio_req->id;

    /*
     * s->lock is held for the whole send.  While sending, this
     * coroutine is recorded in s->co_send and co_write_request() is
     * installed on the socket so that it can wake us up; both are
     * undone again at "out".
     */
    qemu_co_mutex_lock(&s->lock);
    s->co_send = qemu_coroutine_self();
    aio_set_fd_handler(s->aio_context, s->fd, false,
                       co_read_response, co_write_request, NULL, s);
    socket_set_cork(s->fd, 1);

    /* send a header */
    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
    if (ret != sizeof(hdr)) {
        error_report("failed to send a req, %s", strerror(errno));
        goto out;
    }

    /* send the payload, if this request type has one */
    if (wlen) {
        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
        if (ret != wlen) {
            error_report("failed to send a data, %s", strerror(errno));
        }
    }
out:
    socket_set_cork(s->fd, 0);
    aio_set_fd_handler(s->aio_context, s->fd, false,
                       co_read_response, NULL, NULL, s);
    s->co_send = NULL;
    qemu_co_mutex_unlock(&s->lock);
}
1397
1398static int read_write_object(int fd, BlockDriverState *bs, char *buf,
1399                             uint64_t oid, uint8_t copies,
1400                             unsigned int datalen, uint64_t offset,
1401                             bool write, bool create, uint32_t cache_flags)
1402{
1403    SheepdogObjReq hdr;
1404    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1405    unsigned int wlen, rlen;
1406    int ret;
1407
1408    memset(&hdr, 0, sizeof(hdr));
1409
1410    if (write) {
1411        wlen = datalen;
1412        rlen = 0;
1413        hdr.flags = SD_FLAG_CMD_WRITE;
1414        if (create) {
1415            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1416        } else {
1417            hdr.opcode = SD_OP_WRITE_OBJ;
1418        }
1419    } else {
1420        wlen = 0;
1421        rlen = datalen;
1422        hdr.opcode = SD_OP_READ_OBJ;
1423    }
1424
1425    hdr.flags |= cache_flags;
1426
1427    hdr.oid = oid;
1428    hdr.data_length = datalen;
1429    hdr.offset = offset;
1430    hdr.copies = copies;
1431
1432    ret = do_req(fd, bs, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1433    if (ret) {
1434        error_report("failed to send a request to the sheep");
1435        return ret;
1436    }
1437
1438    switch (rsp->result) {
1439    case SD_RES_SUCCESS:
1440        return 0;
1441    default:
1442        error_report("%s", sd_strerror(rsp->result));
1443        return -EIO;
1444    }
1445}
1446
1447static int read_object(int fd, BlockDriverState *bs, char *buf,
1448                       uint64_t oid, uint8_t copies,
1449                       unsigned int datalen, uint64_t offset,
1450                       uint32_t cache_flags)
1451{
1452    return read_write_object(fd, bs, buf, oid, copies,
1453                             datalen, offset, false,
1454                             false, cache_flags);
1455}
1456
1457static int write_object(int fd, BlockDriverState *bs, char *buf,
1458                        uint64_t oid, uint8_t copies,
1459                        unsigned int datalen, uint64_t offset, bool create,
1460                        uint32_t cache_flags)
1461{
1462    return read_write_object(fd, bs, buf, oid, copies,
1463                             datalen, offset, true,
1464                             create, cache_flags);
1465}
1466
1467/* update inode with the latest state */
1468static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1469{
1470    Error *local_err = NULL;
1471    SheepdogInode *inode;
1472    int ret = 0, fd;
1473    uint32_t vid = 0;
1474
1475    fd = connect_to_sdog(s, &local_err);
1476    if (fd < 0) {
1477        error_report_err(local_err);
1478        return -EIO;
1479    }
1480
1481    inode = g_malloc(SD_INODE_HEADER_SIZE);
1482
1483    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false, &local_err);
1484    if (ret) {
1485        error_report_err(local_err);
1486        goto out;
1487    }
1488
1489    ret = read_object(fd, s->bs, (char *)inode, vid_to_vdi_oid(vid),
1490                      s->inode.nr_copies, SD_INODE_HEADER_SIZE, 0,
1491                      s->cache_flags);
1492    if (ret < 0) {
1493        goto out;
1494    }
1495
1496    if (inode->vdi_id != s->inode.vdi_id) {
1497        memcpy(&s->inode, inode, SD_INODE_HEADER_SIZE);
1498    }
1499
1500out:
1501    g_free(inode);
1502    closesocket(fd);
1503
1504    return ret;
1505}
1506
1507static void coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
1508{
1509    SheepdogAIOCB *acb = aio_req->aiocb;
1510
1511    aio_req->create = false;
1512
1513    /* check whether this request becomes a CoW one */
1514    if (acb->aiocb_type == AIOCB_WRITE_UDATA && is_data_obj(aio_req->oid)) {
1515        int idx = data_oid_to_idx(aio_req->oid);
1516
1517        if (is_data_obj_writable(&s->inode, idx)) {
1518            goto out;
1519        }
1520
1521        if (s->inode.data_vdi_id[idx]) {
1522            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx], idx);
1523            aio_req->flags |= SD_FLAG_CMD_COW;
1524        }
1525        aio_req->create = true;
1526    }
1527out:
1528    if (is_data_obj(aio_req->oid)) {
1529        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1530                        acb->aiocb_type);
1531    } else {
1532        struct iovec iov;
1533        iov.iov_base = &s->inode;
1534        iov.iov_len = sizeof(s->inode);
1535        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
1536    }
1537}
1538
1539static void sd_detach_aio_context(BlockDriverState *bs)
1540{
1541    BDRVSheepdogState *s = bs->opaque;
1542
1543    aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
1544                       NULL, NULL, NULL);
1545}
1546
1547static void sd_attach_aio_context(BlockDriverState *bs,
1548                                  AioContext *new_context)
1549{
1550    BDRVSheepdogState *s = bs->opaque;
1551
1552    s->aio_context = new_context;
1553    aio_set_fd_handler(new_context, s->fd, false,
1554                       co_read_response, NULL, NULL, s);
1555}
1556
1557static QemuOptsList runtime_opts = {
1558    .name = "sheepdog",
1559    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1560    .desc = {
1561        {
1562            .name = "vdi",
1563            .type = QEMU_OPT_STRING,
1564        },
1565        {
1566            .name = "snap-id",
1567            .type = QEMU_OPT_NUMBER,
1568        },
1569        {
1570            .name = "tag",
1571            .type = QEMU_OPT_STRING,
1572        },
1573        { /* end of list */ }
1574    },
1575};
1576
1577static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1578                   Error **errp)
1579{
1580    int ret, fd;
1581    uint32_t vid = 0;
1582    BDRVSheepdogState *s = bs->opaque;
1583    const char *vdi, *snap_id_str, *tag;
1584    uint64_t snap_id;
1585    char *buf = NULL;
1586    QemuOpts *opts;
1587    Error *local_err = NULL;
1588
1589    s->bs = bs;
1590    s->aio_context = bdrv_get_aio_context(bs);
1591
1592    opts = qemu_opts_create(&runtime_opts, NULL, 0, &error_abort);
1593    qemu_opts_absorb_qdict(opts, options, &local_err);
1594    if (local_err) {
1595        error_propagate(errp, local_err);
1596        ret = -EINVAL;
1597        goto err_no_fd;
1598    }
1599
1600    s->addr = sd_server_config(options, errp);
1601    if (!s->addr) {
1602        ret = -EINVAL;
1603        goto err_no_fd;
1604    }
1605
1606    vdi = qemu_opt_get(opts, "vdi");
1607    snap_id_str = qemu_opt_get(opts, "snap-id");
1608    snap_id = qemu_opt_get_number(opts, "snap-id", CURRENT_VDI_ID);
1609    tag = qemu_opt_get(opts, "tag");
1610
1611    if (!vdi) {
1612        error_setg(errp, "parameter 'vdi' is missing");
1613        ret = -EINVAL;
1614        goto err_no_fd;
1615    }
1616    if (strlen(vdi) >= SD_MAX_VDI_LEN) {
1617        error_setg(errp, "value of parameter 'vdi' is too long");
1618        ret = -EINVAL;
1619        goto err_no_fd;
1620    }
1621
1622    if (snap_id > UINT32_MAX) {
1623        snap_id = 0;
1624    }
1625    if (snap_id_str && !snap_id) {
1626        error_setg(errp, "'snap-id=%s' is not a valid snapshot ID",
1627                   snap_id_str);
1628        ret = -EINVAL;
1629        goto err_no_fd;
1630    }
1631
1632    if (!tag) {
1633        tag = "";
1634    }
1635    if (tag && strlen(tag) >= SD_MAX_VDI_TAG_LEN) {
1636        error_setg(errp, "value of parameter 'tag' is too long");
1637        ret = -EINVAL;
1638        goto err_no_fd;
1639    }
1640
1641    QLIST_INIT(&s->inflight_aio_head);
1642    QLIST_INIT(&s->failed_aio_head);
1643    QLIST_INIT(&s->inflight_aiocb_head);
1644
1645    s->fd = get_sheep_fd(s, errp);
1646    if (s->fd < 0) {
1647        ret = s->fd;
1648        goto err_no_fd;
1649    }
1650
1651    ret = find_vdi_name(s, vdi, (uint32_t)snap_id, tag, &vid, true, errp);
1652    if (ret) {
1653        goto err;
1654    }
1655
1656    /*
1657     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1658     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1659     */
1660    s->cache_flags = SD_FLAG_CMD_CACHE;
1661    if (flags & BDRV_O_NOCACHE) {
1662        s->cache_flags = SD_FLAG_CMD_DIRECT;
1663    }
1664    s->discard_supported = true;
1665
1666    if (snap_id || tag[0]) {
1667        DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1668        s->is_snapshot = true;
1669    }
1670
1671    fd = connect_to_sdog(s, errp);
1672    if (fd < 0) {
1673        ret = fd;
1674        goto err;
1675    }
1676
1677    buf = g_malloc(SD_INODE_SIZE);
1678    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
1679                      0, SD_INODE_SIZE, 0, s->cache_flags);
1680
1681    closesocket(fd);
1682
1683    if (ret) {
1684        error_setg(errp, "Can't read snapshot inode");
1685        goto err;
1686    }
1687
1688    memcpy(&s->inode, buf, sizeof(s->inode));
1689
1690    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1691    pstrcpy(s->name, sizeof(s->name), vdi);
1692    qemu_co_mutex_init(&s->lock);
1693    qemu_co_mutex_init(&s->queue_lock);
1694    qemu_co_queue_init(&s->overlapping_queue);
1695    qemu_opts_del(opts);
1696    g_free(buf);
1697    return 0;
1698
1699err:
1700    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
1701                       false, NULL, NULL, NULL, NULL);
1702    closesocket(s->fd);
1703err_no_fd:
1704    qemu_opts_del(opts);
1705    g_free(buf);
1706    return ret;
1707}
1708
1709static int sd_reopen_prepare(BDRVReopenState *state, BlockReopenQueue *queue,
1710                             Error **errp)
1711{
1712    BDRVSheepdogState *s = state->bs->opaque;
1713    BDRVSheepdogReopenState *re_s;
1714    int ret = 0;
1715
1716    re_s = state->opaque = g_new0(BDRVSheepdogReopenState, 1);
1717
1718    re_s->cache_flags = SD_FLAG_CMD_CACHE;
1719    if (state->flags & BDRV_O_NOCACHE) {
1720        re_s->cache_flags = SD_FLAG_CMD_DIRECT;
1721    }
1722
1723    re_s->fd = get_sheep_fd(s, errp);
1724    if (re_s->fd < 0) {
1725        ret = re_s->fd;
1726        return ret;
1727    }
1728
1729    return ret;
1730}
1731
1732static void sd_reopen_commit(BDRVReopenState *state)
1733{
1734    BDRVSheepdogReopenState *re_s = state->opaque;
1735    BDRVSheepdogState *s = state->bs->opaque;
1736
1737    if (s->fd) {
1738        aio_set_fd_handler(s->aio_context, s->fd, false,
1739                           NULL, NULL, NULL, NULL);
1740        closesocket(s->fd);
1741    }
1742
1743    s->fd = re_s->fd;
1744    s->cache_flags = re_s->cache_flags;
1745
1746    g_free(state->opaque);
1747    state->opaque = NULL;
1748
1749    return;
1750}
1751
1752static void sd_reopen_abort(BDRVReopenState *state)
1753{
1754    BDRVSheepdogReopenState *re_s = state->opaque;
1755    BDRVSheepdogState *s = state->bs->opaque;
1756
1757    if (re_s == NULL) {
1758        return;
1759    }
1760
1761    if (re_s->fd) {
1762        aio_set_fd_handler(s->aio_context, re_s->fd, false,
1763                           NULL, NULL, NULL, NULL);
1764        closesocket(re_s->fd);
1765    }
1766
1767    g_free(state->opaque);
1768    state->opaque = NULL;
1769
1770    return;
1771}
1772
1773static int do_sd_create(BDRVSheepdogState *s, uint32_t *vdi_id, int snapshot,
1774                        Error **errp)
1775{
1776    SheepdogVdiReq hdr;
1777    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1778    int fd, ret;
1779    unsigned int wlen, rlen = 0;
1780    char buf[SD_MAX_VDI_LEN];
1781
1782    fd = connect_to_sdog(s, errp);
1783    if (fd < 0) {
1784        return fd;
1785    }
1786
1787    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
1788     * does not fit in buf?  For now, just truncate and avoid buffer overrun.
1789     */
1790    memset(buf, 0, sizeof(buf));
1791    pstrcpy(buf, sizeof(buf), s->name);
1792
1793    memset(&hdr, 0, sizeof(hdr));
1794    hdr.opcode = SD_OP_NEW_VDI;
1795    hdr.base_vdi_id = s->inode.vdi_id;
1796
1797    wlen = SD_MAX_VDI_LEN;
1798
1799    hdr.flags = SD_FLAG_CMD_WRITE;
1800    hdr.snapid = snapshot;
1801
1802    hdr.data_length = wlen;
1803    hdr.vdi_size = s->inode.vdi_size;
1804    hdr.copy_policy = s->inode.copy_policy;
1805    hdr.copies = s->inode.nr_copies;
1806    hdr.block_size_shift = s->inode.block_size_shift;
1807
1808    ret = do_req(fd, NULL, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1809
1810    closesocket(fd);
1811
1812    if (ret) {
1813        error_setg_errno(errp, -ret, "create failed");
1814        return ret;
1815    }
1816
1817    if (rsp->result != SD_RES_SUCCESS) {
1818        error_setg(errp, "%s, %s", sd_strerror(rsp->result), s->inode.name);
1819        return -EIO;
1820    }
1821
1822    if (vdi_id) {
1823        *vdi_id = rsp->vdi_id;
1824    }
1825
1826    return 0;
1827}
1828
1829static int sd_prealloc(const char *filename, Error **errp)
1830{
1831    BlockBackend *blk = NULL;
1832    BDRVSheepdogState *base = NULL;
1833    unsigned long buf_size;
1834    uint32_t idx, max_idx;
1835    uint32_t object_size;
1836    int64_t vdi_size;
1837    void *buf = NULL;
1838    int ret;
1839
1840    blk = blk_new_open(filename, NULL, NULL,
1841                       BDRV_O_RDWR | BDRV_O_RESIZE | BDRV_O_PROTOCOL, errp);
1842    if (blk == NULL) {
1843        ret = -EIO;
1844        goto out_with_err_set;
1845    }
1846
1847    blk_set_allow_write_beyond_eof(blk, true);
1848
1849    vdi_size = blk_getlength(blk);
1850    if (vdi_size < 0) {
1851        ret = vdi_size;
1852        goto out;
1853    }
1854
1855    base = blk_bs(blk)->opaque;
1856    object_size = (UINT32_C(1) << base->inode.block_size_shift);
1857    buf_size = MIN(object_size, SD_DATA_OBJ_SIZE);
1858    buf = g_malloc0(buf_size);
1859
1860    max_idx = DIV_ROUND_UP(vdi_size, buf_size);
1861
1862    for (idx = 0; idx < max_idx; idx++) {
1863        /*
1864         * The created image can be a cloned image, so we need to read
1865         * a data from the source image.
1866         */
1867        ret = blk_pread(blk, idx * buf_size, buf, buf_size);
1868        if (ret < 0) {
1869            goto out;
1870        }
1871        ret = blk_pwrite(blk, idx * buf_size, buf, buf_size, 0);
1872        if (ret < 0) {
1873            goto out;
1874        }
1875    }
1876
1877    ret = 0;
1878out:
1879    if (ret < 0) {
1880        error_setg_errno(errp, -ret, "Can't pre-allocate");
1881    }
1882out_with_err_set:
1883    if (blk) {
1884        blk_unref(blk);
1885    }
1886    g_free(buf);
1887
1888    return ret;
1889}
1890
1891/*
1892 * Sheepdog support two kinds of redundancy, full replication and erasure
1893 * coding.
1894 *
1895 * # create a fully replicated vdi with x copies
1896 * -o redundancy=x (1 <= x <= SD_MAX_COPIES)
1897 *
1898 * # create a erasure coded vdi with x data strips and y parity strips
1899 * -o redundancy=x:y (x must be one of {2,4,8,16} and 1 <= y < SD_EC_MAX_STRIP)
1900 */
1901static int parse_redundancy(BDRVSheepdogState *s, const char *opt)
1902{
1903    struct SheepdogInode *inode = &s->inode;
1904    const char *n1, *n2;
1905    long copy, parity;
1906    char p[10];
1907
1908    pstrcpy(p, sizeof(p), opt);
1909    n1 = strtok(p, ":");
1910    n2 = strtok(NULL, ":");
1911
1912    if (!n1) {
1913        return -EINVAL;
1914    }
1915
1916    copy = strtol(n1, NULL, 10);
1917    /* FIXME fix error checking by switching to qemu_strtol() */
1918    if (copy > SD_MAX_COPIES || copy < 1) {
1919        return -EINVAL;
1920    }
1921    if (!n2) {
1922        inode->copy_policy = 0;
1923        inode->nr_copies = copy;
1924        return 0;
1925    }
1926
1927    if (copy != 2 && copy != 4 && copy != 8 && copy != 16) {
1928        return -EINVAL;
1929    }
1930
1931    parity = strtol(n2, NULL, 10);
1932    /* FIXME fix error checking by switching to qemu_strtol() */
1933    if (parity >= SD_EC_MAX_STRIP || parity < 1) {
1934        return -EINVAL;
1935    }
1936
1937    /*
1938     * 4 bits for parity and 4 bits for data.
1939     * We have to compress upper data bits because it can't represent 16
1940     */
1941    inode->copy_policy = ((copy / 2) << 4) + parity;
1942    inode->nr_copies = copy + parity;
1943
1944    return 0;
1945}
1946
1947static int parse_block_size_shift(BDRVSheepdogState *s, QemuOpts *opt)
1948{
1949    struct SheepdogInode *inode = &s->inode;
1950    uint64_t object_size;
1951    int obj_order;
1952
1953    object_size = qemu_opt_get_size_del(opt, BLOCK_OPT_OBJECT_SIZE, 0);
1954    if (object_size) {
1955        if ((object_size - 1) & object_size) {    /* not a power of 2? */
1956            return -EINVAL;
1957        }
1958        obj_order = ctz32(object_size);
1959        if (obj_order < 20 || obj_order > 31) {
1960            return -EINVAL;
1961        }
1962        inode->block_size_shift = (uint8_t)obj_order;
1963    }
1964
1965    return 0;
1966}
1967
1968static int sd_create(const char *filename, QemuOpts *opts,
1969                     Error **errp)
1970{
1971    Error *err = NULL;
1972    int ret = 0;
1973    uint32_t vid = 0;
1974    char *backing_file = NULL;
1975    char *buf = NULL;
1976    BDRVSheepdogState *s;
1977    SheepdogConfig cfg;
1978    uint64_t max_vdi_size;
1979    bool prealloc = false;
1980
1981    s = g_new0(BDRVSheepdogState, 1);
1982
1983    if (strstr(filename, "://")) {
1984        sd_parse_uri(&cfg, filename, &err);
1985    } else {
1986        parse_vdiname(&cfg, filename, &err);
1987    }
1988    if (err) {
1989        error_propagate(errp, err);
1990        goto out;
1991    }
1992
1993    buf = cfg.port ? g_strdup_printf("%d", cfg.port) : NULL;
1994    s->addr = sd_socket_address(cfg.path, cfg.host, buf);
1995    g_free(buf);
1996    strcpy(s->name, cfg.vdi);
1997    sd_config_done(&cfg);
1998
1999    s->inode.vdi_size = ROUND_UP(qemu_opt_get_size_del(opts, BLOCK_OPT_SIZE, 0),
2000                                 BDRV_SECTOR_SIZE);
2001    backing_file = qemu_opt_get_del(opts, BLOCK_OPT_BACKING_FILE);
2002    buf = qemu_opt_get_del(opts, BLOCK_OPT_PREALLOC);
2003    if (!buf || !strcmp(buf, "off")) {
2004        prealloc = false;
2005    } else if (!strcmp(buf, "full")) {
2006        prealloc = true;
2007    } else {
2008        error_setg(errp, "Invalid preallocation mode: '%s'", buf);
2009        ret = -EINVAL;
2010        goto out;
2011    }
2012
2013    g_free(buf);
2014    buf = qemu_opt_get_del(opts, BLOCK_OPT_REDUNDANCY);
2015    if (buf) {
2016        ret = parse_redundancy(s, buf);
2017        if (ret < 0) {
2018            error_setg(errp, "Invalid redundancy mode: '%s'", buf);
2019            goto out;
2020        }
2021    }
2022    ret = parse_block_size_shift(s, opts);
2023    if (ret < 0) {
2024        error_setg(errp, "Invalid object_size."
2025                         " obect_size needs to be power of 2"
2026                         " and be limited from 2^20 to 2^31");
2027        goto out;
2028    }
2029
2030    if (backing_file) {
2031        BlockBackend *blk;
2032        BDRVSheepdogState *base;
2033        BlockDriver *drv;
2034
2035        /* Currently, only Sheepdog backing image is supported. */
2036        drv = bdrv_find_protocol(backing_file, true, NULL);
2037        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
2038            error_setg(errp, "backing_file must be a sheepdog image");
2039            ret = -EINVAL;
2040            goto out;
2041        }
2042
2043        blk = blk_new_open(backing_file, NULL, NULL,
2044                           BDRV_O_PROTOCOL, errp);
2045        if (blk == NULL) {
2046            ret = -EIO;
2047            goto out;
2048        }
2049
2050        base = blk_bs(blk)->opaque;
2051
2052        if (!is_snapshot(&base->inode)) {
2053            error_setg(errp, "cannot clone from a non snapshot vdi");
2054            blk_unref(blk);
2055            ret = -EINVAL;
2056            goto out;
2057        }
2058        s->inode.vdi_id = base->inode.vdi_id;
2059        blk_unref(blk);
2060    }
2061
2062    s->aio_context = qemu_get_aio_context();
2063
2064    /* if block_size_shift is not specified, get cluster default value */
2065    if (s->inode.block_size_shift == 0) {
2066        SheepdogVdiReq hdr;
2067        SheepdogClusterRsp *rsp = (SheepdogClusterRsp *)&hdr;
2068        int fd;
2069        unsigned int wlen = 0, rlen = 0;
2070
2071        fd = connect_to_sdog(s, errp);
2072        if (fd < 0) {
2073            ret = fd;
2074            goto out;
2075        }
2076
2077        memset(&hdr, 0, sizeof(hdr));
2078        hdr.opcode = SD_OP_GET_CLUSTER_DEFAULT;
2079        hdr.proto_ver = SD_PROTO_VER;
2080
2081        ret = do_req(fd, NULL, (SheepdogReq *)&hdr,
2082                     NULL, &wlen, &rlen);
2083        closesocket(fd);
2084        if (ret) {
2085            error_setg_errno(errp, -ret, "failed to get cluster default");
2086            goto out;
2087        }
2088        if (rsp->result == SD_RES_SUCCESS) {
2089            s->inode.block_size_shift = rsp->block_size_shift;
2090        } else {
2091            s->inode.block_size_shift = SD_DEFAULT_BLOCK_SIZE_SHIFT;
2092        }
2093    }
2094
2095    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2096
2097    if (s->inode.vdi_size > max_vdi_size) {
2098        error_setg(errp, "An image is too large."
2099                         " The maximum image size is %"PRIu64 "GB",
2100                         max_vdi_size / 1024 / 1024 / 1024);
2101        ret = -EINVAL;
2102        goto out;
2103    }
2104
2105    ret = do_sd_create(s, &vid, 0, errp);
2106    if (ret) {
2107        goto out;
2108    }
2109
2110    if (prealloc) {
2111        ret = sd_prealloc(filename, errp);
2112    }
2113out:
2114    g_free(backing_file);
2115    g_free(buf);
2116    g_free(s);
2117    return ret;
2118}
2119
2120static void sd_close(BlockDriverState *bs)
2121{
2122    Error *local_err = NULL;
2123    BDRVSheepdogState *s = bs->opaque;
2124    SheepdogVdiReq hdr;
2125    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2126    unsigned int wlen, rlen = 0;
2127    int fd, ret;
2128
2129    DPRINTF("%s\n", s->name);
2130
2131    fd = connect_to_sdog(s, &local_err);
2132    if (fd < 0) {
2133        error_report_err(local_err);
2134        return;
2135    }
2136
2137    memset(&hdr, 0, sizeof(hdr));
2138
2139    hdr.opcode = SD_OP_RELEASE_VDI;
2140    hdr.type = LOCK_TYPE_NORMAL;
2141    hdr.base_vdi_id = s->inode.vdi_id;
2142    wlen = strlen(s->name) + 1;
2143    hdr.data_length = wlen;
2144    hdr.flags = SD_FLAG_CMD_WRITE;
2145
2146    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2147                 s->name, &wlen, &rlen);
2148
2149    closesocket(fd);
2150
2151    if (!ret && rsp->result != SD_RES_SUCCESS &&
2152        rsp->result != SD_RES_VDI_NOT_LOCKED) {
2153        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2154    }
2155
2156    aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
2157                       false, NULL, NULL, NULL, NULL);
2158    closesocket(s->fd);
2159    qapi_free_SocketAddress(s->addr);
2160}
2161
2162static int64_t sd_getlength(BlockDriverState *bs)
2163{
2164    BDRVSheepdogState *s = bs->opaque;
2165
2166    return s->inode.vdi_size;
2167}
2168
2169static int sd_truncate(BlockDriverState *bs, int64_t offset,
2170                       PreallocMode prealloc, Error **errp)
2171{
2172    BDRVSheepdogState *s = bs->opaque;
2173    int ret, fd;
2174    unsigned int datalen;
2175    uint64_t max_vdi_size;
2176
2177    if (prealloc != PREALLOC_MODE_OFF) {
2178        error_setg(errp, "Unsupported preallocation mode '%s'",
2179                   PreallocMode_str(prealloc));
2180        return -ENOTSUP;
2181    }
2182
2183    max_vdi_size = (UINT64_C(1) << s->inode.block_size_shift) * MAX_DATA_OBJS;
2184    if (offset < s->inode.vdi_size) {
2185        error_setg(errp, "shrinking is not supported");
2186        return -EINVAL;
2187    } else if (offset > max_vdi_size) {
2188        error_setg(errp, "too big image size");
2189        return -EINVAL;
2190    }
2191
2192    fd = connect_to_sdog(s, errp);
2193    if (fd < 0) {
2194        return fd;
2195    }
2196
2197    /* we don't need to update entire object */
2198    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2199    s->inode.vdi_size = offset;
2200    ret = write_object(fd, s->bs, (char *)&s->inode,
2201                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2202                       datalen, 0, false, s->cache_flags);
2203    close(fd);
2204
2205    if (ret < 0) {
2206        error_setg_errno(errp, -ret, "failed to update an inode");
2207    }
2208
2209    return ret;
2210}
2211
2212/*
2213 * This function is called after writing data objects.  If we need to
2214 * update metadata, this sends a write request to the vdi object.
2215 */
2216static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
2217{
2218    BDRVSheepdogState *s = acb->s;
2219    struct iovec iov;
2220    AIOReq *aio_req;
2221    uint32_t offset, data_len, mn, mx;
2222
2223    mn = acb->min_dirty_data_idx;
2224    mx = acb->max_dirty_data_idx;
2225    if (mn <= mx) {
2226        /* we need to update the vdi object. */
2227        ++acb->nr_pending;
2228        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
2229            mn * sizeof(s->inode.data_vdi_id[0]);
2230        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
2231
2232        acb->min_dirty_data_idx = UINT32_MAX;
2233        acb->max_dirty_data_idx = 0;
2234
2235        iov.iov_base = &s->inode;
2236        iov.iov_len = sizeof(s->inode);
2237        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
2238                                data_len, offset, 0, false, 0, offset);
2239        add_aio_request(s, aio_req, &iov, 1, AIOCB_WRITE_UDATA);
2240        if (--acb->nr_pending) {
2241            qemu_coroutine_yield();
2242        }
2243    }
2244}
2245
2246/* Delete current working VDI on the snapshot chain */
2247static bool sd_delete(BDRVSheepdogState *s)
2248{
2249    Error *local_err = NULL;
2250    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
2251    SheepdogVdiReq hdr = {
2252        .opcode = SD_OP_DEL_VDI,
2253        .base_vdi_id = s->inode.vdi_id,
2254        .data_length = wlen,
2255        .flags = SD_FLAG_CMD_WRITE,
2256    };
2257    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2258    int fd, ret;
2259
2260    fd = connect_to_sdog(s, &local_err);
2261    if (fd < 0) {
2262        error_report_err(local_err);
2263        return false;
2264    }
2265
2266    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2267                 s->name, &wlen, &rlen);
2268    closesocket(fd);
2269    if (ret) {
2270        return false;
2271    }
2272    switch (rsp->result) {
2273    case SD_RES_NO_VDI:
2274        error_report("%s was already deleted", s->name);
2275        /* fall through */
2276    case SD_RES_SUCCESS:
2277        break;
2278    default:
2279        error_report("%s, %s", sd_strerror(rsp->result), s->name);
2280        return false;
2281    }
2282
2283    return true;
2284}
2285
2286/*
2287 * Create a writable VDI from a snapshot
2288 */
2289static int sd_create_branch(BDRVSheepdogState *s)
2290{
2291    Error *local_err = NULL;
2292    int ret, fd;
2293    uint32_t vid;
2294    char *buf;
2295    bool deleted;
2296
2297    DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
2298
2299    buf = g_malloc(SD_INODE_SIZE);
2300
2301    /*
2302     * Even If deletion fails, we will just create extra snapshot based on
2303     * the working VDI which was supposed to be deleted. So no need to
2304     * false bail out.
2305     */
2306    deleted = sd_delete(s);
2307    ret = do_sd_create(s, &vid, !deleted, &local_err);
2308    if (ret) {
2309        error_report_err(local_err);
2310        goto out;
2311    }
2312
2313    DPRINTF("%" PRIx32 " is created.\n", vid);
2314
2315    fd = connect_to_sdog(s, &local_err);
2316    if (fd < 0) {
2317        error_report_err(local_err);
2318        ret = fd;
2319        goto out;
2320    }
2321
2322    ret = read_object(fd, s->bs, buf, vid_to_vdi_oid(vid),
2323                      s->inode.nr_copies, SD_INODE_SIZE, 0, s->cache_flags);
2324
2325    closesocket(fd);
2326
2327    if (ret < 0) {
2328        goto out;
2329    }
2330
2331    memcpy(&s->inode, buf, sizeof(s->inode));
2332
2333    s->is_snapshot = false;
2334    ret = 0;
2335    DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
2336
2337out:
2338    g_free(buf);
2339
2340    return ret;
2341}
2342
2343/*
2344 * Send I/O requests to the server.
2345 *
2346 * This function sends requests to the server, links the requests to
2347 * the inflight_list in BDRVSheepdogState, and exits without
2348 * waiting the response.  The responses are received in the
2349 * `aio_read_response' function which is called from the main loop as
2350 * a fd handler.
2351 *
2352 * Returns 1 when we need to wait a response, 0 when there is no sent
2353 * request and -errno in error cases.
2354 */
2355static void coroutine_fn sd_co_rw_vector(SheepdogAIOCB *acb)
2356{
2357    int ret = 0;
2358    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
2359    unsigned long idx;
2360    uint32_t object_size;
2361    uint64_t oid;
2362    uint64_t offset;
2363    BDRVSheepdogState *s = acb->s;
2364    SheepdogInode *inode = &s->inode;
2365    AIOReq *aio_req;
2366
2367    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
2368        /*
2369         * In the case we open the snapshot VDI, Sheepdog creates the
2370         * writable VDI when we do a write operation first.
2371         */
2372        ret = sd_create_branch(s);
2373        if (ret) {
2374            acb->ret = -EIO;
2375            return;
2376        }
2377    }
2378
2379    object_size = (UINT32_C(1) << inode->block_size_shift);
2380    idx = acb->sector_num * BDRV_SECTOR_SIZE / object_size;
2381    offset = (acb->sector_num * BDRV_SECTOR_SIZE) % object_size;
2382
2383    /*
2384     * Make sure we don't free the aiocb before we are done with all requests.
2385     * This additional reference is dropped at the end of this function.
2386     */
2387    acb->nr_pending++;
2388
2389    while (done != total) {
2390        uint8_t flags = 0;
2391        uint64_t old_oid = 0;
2392        bool create = false;
2393
2394        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
2395
2396        len = MIN(total - done, object_size - offset);
2397
2398        switch (acb->aiocb_type) {
2399        case AIOCB_READ_UDATA:
2400            if (!inode->data_vdi_id[idx]) {
2401                qemu_iovec_memset(acb->qiov, done, 0, len);
2402                goto done;
2403            }
2404            break;
2405        case AIOCB_WRITE_UDATA:
2406            if (!inode->data_vdi_id[idx]) {
2407                create = true;
2408            } else if (!is_data_obj_writable(inode, idx)) {
2409                /* Copy-On-Write */
2410                create = true;
2411                old_oid = oid;
2412                flags = SD_FLAG_CMD_COW;
2413            }
2414            break;
2415        case AIOCB_DISCARD_OBJ:
2416            /*
2417             * We discard the object only when the whole object is
2418             * 1) allocated 2) trimmed. Otherwise, simply skip it.
2419             */
2420            if (len != object_size || inode->data_vdi_id[idx] == 0) {
2421                goto done;
2422            }
2423            break;
2424        default:
2425            break;
2426        }
2427
2428        if (create) {
2429            DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
2430                    inode->vdi_id, oid,
2431                    vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
2432            oid = vid_to_data_oid(inode->vdi_id, idx);
2433            DPRINTF("new oid %" PRIx64 "\n", oid);
2434        }
2435
2436        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, create,
2437                                old_oid,
2438                                acb->aiocb_type == AIOCB_DISCARD_OBJ ?
2439                                0 : done);
2440        add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
2441                        acb->aiocb_type);
2442    done:
2443        offset = 0;
2444        idx++;
2445        done += len;
2446    }
2447    if (--acb->nr_pending) {
2448        qemu_coroutine_yield();
2449    }
2450}
2451
2452static void sd_aio_complete(SheepdogAIOCB *acb)
2453{
2454    BDRVSheepdogState *s;
2455    if (acb->aiocb_type == AIOCB_FLUSH_CACHE) {
2456        return;
2457    }
2458
2459    s = acb->s;
2460    qemu_co_mutex_lock(&s->queue_lock);
2461    QLIST_REMOVE(acb, aiocb_siblings);
2462    qemu_co_queue_restart_all(&s->overlapping_queue);
2463    qemu_co_mutex_unlock(&s->queue_lock);
2464}
2465
2466static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
2467                        int nb_sectors, QEMUIOVector *qiov)
2468{
2469    SheepdogAIOCB acb;
2470    int ret;
2471    int64_t offset = (sector_num + nb_sectors) * BDRV_SECTOR_SIZE;
2472    BDRVSheepdogState *s = bs->opaque;
2473
2474    if (offset > s->inode.vdi_size) {
2475        ret = sd_truncate(bs, offset, PREALLOC_MODE_OFF, NULL);
2476        if (ret < 0) {
2477            return ret;
2478        }
2479    }
2480
2481    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_WRITE_UDATA);
2482    sd_co_rw_vector(&acb);
2483    sd_write_done(&acb);
2484    sd_aio_complete(&acb);
2485
2486    return acb.ret;
2487}
2488
2489static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
2490                       int nb_sectors, QEMUIOVector *qiov)
2491{
2492    SheepdogAIOCB acb;
2493    BDRVSheepdogState *s = bs->opaque;
2494
2495    sd_aio_setup(&acb, s, qiov, sector_num, nb_sectors, AIOCB_READ_UDATA);
2496    sd_co_rw_vector(&acb);
2497    sd_aio_complete(&acb);
2498
2499    return acb.ret;
2500}
2501
2502static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
2503{
2504    BDRVSheepdogState *s = bs->opaque;
2505    SheepdogAIOCB acb;
2506    AIOReq *aio_req;
2507
2508    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
2509        return 0;
2510    }
2511
2512    sd_aio_setup(&acb, s, NULL, 0, 0, AIOCB_FLUSH_CACHE);
2513
2514    acb.nr_pending++;
2515    aio_req = alloc_aio_req(s, &acb, vid_to_vdi_oid(s->inode.vdi_id),
2516                            0, 0, 0, false, 0, 0);
2517    add_aio_request(s, aio_req, NULL, 0, acb.aiocb_type);
2518
2519    if (--acb.nr_pending) {
2520        qemu_coroutine_yield();
2521    }
2522
2523    sd_aio_complete(&acb);
2524    return acb.ret;
2525}
2526
2527static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
2528{
2529    Error *local_err = NULL;
2530    BDRVSheepdogState *s = bs->opaque;
2531    int ret, fd;
2532    uint32_t new_vid;
2533    SheepdogInode *inode;
2534    unsigned int datalen;
2535
2536    DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
2537            "is_snapshot %d\n", sn_info->name, sn_info->id_str,
2538            s->name, sn_info->vm_state_size, s->is_snapshot);
2539
2540    if (s->is_snapshot) {
2541        error_report("You can't create a snapshot of a snapshot VDI, "
2542                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);
2543
2544        return -EINVAL;
2545    }
2546
2547    DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);
2548
2549    s->inode.vm_state_size = sn_info->vm_state_size;
2550    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
2551    /* It appears that inode.tag does not require a NUL terminator,
2552     * which means this use of strncpy is ok.
2553     */
2554    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
2555    /* we don't need to update entire object */
2556    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
2557    inode = g_malloc(datalen);
2558
2559    /* refresh inode. */
2560    fd = connect_to_sdog(s, &local_err);
2561    if (fd < 0) {
2562        error_report_err(local_err);
2563        ret = fd;
2564        goto cleanup;
2565    }
2566
2567    ret = write_object(fd, s->bs, (char *)&s->inode,
2568                       vid_to_vdi_oid(s->inode.vdi_id), s->inode.nr_copies,
2569                       datalen, 0, false, s->cache_flags);
2570    if (ret < 0) {
2571        error_report("failed to write snapshot's inode.");
2572        goto cleanup;
2573    }
2574
2575    ret = do_sd_create(s, &new_vid, 1, &local_err);
2576    if (ret < 0) {
2577        error_reportf_err(local_err,
2578                          "failed to create inode for snapshot: ");
2579        goto cleanup;
2580    }
2581
2582    ret = read_object(fd, s->bs, (char *)inode,
2583                      vid_to_vdi_oid(new_vid), s->inode.nr_copies, datalen, 0,
2584                      s->cache_flags);
2585
2586    if (ret < 0) {
2587        error_report("failed to read new inode info. %s", strerror(errno));
2588        goto cleanup;
2589    }
2590
2591    memcpy(&s->inode, inode, datalen);
2592    DPRINTF("s->inode: name %s snap_id %x oid %x\n",
2593            s->inode.name, s->inode.snap_id, s->inode.vdi_id);
2594
2595cleanup:
2596    g_free(inode);
2597    closesocket(fd);
2598    return ret;
2599}
2600
2601/*
2602 * We implement rollback(loadvm) operation to the specified snapshot by
2603 * 1) switch to the snapshot
2604 * 2) rely on sd_create_branch to delete working VDI and
2605 * 3) create a new working VDI based on the specified snapshot
2606 */
2607static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2608{
2609    BDRVSheepdogState *s = bs->opaque;
2610    BDRVSheepdogState *old_s;
2611    char tag[SD_MAX_VDI_TAG_LEN];
2612    uint32_t snapid = 0;
2613    int ret;
2614
2615    if (!sd_parse_snapid_or_tag(snapshot_id, &snapid, tag)) {
2616        return -EINVAL;
2617    }
2618
2619    old_s = g_new(BDRVSheepdogState, 1);
2620
2621    memcpy(old_s, s, sizeof(BDRVSheepdogState));
2622
2623    ret = reload_inode(s, snapid, tag);
2624    if (ret) {
2625        goto out;
2626    }
2627
2628    ret = sd_create_branch(s);
2629    if (ret) {
2630        goto out;
2631    }
2632
2633    g_free(old_s);
2634
2635    return 0;
2636out:
2637    /* recover bdrv_sd_state */
2638    memcpy(s, old_s, sizeof(BDRVSheepdogState));
2639    g_free(old_s);
2640
2641    error_report("failed to open. recover old bdrv_sd_state.");
2642
2643    return ret;
2644}
2645
2646#define NR_BATCHED_DISCARD 128
2647
2648static int remove_objects(BDRVSheepdogState *s, Error **errp)
2649{
2650    int fd, i = 0, nr_objs = 0;
2651    int ret;
2652    SheepdogInode *inode = &s->inode;
2653
2654    fd = connect_to_sdog(s, errp);
2655    if (fd < 0) {
2656        return fd;
2657    }
2658
2659    nr_objs = count_data_objs(inode);
2660    while (i < nr_objs) {
2661        int start_idx, nr_filled_idx;
2662
2663        while (i < nr_objs && !inode->data_vdi_id[i]) {
2664            i++;
2665        }
2666        start_idx = i;
2667
2668        nr_filled_idx = 0;
2669        while (i < nr_objs && nr_filled_idx < NR_BATCHED_DISCARD) {
2670            if (inode->data_vdi_id[i]) {
2671                inode->data_vdi_id[i] = 0;
2672                nr_filled_idx++;
2673            }
2674
2675            i++;
2676        }
2677
2678        ret = write_object(fd, s->bs,
2679                           (char *)&inode->data_vdi_id[start_idx],
2680                           vid_to_vdi_oid(s->inode.vdi_id), inode->nr_copies,
2681                           (i - start_idx) * sizeof(uint32_t),
2682                           offsetof(struct SheepdogInode,
2683                                    data_vdi_id[start_idx]),
2684                           false, s->cache_flags);
2685        if (ret < 0) {
2686            error_setg(errp, "Failed to discard snapshot inode");
2687            goto out;
2688        }
2689    }
2690
2691    ret = 0;
2692out:
2693    closesocket(fd);
2694    return ret;
2695}
2696
2697static int sd_snapshot_delete(BlockDriverState *bs,
2698                              const char *snapshot_id,
2699                              const char *name,
2700                              Error **errp)
2701{
2702    /*
2703     * FIXME should delete the snapshot matching both @snapshot_id and
2704     * @name, but @name not used here
2705     */
2706    unsigned long snap_id = 0;
2707    char snap_tag[SD_MAX_VDI_TAG_LEN];
2708    int fd, ret;
2709    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
2710    BDRVSheepdogState *s = bs->opaque;
2711    unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN, rlen = 0;
2712    uint32_t vid;
2713    SheepdogVdiReq hdr = {
2714        .opcode = SD_OP_DEL_VDI,
2715        .data_length = wlen,
2716        .flags = SD_FLAG_CMD_WRITE,
2717    };
2718    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
2719
2720    ret = remove_objects(s, errp);
2721    if (ret) {
2722        return ret;
2723    }
2724
2725    memset(buf, 0, sizeof(buf));
2726    memset(snap_tag, 0, sizeof(snap_tag));
2727    pstrcpy(buf, SD_MAX_VDI_LEN, s->name);
2728    /* TODO Use sd_parse_snapid() once this mess is cleaned up */
2729    ret = qemu_strtoul(snapshot_id, NULL, 10, &snap_id);
2730    if (ret || snap_id > UINT32_MAX) {
2731        /*
2732         * FIXME Since qemu_strtoul() returns -EINVAL when
2733         * @snapshot_id is null, @snapshot_id is mandatory.  Correct
2734         * would be to require at least one of @snapshot_id and @name.
2735         */
2736        error_setg(errp, "Invalid snapshot ID: %s",
2737                         snapshot_id ? snapshot_id : "<null>");
2738        return -EINVAL;
2739    }
2740
2741    if (snap_id) {
2742        hdr.snapid = (uint32_t) snap_id;
2743    } else {
2744        /* FIXME I suspect we should use @name here */
2745        /* FIXME don't truncate silently */
2746        pstrcpy(snap_tag, sizeof(snap_tag), snapshot_id);
2747        pstrcpy(buf + SD_MAX_VDI_LEN, SD_MAX_VDI_TAG_LEN, snap_tag);
2748    }
2749
2750    ret = find_vdi_name(s, s->name, snap_id, snap_tag, &vid, true, errp);
2751    if (ret) {
2752        return ret;
2753    }
2754
2755    fd = connect_to_sdog(s, errp);
2756    if (fd < 0) {
2757        return fd;
2758    }
2759
2760    ret = do_req(fd, s->bs, (SheepdogReq *)&hdr,
2761                 buf, &wlen, &rlen);
2762    closesocket(fd);
2763    if (ret) {
2764        error_setg_errno(errp, -ret, "Couldn't send request to server");
2765        return ret;
2766    }
2767
2768    switch (rsp->result) {
2769    case SD_RES_NO_VDI:
2770        error_setg(errp, "Can't find the snapshot");
2771        return -ENOENT;
2772    case SD_RES_SUCCESS:
2773        break;
2774    default:
2775        error_setg(errp, "%s", sd_strerror(rsp->result));
2776        return -EIO;
2777    }
2778
2779    return 0;
2780}
2781
2782static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
2783{
2784    Error *local_err = NULL;
2785    BDRVSheepdogState *s = bs->opaque;
2786    SheepdogReq req;
2787    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
2788    QEMUSnapshotInfo *sn_tab = NULL;
2789    unsigned wlen, rlen;
2790    int found = 0;
2791    static SheepdogInode inode;
2792    unsigned long *vdi_inuse;
2793    unsigned int start_nr;
2794    uint64_t hval;
2795    uint32_t vid;
2796
2797    vdi_inuse = g_malloc(max);
2798
2799    fd = connect_to_sdog(s, &local_err);
2800    if (fd < 0) {
2801        error_report_err(local_err);
2802        ret = fd;
2803        goto out;
2804    }
2805
2806    rlen = max;
2807    wlen = 0;
2808
2809    memset(&req, 0, sizeof(req));
2810
2811    req.opcode = SD_OP_READ_VDIS;
2812    req.data_length = max;
2813
2814    ret = do_req(fd, s->bs, &req, vdi_inuse, &wlen, &rlen);
2815
2816    closesocket(fd);
2817    if (ret) {
2818        goto out;
2819    }
2820
2821    sn_tab = g_new0(QEMUSnapshotInfo, nr);
2822
2823    /* calculate a vdi id with hash function */
2824    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
2825    start_nr = hval & (SD_NR_VDIS - 1);
2826
2827    fd = connect_to_sdog(s, &local_err);
2828    if (fd < 0) {
2829        error_report_err(local_err);
2830        ret = fd;
2831        goto out;
2832    }
2833
2834    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
2835        if (!test_bit(vid, vdi_inuse)) {
2836            break;
2837        }
2838
2839        /* we don't need to read entire object */
2840        ret = read_object(fd, s->bs, (char *)&inode,
2841                          vid_to_vdi_oid(vid),
2842                          0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
2843                          s->cache_flags);
2844
2845        if (ret) {
2846            continue;
2847        }
2848
2849        if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
2850            sn_tab[found].date_sec = inode.snap_ctime >> 32;
2851            sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
2852            sn_tab[found].vm_state_size = inode.vm_state_size;
2853            sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;
2854
2855            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str),
2856                     "%" PRIu32, inode.snap_id);
2857            pstrcpy(sn_tab[found].name,
2858                    MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
2859                    inode.tag);
2860            found++;
2861        }
2862    }
2863
2864    closesocket(fd);
2865out:
2866    *psn_tab = sn_tab;
2867
2868    g_free(vdi_inuse);
2869
2870    if (ret < 0) {
2871        return ret;
2872    }
2873
2874    return found;
2875}
2876
2877static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
2878                                int64_t pos, int size, int load)
2879{
2880    Error *local_err = NULL;
2881    bool create;
2882    int fd, ret = 0, remaining = size;
2883    unsigned int data_len;
2884    uint64_t vmstate_oid;
2885    uint64_t offset;
2886    uint32_t vdi_index;
2887    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;
2888    uint32_t object_size = (UINT32_C(1) << s->inode.block_size_shift);
2889
2890    fd = connect_to_sdog(s, &local_err);
2891    if (fd < 0) {
2892        error_report_err(local_err);
2893        return fd;
2894    }
2895
2896    while (remaining) {
2897        vdi_index = pos / object_size;
2898        offset = pos % object_size;
2899
2900        data_len = MIN(remaining, object_size - offset);
2901
2902        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);
2903
2904        create = (offset == 0);
2905        if (load) {
2906            ret = read_object(fd, s->bs, (char *)data, vmstate_oid,
2907                              s->inode.nr_copies, data_len, offset,
2908                              s->cache_flags);
2909        } else {
2910            ret = write_object(fd, s->bs, (char *)data, vmstate_oid,
2911                               s->inode.nr_copies, data_len, offset, create,
2912                               s->cache_flags);
2913        }
2914
2915        if (ret < 0) {
2916            error_report("failed to save vmstate %s", strerror(errno));
2917            goto cleanup;
2918        }
2919
2920        pos += data_len;
2921        data += data_len;
2922        remaining -= data_len;
2923    }
2924    ret = size;
2925cleanup:
2926    closesocket(fd);
2927    return ret;
2928}
2929
2930static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
2931                           int64_t pos)
2932{
2933    BDRVSheepdogState *s = bs->opaque;
2934    void *buf;
2935    int ret;
2936
2937    buf = qemu_blockalign(bs, qiov->size);
2938    qemu_iovec_to_buf(qiov, 0, buf, qiov->size);
2939    ret = do_load_save_vmstate(s, (uint8_t *) buf, pos, qiov->size, 0);
2940    qemu_vfree(buf);
2941
2942    return ret;
2943}
2944
2945static int sd_load_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
2946                           int64_t pos)
2947{
2948    BDRVSheepdogState *s = bs->opaque;
2949    void *buf;
2950    int ret;
2951
2952    buf = qemu_blockalign(bs, qiov->size);
2953    ret = do_load_save_vmstate(s, buf, pos, qiov->size, 1);
2954    qemu_iovec_from_buf(qiov, 0, buf, qiov->size);
2955    qemu_vfree(buf);
2956
2957    return ret;
2958}
2959
2960
2961static coroutine_fn int sd_co_pdiscard(BlockDriverState *bs, int64_t offset,
2962                                      int bytes)
2963{
2964    SheepdogAIOCB acb;
2965    BDRVSheepdogState *s = bs->opaque;
2966    QEMUIOVector discard_iov;
2967    struct iovec iov;
2968    uint32_t zero = 0;
2969
2970    if (!s->discard_supported) {
2971        return 0;
2972    }
2973
2974    memset(&discard_iov, 0, sizeof(discard_iov));
2975    memset(&iov, 0, sizeof(iov));
2976    iov.iov_base = &zero;
2977    iov.iov_len = sizeof(zero);
2978    discard_iov.iov = &iov;
2979    discard_iov.niov = 1;
2980    if (!QEMU_IS_ALIGNED(offset | bytes, BDRV_SECTOR_SIZE)) {
2981        return -ENOTSUP;
2982    }
2983    sd_aio_setup(&acb, s, &discard_iov, offset >> BDRV_SECTOR_BITS,
2984                 bytes >> BDRV_SECTOR_BITS, AIOCB_DISCARD_OBJ);
2985    sd_co_rw_vector(&acb);
2986    sd_aio_complete(&acb);
2987
2988    return acb.ret;
2989}
2990
2991static coroutine_fn int64_t
2992sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2993                       int *pnum, BlockDriverState **file)
2994{
2995    BDRVSheepdogState *s = bs->opaque;
2996    SheepdogInode *inode = &s->inode;
2997    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
2998    uint64_t offset = sector_num * BDRV_SECTOR_SIZE;
2999    unsigned long start = offset / object_size,
3000                  end = DIV_ROUND_UP((sector_num + nb_sectors) *
3001                                     BDRV_SECTOR_SIZE, object_size);
3002    unsigned long idx;
3003    int64_t ret = BDRV_BLOCK_DATA | BDRV_BLOCK_OFFSET_VALID | offset;
3004
3005    for (idx = start; idx < end; idx++) {
3006        if (inode->data_vdi_id[idx] == 0) {
3007            break;
3008        }
3009    }
3010    if (idx == start) {
3011        /* Get the longest length of unallocated sectors */
3012        ret = 0;
3013        for (idx = start + 1; idx < end; idx++) {
3014            if (inode->data_vdi_id[idx] != 0) {
3015                break;
3016            }
3017        }
3018    }
3019
3020    *pnum = (idx - start) * object_size / BDRV_SECTOR_SIZE;
3021    if (*pnum > nb_sectors) {
3022        *pnum = nb_sectors;
3023    }
3024    if (ret > 0 && ret & BDRV_BLOCK_OFFSET_VALID) {
3025        *file = bs;
3026    }
3027    return ret;
3028}
3029
3030static int64_t sd_get_allocated_file_size(BlockDriverState *bs)
3031{
3032    BDRVSheepdogState *s = bs->opaque;
3033    SheepdogInode *inode = &s->inode;
3034    uint32_t object_size = (UINT32_C(1) << inode->block_size_shift);
3035    unsigned long i, last = DIV_ROUND_UP(inode->vdi_size, object_size);
3036    uint64_t size = 0;
3037
3038    for (i = 0; i < last; i++) {
3039        if (inode->data_vdi_id[i] == 0) {
3040            continue;
3041        }
3042        size += object_size;
3043    }
3044    return size;
3045}
3046
3047static QemuOptsList sd_create_opts = {
3048    .name = "sheepdog-create-opts",
3049    .head = QTAILQ_HEAD_INITIALIZER(sd_create_opts.head),
3050    .desc = {
3051        {
3052            .name = BLOCK_OPT_SIZE,
3053            .type = QEMU_OPT_SIZE,
3054            .help = "Virtual disk size"
3055        },
3056        {
3057            .name = BLOCK_OPT_BACKING_FILE,
3058            .type = QEMU_OPT_STRING,
3059            .help = "File name of a base image"
3060        },
3061        {
3062            .name = BLOCK_OPT_PREALLOC,
3063            .type = QEMU_OPT_STRING,
3064            .help = "Preallocation mode (allowed values: off, full)"
3065        },
3066        {
3067            .name = BLOCK_OPT_REDUNDANCY,
3068            .type = QEMU_OPT_STRING,
3069            .help = "Redundancy of the image"
3070        },
3071        {
3072            .name = BLOCK_OPT_OBJECT_SIZE,
3073            .type = QEMU_OPT_SIZE,
3074            .help = "Object size of the image"
3075        },
3076        { /* end of list */ }
3077    }
3078};
3079
3080static BlockDriver bdrv_sheepdog = {
3081    .format_name    = "sheepdog",
3082    .protocol_name  = "sheepdog",
3083    .instance_size  = sizeof(BDRVSheepdogState),
3084    .bdrv_parse_filename    = sd_parse_filename,
3085    .bdrv_file_open = sd_open,
3086    .bdrv_reopen_prepare    = sd_reopen_prepare,
3087    .bdrv_reopen_commit     = sd_reopen_commit,
3088    .bdrv_reopen_abort      = sd_reopen_abort,
3089    .bdrv_close     = sd_close,
3090    .bdrv_create    = sd_create,
3091    .bdrv_has_zero_init = bdrv_has_zero_init_1,
3092    .bdrv_getlength = sd_getlength,
3093    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3094    .bdrv_truncate  = sd_truncate,
3095
3096    .bdrv_co_readv  = sd_co_readv,
3097    .bdrv_co_writev = sd_co_writev,
3098    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
3099    .bdrv_co_pdiscard = sd_co_pdiscard,
3100    .bdrv_co_get_block_status = sd_co_get_block_status,
3101
3102    .bdrv_snapshot_create   = sd_snapshot_create,
3103    .bdrv_snapshot_goto     = sd_snapshot_goto,
3104    .bdrv_snapshot_delete   = sd_snapshot_delete,
3105    .bdrv_snapshot_list     = sd_snapshot_list,
3106
3107    .bdrv_save_vmstate  = sd_save_vmstate,
3108    .bdrv_load_vmstate  = sd_load_vmstate,
3109
3110    .bdrv_detach_aio_context = sd_detach_aio_context,
3111    .bdrv_attach_aio_context = sd_attach_aio_context,
3112
3113    .create_opts    = &sd_create_opts,
3114};
3115
3116static BlockDriver bdrv_sheepdog_tcp = {
3117    .format_name    = "sheepdog",
3118    .protocol_name  = "sheepdog+tcp",
3119    .instance_size  = sizeof(BDRVSheepdogState),
3120    .bdrv_parse_filename    = sd_parse_filename,
3121    .bdrv_file_open = sd_open,
3122    .bdrv_reopen_prepare    = sd_reopen_prepare,
3123    .bdrv_reopen_commit     = sd_reopen_commit,
3124    .bdrv_reopen_abort      = sd_reopen_abort,
3125    .bdrv_close     = sd_close,
3126    .bdrv_create    = sd_create,
3127    .bdrv_has_zero_init = bdrv_has_zero_init_1,
3128    .bdrv_getlength = sd_getlength,
3129    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3130    .bdrv_truncate  = sd_truncate,
3131
3132    .bdrv_co_readv  = sd_co_readv,
3133    .bdrv_co_writev = sd_co_writev,
3134    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
3135    .bdrv_co_pdiscard = sd_co_pdiscard,
3136    .bdrv_co_get_block_status = sd_co_get_block_status,
3137
3138    .bdrv_snapshot_create   = sd_snapshot_create,
3139    .bdrv_snapshot_goto     = sd_snapshot_goto,
3140    .bdrv_snapshot_delete   = sd_snapshot_delete,
3141    .bdrv_snapshot_list     = sd_snapshot_list,
3142
3143    .bdrv_save_vmstate  = sd_save_vmstate,
3144    .bdrv_load_vmstate  = sd_load_vmstate,
3145
3146    .bdrv_detach_aio_context = sd_detach_aio_context,
3147    .bdrv_attach_aio_context = sd_attach_aio_context,
3148
3149    .create_opts    = &sd_create_opts,
3150};
3151
3152static BlockDriver bdrv_sheepdog_unix = {
3153    .format_name    = "sheepdog",
3154    .protocol_name  = "sheepdog+unix",
3155    .instance_size  = sizeof(BDRVSheepdogState),
3156    .bdrv_parse_filename    = sd_parse_filename,
3157    .bdrv_file_open = sd_open,
3158    .bdrv_reopen_prepare    = sd_reopen_prepare,
3159    .bdrv_reopen_commit     = sd_reopen_commit,
3160    .bdrv_reopen_abort      = sd_reopen_abort,
3161    .bdrv_close     = sd_close,
3162    .bdrv_create    = sd_create,
3163    .bdrv_has_zero_init = bdrv_has_zero_init_1,
3164    .bdrv_getlength = sd_getlength,
3165    .bdrv_get_allocated_file_size = sd_get_allocated_file_size,
3166    .bdrv_truncate  = sd_truncate,
3167
3168    .bdrv_co_readv  = sd_co_readv,
3169    .bdrv_co_writev = sd_co_writev,
3170    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
3171    .bdrv_co_pdiscard = sd_co_pdiscard,
3172    .bdrv_co_get_block_status = sd_co_get_block_status,
3173
3174    .bdrv_snapshot_create   = sd_snapshot_create,
3175    .bdrv_snapshot_goto     = sd_snapshot_goto,
3176    .bdrv_snapshot_delete   = sd_snapshot_delete,
3177    .bdrv_snapshot_list     = sd_snapshot_list,
3178
3179    .bdrv_save_vmstate  = sd_save_vmstate,
3180    .bdrv_load_vmstate  = sd_load_vmstate,
3181
3182    .bdrv_detach_aio_context = sd_detach_aio_context,
3183    .bdrv_attach_aio_context = sd_attach_aio_context,
3184
3185    .create_opts    = &sd_create_opts,
3186};
3187
3188static void bdrv_sheepdog_init(void)
3189{
3190    bdrv_register(&bdrv_sheepdog);
3191    bdrv_register(&bdrv_sheepdog_tcp);
3192    bdrv_register(&bdrv_sheepdog_unix);
3193}
3194block_init(bdrv_sheepdog_init);
3195