qemu/block/archipelago.c
<<
>>
Prefs
   1/*
   2 * QEMU Block driver for Archipelago
   3 *
   4 * Copyright (C) 2014 Chrysostomos Nanakos <cnanakos@grnet.gr>
   5 *
   6 * This work is licensed under the terms of the GNU GPL, version 2 or later.
   7 * See the COPYING file in the top-level directory.
   8 *
   9 */
  10
  11/*
  12 * VM Image on Archipelago volume is specified like this:
  13 *
  14 * file.driver=archipelago,file.volume=<volumename>
  15 * [,file.mport=<mapperd_port>[,file.vport=<vlmcd_port>]
  16 * [,file.segment=<segment_name>]]
  17 *
  18 * or
  19 *
  20 * file=archipelago:<volumename>[/mport=<mapperd_port>[:vport=<vlmcd_port>][:
  21 * segment=<segment_name>]]
  22 *
  23 * 'archipelago' is the protocol.
  24 *
 * 'mport' is the port number on which mapperd is listening. It is optional;
 * if it is not specified, QEMU makes Archipelago use the default port.
  27 *
 * 'vport' is the port number on which vlmcd is listening. It is optional;
 * if it is not specified, QEMU makes Archipelago use the default port.
  30 *
 * 'segment' is the name of the shared memory segment that the Archipelago
 * stack is using. It is optional; if it is not specified, QEMU makes
 * Archipelago use the default value, 'archipelago'.
  34 *
  35 * Examples:
  36 *
  37 * file.driver=archipelago,file.volume=my_vm_volume
  38 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123
  39 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
  40 *  file.vport=1234
  41 * file.driver=archipelago,file.volume=my_vm_volume,file.mport=123,
  42 *  file.vport=1234,file.segment=my_segment
  43 *
  44 * or
  45 *
  46 * file=archipelago:my_vm_volume
  47 * file=archipelago:my_vm_volume/mport=123
  48 * file=archipelago:my_vm_volume/mport=123:vport=1234
  49 * file=archipelago:my_vm_volume/mport=123:vport=1234:segment=my_segment
  50 *
  51 */
  52
  53#include "qemu/osdep.h"
  54#include "qemu/cutils.h"
  55#include "block/block_int.h"
  56#include "qemu/error-report.h"
  57#include "qemu/thread.h"
  58#include "qapi/qmp/qint.h"
  59#include "qapi/qmp/qstring.h"
  60#include "qapi/qmp/qjson.h"
  61#include "qemu/atomic.h"
  62
  63#include <xseg/xseg.h>
  64#include <xseg/protocol.h>
  65
/* Largest payload, in bytes, carried by one XSEG request (512 KiB); larger
 * I/O is split into several requests of at most this size. */
#define MAX_REQUEST_SIZE    524288

/* Names of the runtime options understood by the driver */
#define ARCHIPELAGO_OPT_VOLUME      "volume"
#define ARCHIPELAGO_OPT_SEGMENT     "segment"
#define ARCHIPELAGO_OPT_MPORT       "mport"
#define ARCHIPELAGO_OPT_VPORT       "vport"
/* Port numbers used when the corresponding option is not supplied */
#define ARCHIPELAGO_DFL_MPORT       1001
#define ARCHIPELAGO_DFL_VPORT       501
  74
/*
 * Print a printf-style diagnostic to stderr, prefixed with "archipelago" and
 * the name of the calling function. The caller supplies the trailing newline.
 */
#define archipelagolog(fmt, ...) \
    do {                         \
        fprintf(stderr, "archipelago\t%-24s: " fmt, __func__, ##__VA_ARGS__); \
    } while (0)
  79
/*
 * Operation tag stored in AIORequestData.op; the request handler thread
 * switches on it to decide how a received reply is completed.
 */
typedef enum {
    ARCHIP_OP_READ,
    ARCHIP_OP_WRITE,
    ARCHIP_OP_FLUSH,
    ARCHIP_OP_VOLINFO,
    ARCHIP_OP_TRUNCATE,
} ARCHIPCmd;
  87
/* Per-request AIO control block returned to the block layer. */
typedef struct ArchipelagoAIOCB {
    /* Generic block-layer AIOCB; carries the completion callback and bs */
    BlockAIOCB common;
    /* Bottom half scheduled to run the completion callback */
    QEMUBH *bh;
    /* Driver state of the device this request belongs to */
    struct BDRVArchipelagoState *s;
    /* Caller's I/O vector: source for writes, destination for reads */
    QEMUIOVector *qiov;
    /* Operation being performed (ARCHIP_OP_*) */
    ARCHIPCmd cmd;
    /* -EINPROGRESS while outstanding, 0 once the callback has run */
    int status;
    /* Requested transfer length in bytes */
    int64_t size;
    /* Serviced byte count while in flight, then 0 or -EIO for the callback */
    int64_t ret;
} ArchipelagoAIOCB;
  98
/* Per-device driver state (bs->opaque). */
typedef struct BDRVArchipelagoState {
    /* NOTE(review): not referenced in the visible part of the file */
    ArchipelagoAIOCB *event_acb;
    /* Heap copies of the "volume" and "segment" options */
    char *volname;
    char *segment_name;
    /* Volume size as last reported by archipelago_volume_info() */
    uint64_t size;
    /* Archipelago specific */
    /* Handle of the joined shared memory segment */
    struct xseg *xseg;
    /* Dynamically bound port and its port number */
    struct xseg_port *port;
    xport srcport;
    /* Passed to init_local_signal(); never assigned in the visible code */
    xport sport;
    /* Destination ports taken from the "mport" / "vport" options */
    xport mportno;
    xport vportno;
    /* Used to hand VOLINFO/TRUNCATE replies back to the waiting caller */
    QemuMutex archip_mutex;
    QemuCond archip_cond;
    bool is_signaled;
    /* Request handler specific */
    QemuThread request_th;
    /* Signalled by the handler thread right before it exits */
    QemuCond request_cond;
    /* Held by the handler thread for the whole lifetime of its loop */
    QemuMutex request_mutex;
    /* Set by the handler thread once it has left its loop */
    bool th_is_signaled;
    /* Set by close() to ask the handler thread to stop */
    bool stopping;
} BDRVArchipelagoState;
 121
/* Bookkeeping shared by all XSEG requests that make up one AIO request. */
typedef struct ArchipelagoSegmentedRequest {
    /* Sum of the 'serviced' byte counts of the replies received so far */
    size_t count;
    /* Total number of bytes the AIO request asked for */
    size_t total;
    /* Number of segments not yet accounted for; updated atomically */
    int ref;
    /* Non-zero once submitting one of the segments has failed */
    int failed;
} ArchipelagoSegmentedRequest;
 128
/* Private data attached to each XSEG request via xseg_set_req_data(). */
typedef struct AIORequestData {
    /* Borrowed pointer to the volume name held in the driver state */
    const char *volname;
    /* Byte offset and length of this segment on the volume */
    off_t offset;
    size_t size;
    /* Byte offset of this segment inside the caller's I/O vector */
    uint64_t bufidx;
    /* NOTE(review): not written or read in the visible code */
    int ret;
    /* One of the ARCHIP_OP_* values */
    int op;
    /* AIOCB this segment belongs to (unset for VOLINFO/TRUNCATE) */
    ArchipelagoAIOCB *aio_cb;
    /* Shared accounting for segmented I/O (unset for VOLINFO/TRUNCATE) */
    ArchipelagoSegmentedRequest *segreq;
} AIORequestData;
 139
/* Bottom-half handler; declared early because archipelago_finish_aiocb()
 * schedules it before its definition appears. */
static void qemu_archipelago_complete_aio(void *opaque);
 141
 142static void init_local_signal(struct xseg *xseg, xport sport, xport srcport)
 143{
 144    if (xseg && (sport != srcport)) {
 145        xseg_init_local_signal(xseg, srcport);
 146        sport = srcport;
 147    }
 148}
 149
 150static void archipelago_finish_aiocb(AIORequestData *reqdata)
 151{
 152    if (reqdata->aio_cb->ret != reqdata->segreq->total) {
 153        reqdata->aio_cb->ret = -EIO;
 154    } else if (reqdata->aio_cb->ret == reqdata->segreq->total) {
 155        reqdata->aio_cb->ret = 0;
 156    }
 157    reqdata->aio_cb->bh = aio_bh_new(
 158                        bdrv_get_aio_context(reqdata->aio_cb->common.bs),
 159                        qemu_archipelago_complete_aio, reqdata
 160                        );
 161    qemu_bh_schedule(reqdata->aio_cb->bh);
 162}
 163
 164static int wait_reply(struct xseg *xseg, xport srcport, struct xseg_port *port,
 165                      struct xseg_request *expected_req)
 166{
 167    struct xseg_request *req;
 168    xseg_prepare_wait(xseg, srcport);
 169    void *psd = xseg_get_signal_desc(xseg, port);
 170    while (1) {
 171        req = xseg_receive(xseg, srcport, X_NONBLOCK);
 172        if (req) {
 173            if (req != expected_req) {
 174                archipelagolog("Unknown received request\n");
 175                xseg_put_request(xseg, req, srcport);
 176            } else if (!(req->state & XS_SERVED)) {
 177                return -1;
 178            } else {
 179                break;
 180            }
 181        }
 182        xseg_wait_signal(xseg, psd, 100000UL);
 183    }
 184    xseg_cancel_wait(xseg, srcport);
 185    return 0;
 186}
 187
 188static void xseg_request_handler(void *state)
 189{
 190    BDRVArchipelagoState *s = (BDRVArchipelagoState *) state;
 191    void *psd = xseg_get_signal_desc(s->xseg, s->port);
 192    qemu_mutex_lock(&s->request_mutex);
 193
 194    while (!s->stopping) {
 195        struct xseg_request *req;
 196        void *data;
 197        xseg_prepare_wait(s->xseg, s->srcport);
 198        req = xseg_receive(s->xseg, s->srcport, X_NONBLOCK);
 199        if (req) {
 200            AIORequestData *reqdata;
 201            ArchipelagoSegmentedRequest *segreq;
 202            xseg_get_req_data(s->xseg, req, (void **)&reqdata);
 203
 204            switch (reqdata->op) {
 205            case ARCHIP_OP_READ:
 206                data = xseg_get_data(s->xseg, req);
 207                segreq = reqdata->segreq;
 208                segreq->count += req->serviced;
 209
 210                qemu_iovec_from_buf(reqdata->aio_cb->qiov, reqdata->bufidx,
 211                                    data,
 212                                    req->serviced);
 213
 214                xseg_put_request(s->xseg, req, s->srcport);
 215
 216                if (atomic_fetch_dec(&segreq->ref) == 1) {
 217                    if (!segreq->failed) {
 218                        reqdata->aio_cb->ret = segreq->count;
 219                        archipelago_finish_aiocb(reqdata);
 220                        g_free(segreq);
 221                    } else {
 222                        g_free(segreq);
 223                        g_free(reqdata);
 224                    }
 225                } else {
 226                    g_free(reqdata);
 227                }
 228                break;
 229            case ARCHIP_OP_WRITE:
 230            case ARCHIP_OP_FLUSH:
 231                segreq = reqdata->segreq;
 232                segreq->count += req->serviced;
 233                xseg_put_request(s->xseg, req, s->srcport);
 234
 235                if (atomic_fetch_dec(&segreq->ref) == 1) {
 236                    if (!segreq->failed) {
 237                        reqdata->aio_cb->ret = segreq->count;
 238                        archipelago_finish_aiocb(reqdata);
 239                        g_free(segreq);
 240                    } else {
 241                        g_free(segreq);
 242                        g_free(reqdata);
 243                    }
 244                } else {
 245                    g_free(reqdata);
 246                }
 247                break;
 248            case ARCHIP_OP_VOLINFO:
 249            case ARCHIP_OP_TRUNCATE:
 250                s->is_signaled = true;
 251                qemu_cond_signal(&s->archip_cond);
 252                break;
 253            }
 254        } else {
 255            xseg_wait_signal(s->xseg, psd, 100000UL);
 256        }
 257        xseg_cancel_wait(s->xseg, s->srcport);
 258    }
 259
 260    s->th_is_signaled = true;
 261    qemu_cond_signal(&s->request_cond);
 262    qemu_mutex_unlock(&s->request_mutex);
 263    qemu_thread_exit(NULL);
 264}
 265
 266static int qemu_archipelago_xseg_init(BDRVArchipelagoState *s)
 267{
 268    if (xseg_initialize()) {
 269        archipelagolog("Cannot initialize XSEG\n");
 270        goto err_exit;
 271    }
 272
 273    s->xseg = xseg_join("posix", s->segment_name,
 274                        "posixfd", NULL);
 275    if (!s->xseg) {
 276        archipelagolog("Cannot join XSEG shared memory segment\n");
 277        goto err_exit;
 278    }
 279    s->port = xseg_bind_dynport(s->xseg);
 280    s->srcport = s->port->portno;
 281    init_local_signal(s->xseg, s->sport, s->srcport);
 282    return 0;
 283
 284err_exit:
 285    return -1;
 286}
 287
 288static int qemu_archipelago_init(BDRVArchipelagoState *s)
 289{
 290    int ret;
 291
 292    ret = qemu_archipelago_xseg_init(s);
 293    if (ret < 0) {
 294        error_report("Cannot initialize XSEG. Aborting...");
 295        goto err_exit;
 296    }
 297
 298    qemu_cond_init(&s->archip_cond);
 299    qemu_mutex_init(&s->archip_mutex);
 300    qemu_cond_init(&s->request_cond);
 301    qemu_mutex_init(&s->request_mutex);
 302    s->th_is_signaled = false;
 303    qemu_thread_create(&s->request_th, "xseg_io_th",
 304                       (void *) xseg_request_handler,
 305                       (void *) s, QEMU_THREAD_JOINABLE);
 306
 307err_exit:
 308    return ret;
 309}
 310
 311static void qemu_archipelago_complete_aio(void *opaque)
 312{
 313    AIORequestData *reqdata = (AIORequestData *) opaque;
 314    ArchipelagoAIOCB *aio_cb = (ArchipelagoAIOCB *) reqdata->aio_cb;
 315
 316    qemu_bh_delete(aio_cb->bh);
 317    aio_cb->common.cb(aio_cb->common.opaque, aio_cb->ret);
 318    aio_cb->status = 0;
 319
 320    qemu_aio_unref(aio_cb);
 321    g_free(reqdata);
 322}
 323
 324static void xseg_find_port(char *pstr, const char *needle, xport *aport)
 325{
 326    const char *a;
 327    char *endptr = NULL;
 328    unsigned long port;
 329    if (strstart(pstr, needle, &a)) {
 330        if (strlen(a) > 0) {
 331            port = strtoul(a, &endptr, 10);
 332            if (strlen(endptr)) {
 333                *aport = -2;
 334                return;
 335            }
 336            *aport = (xport) port;
 337        }
 338    }
 339}
 340
 341static void xseg_find_segment(char *pstr, const char *needle,
 342                              char **segment_name)
 343{
 344    const char *a;
 345    if (strstart(pstr, needle, &a)) {
 346        if (strlen(a) > 0) {
 347            *segment_name = g_strdup(a);
 348        }
 349    }
 350}
 351
 352static void parse_filename_opts(const char *filename, Error **errp,
 353                                char **volume, char **segment_name,
 354                                xport *mport, xport *vport)
 355{
 356    const char *start;
 357    char *tokens[4], *ds;
 358    int idx;
 359    xport lmport = NoPort, lvport = NoPort;
 360
 361    strstart(filename, "archipelago:", &start);
 362
 363    ds = g_strdup(start);
 364    tokens[0] = strtok(ds, "/");
 365    tokens[1] = strtok(NULL, ":");
 366    tokens[2] = strtok(NULL, ":");
 367    tokens[3] = strtok(NULL, "\0");
 368
 369    if (!strlen(tokens[0])) {
 370        error_setg(errp, "volume name must be specified first");
 371        g_free(ds);
 372        return;
 373    }
 374
 375    for (idx = 1; idx < 4; idx++) {
 376        if (tokens[idx] != NULL) {
 377            if (strstart(tokens[idx], "mport=", NULL)) {
 378                xseg_find_port(tokens[idx], "mport=", &lmport);
 379            }
 380            if (strstart(tokens[idx], "vport=", NULL)) {
 381                xseg_find_port(tokens[idx], "vport=", &lvport);
 382            }
 383            if (strstart(tokens[idx], "segment=", NULL)) {
 384                xseg_find_segment(tokens[idx], "segment=", segment_name);
 385            }
 386        }
 387    }
 388
 389    if ((lmport == -2) || (lvport == -2)) {
 390        error_setg(errp, "mport and/or vport must be set");
 391        g_free(ds);
 392        return;
 393    }
 394    *volume = g_strdup(tokens[0]);
 395    *mport = lmport;
 396    *vport = lvport;
 397    g_free(ds);
 398}
 399
 400static void archipelago_parse_filename(const char *filename, QDict *options,
 401                                       Error **errp)
 402{
 403    const char *start;
 404    char *volume = NULL, *segment_name = NULL;
 405    xport mport = NoPort, vport = NoPort;
 406
 407    if (qdict_haskey(options, ARCHIPELAGO_OPT_VOLUME)
 408            || qdict_haskey(options, ARCHIPELAGO_OPT_SEGMENT)
 409            || qdict_haskey(options, ARCHIPELAGO_OPT_MPORT)
 410            || qdict_haskey(options, ARCHIPELAGO_OPT_VPORT)) {
 411        error_setg(errp, "volume/mport/vport/segment and a file name may not"
 412                         " be specified at the same time");
 413        return;
 414    }
 415
 416    if (!strstart(filename, "archipelago:", &start)) {
 417        error_setg(errp, "File name must start with 'archipelago:'");
 418        return;
 419    }
 420
 421    if (!strlen(start) || strstart(start, "/", NULL)) {
 422        error_setg(errp, "volume name must be specified");
 423        return;
 424    }
 425
 426    parse_filename_opts(filename, errp, &volume, &segment_name, &mport, &vport);
 427
 428    if (volume) {
 429        qdict_put(options, ARCHIPELAGO_OPT_VOLUME, qstring_from_str(volume));
 430        g_free(volume);
 431    }
 432    if (segment_name) {
 433        qdict_put(options, ARCHIPELAGO_OPT_SEGMENT,
 434                  qstring_from_str(segment_name));
 435        g_free(segment_name);
 436    }
 437    if (mport != NoPort) {
 438        qdict_put(options, ARCHIPELAGO_OPT_MPORT, qint_from_int(mport));
 439    }
 440    if (vport != NoPort) {
 441        qdict_put(options, ARCHIPELAGO_OPT_VPORT, qint_from_int(vport));
 442    }
 443}
 444
/*
 * Runtime options accepted by qemu_archipelago_open(): the volume and
 * segment names as strings, the mapperd and vlmcd ports as numbers.
 */
static QemuOptsList archipelago_runtime_opts = {
    .name = "archipelago",
    .head = QTAILQ_HEAD_INITIALIZER(archipelago_runtime_opts.head),
    .desc = {
        {
            .name = ARCHIPELAGO_OPT_VOLUME,
            .type = QEMU_OPT_STRING,
            .help = "Name of the volume image",
        },
        {
            .name = ARCHIPELAGO_OPT_SEGMENT,
            .type = QEMU_OPT_STRING,
            .help = "Name of the Archipelago shared memory segment",
        },
        {
            .name = ARCHIPELAGO_OPT_MPORT,
            .type = QEMU_OPT_NUMBER,
            .help = "Archipelago mapperd port number"
        },
        {
            .name = ARCHIPELAGO_OPT_VPORT,
            .type = QEMU_OPT_NUMBER,
            .help = "Archipelago vlmcd port number"

        },
        { /* end of list */ }
    },
};
 473
 474static int qemu_archipelago_open(BlockDriverState *bs,
 475                                 QDict *options,
 476                                 int bdrv_flags,
 477                                 Error **errp)
 478{
 479    int ret = 0;
 480    const char *volume, *segment_name;
 481    QemuOpts *opts;
 482    Error *local_err = NULL;
 483    BDRVArchipelagoState *s = bs->opaque;
 484
 485    opts = qemu_opts_create(&archipelago_runtime_opts, NULL, 0, &error_abort);
 486    qemu_opts_absorb_qdict(opts, options, &local_err);
 487    if (local_err) {
 488        error_propagate(errp, local_err);
 489        ret = -EINVAL;
 490        goto err_exit;
 491    }
 492
 493    s->mportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_MPORT,
 494                                     ARCHIPELAGO_DFL_MPORT);
 495    s->vportno = qemu_opt_get_number(opts, ARCHIPELAGO_OPT_VPORT,
 496                                     ARCHIPELAGO_DFL_VPORT);
 497
 498    segment_name = qemu_opt_get(opts, ARCHIPELAGO_OPT_SEGMENT);
 499    if (segment_name == NULL) {
 500        s->segment_name = g_strdup("archipelago");
 501    } else {
 502        s->segment_name = g_strdup(segment_name);
 503    }
 504
 505    volume = qemu_opt_get(opts, ARCHIPELAGO_OPT_VOLUME);
 506    if (volume == NULL) {
 507        error_setg(errp, "archipelago block driver requires the 'volume'"
 508                   " option");
 509        ret = -EINVAL;
 510        goto err_exit;
 511    }
 512    s->volname = g_strdup(volume);
 513
 514    /* Initialize XSEG, join shared memory segment */
 515    ret = qemu_archipelago_init(s);
 516    if (ret < 0) {
 517        error_setg(errp, "cannot initialize XSEG and join shared "
 518                   "memory segment");
 519        goto err_exit;
 520    }
 521
 522    qemu_opts_del(opts);
 523    return 0;
 524
 525err_exit:
 526    g_free(s->volname);
 527    g_free(s->segment_name);
 528    qemu_opts_del(opts);
 529    return ret;
 530}
 531
/*
 * Close the device: stop and join the request handler thread, destroy the
 * synchronisation objects, send an X_CLOSE request for the volume to the
 * vlmcd port and wait for its reply, then free the names and leave the
 * XSEG port and segment.
 *
 * Failures while building or submitting the close request are only logged;
 * the teardown at err_exit runs in every case.
 */
static void qemu_archipelago_close(BlockDriverState *bs)
{
    int r, targetlen;
    char *target;
    struct xseg_request *req;
    BDRVArchipelagoState *s = bs->opaque;

    /* Ask the handler thread to leave its loop */
    s->stopping = true;

    /* The handler holds request_mutex while it runs, so this lock is only
     * obtained once the handler has released it on its way out. */
    qemu_mutex_lock(&s->request_mutex);
    while (!s->th_is_signaled) {
        qemu_cond_wait(&s->request_cond,
                       &s->request_mutex);
    }
    qemu_mutex_unlock(&s->request_mutex);
    qemu_thread_join(&s->request_th);
    qemu_cond_destroy(&s->request_cond);
    qemu_mutex_destroy(&s->request_mutex);

    qemu_cond_destroy(&s->archip_cond);
    qemu_mutex_destroy(&s->archip_mutex);

    targetlen = strlen(s->volname);
    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
    if (!req) {
        archipelagolog("Cannot get XSEG request\n");
        goto err_exit;
    }
    r = xseg_prep_request(s->xseg, req, targetlen, 0);
    if (r < 0) {
        xseg_put_request(s->xseg, req, s->srcport);
        archipelagolog("Cannot prepare XSEG close request\n");
        goto err_exit;
    }

    target = xseg_get_target(s->xseg, req);
    memcpy(target, s->volname, targetlen);
    req->size = req->datalen;
    req->offset = 0;
    req->op = X_CLOSE;

    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
    if (p == NoPort) {
        xseg_put_request(s->xseg, req, s->srcport);
        archipelagolog("Cannot submit XSEG close request\n");
        goto err_exit;
    }

    xseg_signal(s->xseg, p);
    /* The handler thread is gone, so poll for the reply synchronously;
     * the result of the close request is not inspected. */
    wait_reply(s->xseg, s->srcport, s->port, req);

    xseg_put_request(s->xseg, req, s->srcport);

err_exit:
    g_free(s->volname);
    g_free(s->segment_name);
    xseg_quit_local_signal(s->xseg, s->srcport);
    xseg_leave_dynport(s->xseg, s->port);
    xseg_leave(s->xseg);
}
 592
 593static int qemu_archipelago_create_volume(Error **errp, const char *volname,
 594                                          char *segment_name,
 595                                          uint64_t size, xport mportno,
 596                                          xport vportno)
 597{
 598    int ret, targetlen;
 599    struct xseg *xseg = NULL;
 600    struct xseg_request *req;
 601    struct xseg_request_clone *xclone;
 602    struct xseg_port *port;
 603    xport srcport = NoPort, sport = NoPort;
 604    char *target;
 605
 606    /* Try default values if none has been set */
 607    if (mportno == (xport) -1) {
 608        mportno = ARCHIPELAGO_DFL_MPORT;
 609    }
 610
 611    if (vportno == (xport) -1) {
 612        vportno = ARCHIPELAGO_DFL_VPORT;
 613    }
 614
 615    if (xseg_initialize()) {
 616        error_setg(errp, "Cannot initialize XSEG");
 617        return -1;
 618    }
 619
 620    xseg = xseg_join("posix", segment_name,
 621                     "posixfd", NULL);
 622
 623    if (!xseg) {
 624        error_setg(errp, "Cannot join XSEG shared memory segment");
 625        return -1;
 626    }
 627
 628    port = xseg_bind_dynport(xseg);
 629    srcport = port->portno;
 630    init_local_signal(xseg, sport, srcport);
 631
 632    req = xseg_get_request(xseg, srcport, mportno, X_ALLOC);
 633    if (!req) {
 634        error_setg(errp, "Cannot get XSEG request");
 635        return -1;
 636    }
 637
 638    targetlen = strlen(volname);
 639    ret = xseg_prep_request(xseg, req, targetlen,
 640                            sizeof(struct xseg_request_clone));
 641    if (ret < 0) {
 642        error_setg(errp, "Cannot prepare XSEG request");
 643        goto err_exit;
 644    }
 645
 646    target = xseg_get_target(xseg, req);
 647    if (!target) {
 648        error_setg(errp, "Cannot get XSEG target.");
 649        goto err_exit;
 650    }
 651    memcpy(target, volname, targetlen);
 652    xclone = (struct xseg_request_clone *) xseg_get_data(xseg, req);
 653    memset(xclone->target, 0 , XSEG_MAX_TARGETLEN);
 654    xclone->targetlen = 0;
 655    xclone->size = size;
 656    req->offset = 0;
 657    req->size = req->datalen;
 658    req->op = X_CLONE;
 659
 660    xport p = xseg_submit(xseg, req, srcport, X_ALLOC);
 661    if (p == NoPort) {
 662        error_setg(errp, "Could not submit XSEG request");
 663        goto err_exit;
 664    }
 665    xseg_signal(xseg, p);
 666
 667    ret = wait_reply(xseg, srcport, port, req);
 668    if (ret < 0) {
 669        error_setg(errp, "wait_reply() error.");
 670    }
 671
 672    xseg_put_request(xseg, req, srcport);
 673    xseg_quit_local_signal(xseg, srcport);
 674    xseg_leave_dynport(xseg, port);
 675    xseg_leave(xseg);
 676    return ret;
 677
 678err_exit:
 679    xseg_put_request(xseg, req, srcport);
 680    xseg_quit_local_signal(xseg, srcport);
 681    xseg_leave_dynport(xseg, port);
 682    xseg_leave(xseg);
 683    return -1;
 684}
 685
 686static int qemu_archipelago_create(const char *filename,
 687                                   QemuOpts *options,
 688                                   Error **errp)
 689{
 690    int ret = 0;
 691    uint64_t total_size = 0;
 692    char *volname = NULL, *segment_name = NULL;
 693    const char *start;
 694    xport mport = NoPort, vport = NoPort;
 695
 696    if (!strstart(filename, "archipelago:", &start)) {
 697        error_setg(errp, "File name must start with 'archipelago:'");
 698        return -1;
 699    }
 700
 701    if (!strlen(start) || strstart(start, "/", NULL)) {
 702        error_setg(errp, "volume name must be specified");
 703        return -1;
 704    }
 705
 706    parse_filename_opts(filename, errp, &volname, &segment_name, &mport,
 707                        &vport);
 708    total_size = ROUND_UP(qemu_opt_get_size_del(options, BLOCK_OPT_SIZE, 0),
 709                          BDRV_SECTOR_SIZE);
 710
 711    if (segment_name == NULL) {
 712        segment_name = g_strdup("archipelago");
 713    }
 714
 715    /* Create an Archipelago volume */
 716    ret = qemu_archipelago_create_volume(errp, volname, segment_name,
 717                                         total_size, mport,
 718                                         vport);
 719
 720    g_free(volname);
 721    g_free(segment_name);
 722    return ret;
 723}
 724
/* AIOCB descriptor passed to qemu_aio_get(); it supplies the size of
 * ArchipelagoAIOCB. */
static const AIOCBInfo archipelago_aiocb_info = {
    .aiocb_size = sizeof(ArchipelagoAIOCB),
};
 728
 729static int archipelago_submit_request(BDRVArchipelagoState *s,
 730                                        uint64_t bufidx,
 731                                        size_t count,
 732                                        off_t offset,
 733                                        ArchipelagoAIOCB *aio_cb,
 734                                        ArchipelagoSegmentedRequest *segreq,
 735                                        int op)
 736{
 737    int ret, targetlen;
 738    char *target;
 739    void *data = NULL;
 740    struct xseg_request *req;
 741    AIORequestData *reqdata = g_new(AIORequestData, 1);
 742
 743    targetlen = strlen(s->volname);
 744    req = xseg_get_request(s->xseg, s->srcport, s->vportno, X_ALLOC);
 745    if (!req) {
 746        archipelagolog("Cannot get XSEG request\n");
 747        goto err_exit2;
 748    }
 749    ret = xseg_prep_request(s->xseg, req, targetlen, count);
 750    if (ret < 0) {
 751        archipelagolog("Cannot prepare XSEG request\n");
 752        goto err_exit;
 753    }
 754    target = xseg_get_target(s->xseg, req);
 755    if (!target) {
 756        archipelagolog("Cannot get XSEG target\n");
 757        goto err_exit;
 758    }
 759    memcpy(target, s->volname, targetlen);
 760    req->size = count;
 761    req->offset = offset;
 762
 763    switch (op) {
 764    case ARCHIP_OP_READ:
 765        req->op = X_READ;
 766        break;
 767    case ARCHIP_OP_WRITE:
 768        req->op = X_WRITE;
 769        break;
 770    case ARCHIP_OP_FLUSH:
 771        req->op = X_FLUSH;
 772        break;
 773    }
 774    reqdata->volname = s->volname;
 775    reqdata->offset = offset;
 776    reqdata->size = count;
 777    reqdata->bufidx = bufidx;
 778    reqdata->aio_cb = aio_cb;
 779    reqdata->segreq = segreq;
 780    reqdata->op = op;
 781
 782    xseg_set_req_data(s->xseg, req, reqdata);
 783    if (op == ARCHIP_OP_WRITE) {
 784        data = xseg_get_data(s->xseg, req);
 785        if (!data) {
 786            archipelagolog("Cannot get XSEG data\n");
 787            goto err_exit;
 788        }
 789        qemu_iovec_to_buf(aio_cb->qiov, bufidx, data, count);
 790    }
 791
 792    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
 793    if (p == NoPort) {
 794        archipelagolog("Could not submit XSEG request\n");
 795        goto err_exit;
 796    }
 797    xseg_signal(s->xseg, p);
 798    return 0;
 799
 800err_exit:
 801    g_free(reqdata);
 802    xseg_put_request(s->xseg, req, s->srcport);
 803    return -EIO;
 804err_exit2:
 805    g_free(reqdata);
 806    return -EIO;
 807}
 808
 809static int archipelago_aio_segmented_rw(BDRVArchipelagoState *s,
 810                                        size_t count,
 811                                        off_t offset,
 812                                        ArchipelagoAIOCB *aio_cb,
 813                                        int op)
 814{
 815    int ret, segments_nr;
 816    size_t pos = 0;
 817    ArchipelagoSegmentedRequest *segreq;
 818
 819    segreq = g_new0(ArchipelagoSegmentedRequest, 1);
 820
 821    if (op == ARCHIP_OP_FLUSH) {
 822        segments_nr = 1;
 823    } else {
 824        segments_nr = (int)(count / MAX_REQUEST_SIZE) + \
 825                      ((count % MAX_REQUEST_SIZE) ? 1 : 0);
 826    }
 827    segreq->total = count;
 828    atomic_mb_set(&segreq->ref, segments_nr);
 829
 830    while (segments_nr > 1) {
 831        ret = archipelago_submit_request(s, pos,
 832                                            MAX_REQUEST_SIZE,
 833                                            offset + pos,
 834                                            aio_cb, segreq, op);
 835
 836        if (ret < 0) {
 837            goto err_exit;
 838        }
 839        count -= MAX_REQUEST_SIZE;
 840        pos += MAX_REQUEST_SIZE;
 841        segments_nr--;
 842    }
 843    ret = archipelago_submit_request(s, pos, count, offset + pos,
 844                                     aio_cb, segreq, op);
 845
 846    if (ret < 0) {
 847        goto err_exit;
 848    }
 849    return 0;
 850
 851err_exit:
 852    segreq->failed = 1;
 853    if (atomic_fetch_sub(&segreq->ref, segments_nr) == segments_nr) {
 854        g_free(segreq);
 855    }
 856    return ret;
 857}
 858
 859static BlockAIOCB *qemu_archipelago_aio_rw(BlockDriverState *bs,
 860                                           int64_t sector_num,
 861                                           QEMUIOVector *qiov,
 862                                           int nb_sectors,
 863                                           BlockCompletionFunc *cb,
 864                                           void *opaque,
 865                                           int op)
 866{
 867    ArchipelagoAIOCB *aio_cb;
 868    BDRVArchipelagoState *s = bs->opaque;
 869    int64_t size, off;
 870    int ret;
 871
 872    aio_cb = qemu_aio_get(&archipelago_aiocb_info, bs, cb, opaque);
 873    aio_cb->cmd = op;
 874    aio_cb->qiov = qiov;
 875
 876    aio_cb->ret = 0;
 877    aio_cb->s = s;
 878    aio_cb->status = -EINPROGRESS;
 879
 880    off = sector_num * BDRV_SECTOR_SIZE;
 881    size = nb_sectors * BDRV_SECTOR_SIZE;
 882    aio_cb->size = size;
 883
 884    ret = archipelago_aio_segmented_rw(s, size, off,
 885                                       aio_cb, op);
 886    if (ret < 0) {
 887        goto err_exit;
 888    }
 889    return &aio_cb->common;
 890
 891err_exit:
 892    error_report("qemu_archipelago_aio_rw(): I/O Error");
 893    qemu_aio_unref(aio_cb);
 894    return NULL;
 895}
 896
 897static BlockAIOCB *qemu_archipelago_aio_readv(BlockDriverState *bs,
 898        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
 899        BlockCompletionFunc *cb, void *opaque)
 900{
 901    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
 902                                   opaque, ARCHIP_OP_READ);
 903}
 904
 905static BlockAIOCB *qemu_archipelago_aio_writev(BlockDriverState *bs,
 906        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
 907        BlockCompletionFunc *cb, void *opaque)
 908{
 909    return qemu_archipelago_aio_rw(bs, sector_num, qiov, nb_sectors, cb,
 910                                   opaque, ARCHIP_OP_WRITE);
 911}
 912
 913static int64_t archipelago_volume_info(BDRVArchipelagoState *s)
 914{
 915    uint64_t size;
 916    int ret, targetlen;
 917    struct xseg_request *req;
 918    struct xseg_reply_info *xinfo;
 919    AIORequestData *reqdata = g_new(AIORequestData, 1);
 920
 921    const char *volname = s->volname;
 922    targetlen = strlen(volname);
 923    req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
 924    if (!req) {
 925        archipelagolog("Cannot get XSEG request\n");
 926        goto err_exit2;
 927    }
 928    ret = xseg_prep_request(s->xseg, req, targetlen,
 929                            sizeof(struct xseg_reply_info));
 930    if (ret < 0) {
 931        archipelagolog("Cannot prepare XSEG request\n");
 932        goto err_exit;
 933    }
 934    char *target = xseg_get_target(s->xseg, req);
 935    if (!target) {
 936        archipelagolog("Cannot get XSEG target\n");
 937        goto err_exit;
 938    }
 939    memcpy(target, volname, targetlen);
 940    req->size = req->datalen;
 941    req->offset = 0;
 942    req->op = X_INFO;
 943
 944    reqdata->op = ARCHIP_OP_VOLINFO;
 945    reqdata->volname = volname;
 946    xseg_set_req_data(s->xseg, req, reqdata);
 947
 948    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
 949    if (p == NoPort) {
 950        archipelagolog("Cannot submit XSEG request\n");
 951        goto err_exit;
 952    }
 953    xseg_signal(s->xseg, p);
 954    qemu_mutex_lock(&s->archip_mutex);
 955    while (!s->is_signaled) {
 956        qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
 957    }
 958    s->is_signaled = false;
 959    qemu_mutex_unlock(&s->archip_mutex);
 960
 961    xinfo = (struct xseg_reply_info *) xseg_get_data(s->xseg, req);
 962    size = xinfo->size;
 963    xseg_put_request(s->xseg, req, s->srcport);
 964    g_free(reqdata);
 965    s->size = size;
 966    return size;
 967
 968err_exit:
 969    xseg_put_request(s->xseg, req, s->srcport);
 970err_exit2:
 971    g_free(reqdata);
 972    return -EIO;
 973}
 974
 975static int64_t qemu_archipelago_getlength(BlockDriverState *bs)
 976{
 977    int64_t ret;
 978    BDRVArchipelagoState *s = bs->opaque;
 979
 980    ret = archipelago_volume_info(s);
 981    return ret;
 982}
 983
 984static int qemu_archipelago_truncate(BlockDriverState *bs, int64_t offset)
 985{
 986    int ret, targetlen;
 987    struct xseg_request *req;
 988    BDRVArchipelagoState *s = bs->opaque;
 989    AIORequestData *reqdata = g_new(AIORequestData, 1);
 990
 991    const char *volname = s->volname;
 992    targetlen = strlen(volname);
 993    req = xseg_get_request(s->xseg, s->srcport, s->mportno, X_ALLOC);
 994    if (!req) {
 995        archipelagolog("Cannot get XSEG request\n");
 996        goto err_exit2;
 997    }
 998
 999    ret = xseg_prep_request(s->xseg, req, targetlen, 0);
1000    if (ret < 0) {
1001        archipelagolog("Cannot prepare XSEG request\n");
1002        goto err_exit;
1003    }
1004    char *target = xseg_get_target(s->xseg, req);
1005    if (!target) {
1006        archipelagolog("Cannot get XSEG target\n");
1007        goto err_exit;
1008    }
1009    memcpy(target, volname, targetlen);
1010    req->offset = offset;
1011    req->op = X_TRUNCATE;
1012
1013    reqdata->op = ARCHIP_OP_TRUNCATE;
1014    reqdata->volname = volname;
1015
1016    xseg_set_req_data(s->xseg, req, reqdata);
1017
1018    xport p = xseg_submit(s->xseg, req, s->srcport, X_ALLOC);
1019    if (p == NoPort) {
1020        archipelagolog("Cannot submit XSEG request\n");
1021        goto err_exit;
1022    }
1023
1024    xseg_signal(s->xseg, p);
1025    qemu_mutex_lock(&s->archip_mutex);
1026    while (!s->is_signaled) {
1027        qemu_cond_wait(&s->archip_cond, &s->archip_mutex);
1028    }
1029    s->is_signaled = false;
1030    qemu_mutex_unlock(&s->archip_mutex);
1031    xseg_put_request(s->xseg, req, s->srcport);
1032    g_free(reqdata);
1033    return 0;
1034
1035err_exit:
1036    xseg_put_request(s->xseg, req, s->srcport);
1037err_exit2:
1038    g_free(reqdata);
1039    return -EIO;
1040}
1041
1042static QemuOptsList qemu_archipelago_create_opts = {
1043    .name = "archipelago-create-opts",
1044    .head = QTAILQ_HEAD_INITIALIZER(qemu_archipelago_create_opts.head),
1045    .desc = {
1046        {
1047            .name = BLOCK_OPT_SIZE,
1048            .type = QEMU_OPT_SIZE,
1049            .help = "Virtual disk size"
1050        },
1051        { /* end of list */ }
1052    }
1053};
1054
1055static BlockAIOCB *qemu_archipelago_aio_flush(BlockDriverState *bs,
1056        BlockCompletionFunc *cb, void *opaque)
1057{
1058    return qemu_archipelago_aio_rw(bs, 0, NULL, 0, cb, opaque,
1059                                   ARCHIP_OP_FLUSH);
1060}
1061
1062static BlockDriver bdrv_archipelago = {
1063    .format_name         = "archipelago",
1064    .protocol_name       = "archipelago",
1065    .instance_size       = sizeof(BDRVArchipelagoState),
1066    .bdrv_parse_filename = archipelago_parse_filename,
1067    .bdrv_file_open      = qemu_archipelago_open,
1068    .bdrv_close          = qemu_archipelago_close,
1069    .bdrv_create         = qemu_archipelago_create,
1070    .bdrv_getlength      = qemu_archipelago_getlength,
1071    .bdrv_truncate       = qemu_archipelago_truncate,
1072    .bdrv_aio_readv      = qemu_archipelago_aio_readv,
1073    .bdrv_aio_writev     = qemu_archipelago_aio_writev,
1074    .bdrv_aio_flush      = qemu_archipelago_aio_flush,
1075    .bdrv_has_zero_init  = bdrv_has_zero_init_1,
1076    .create_opts         = &qemu_archipelago_create_opts,
1077};
1078
1079static void bdrv_archipelago_init(void)
1080{
1081    bdrv_register(&bdrv_archipelago);
1082}
1083
1084block_init(bdrv_archipelago_init);
1085