qemu/blockjob.c
<<
>>
Prefs
   1/*
   2 * QEMU System Emulator block driver
   3 *
   4 * Copyright (c) 2011 IBM Corp.
   5 * Copyright (c) 2012 Red Hat, Inc.
   6 *
   7 * Permission is hereby granted, free of charge, to any person obtaining a copy
   8 * of this software and associated documentation files (the "Software"), to deal
   9 * in the Software without restriction, including without limitation the rights
  10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 * copies of the Software, and to permit persons to whom the Software is
  12 * furnished to do so, subject to the following conditions:
  13 *
  14 * The above copyright notice and this permission notice shall be included in
  15 * all copies or substantial portions of the Software.
  16 *
  17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  23 * THE SOFTWARE.
  24 */
  25
  26#include "qemu/osdep.h"
  27#include "qemu-common.h"
  28#include "block/block.h"
  29#include "block/blockjob_int.h"
  30#include "block/block_int.h"
  31#include "block/trace.h"
  32#include "sysemu/block-backend.h"
  33#include "qapi/error.h"
  34#include "qapi/qapi-events-block-core.h"
  35#include "qapi/qmp/qerror.h"
  36#include "qemu/coroutine.h"
  37#include "qemu/id.h"
  38#include "qemu/timer.h"
  39
/* Right now, this mutex is only needed to synchronize accesses to job->busy
 * and job->sleep_timer, such as concurrent calls to block_job_do_yield and
 * block_job_enter.  It is initialized by the block_job_init() constructor
 * and taken/released through block_job_lock()/block_job_unlock(). */
static QemuMutex block_job_mutex;
  44
/* BlockJob State Transition Table
 *
 * Indexed as BlockJobSTT[from][to]: a non-zero entry means that
 * block_job_state_transition() accepts a move from status 'from' to status
 * 'to'.  The column letters match the row labels:
 * U=UNDEFINED, C=CREATED, R=RUNNING, P=PAUSED, Y=READY, S=STANDBY,
 * W=WAITING, D=PENDING, X=ABORTING, E=CONCLUDED, N=NULL. */
bool BlockJobSTT[BLOCK_JOB_STATUS__MAX][BLOCK_JOB_STATUS__MAX] = {
                                          /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [BLOCK_JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [BLOCK_JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [BLOCK_JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [BLOCK_JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [BLOCK_JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [BLOCK_JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [BLOCK_JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [BLOCK_JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [BLOCK_JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [BLOCK_JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [BLOCK_JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
  60
/* Verb permission table, indexed as BlockJobVerbTable[verb][status]: a
 * non-zero entry means block_job_apply_verb() accepts the verb while the job
 * is in that status.  Column letters are the same status abbreviations used
 * by the state transition table above. */
bool BlockJobVerbTable[BLOCK_JOB_VERB__MAX][BLOCK_JOB_STATUS__MAX] = {
                                          /* U, C, R, P, Y, S, W, D, X, E, N */
    [BLOCK_JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [BLOCK_JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    [BLOCK_JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [BLOCK_JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
  71
  72static void block_job_state_transition(BlockJob *job, BlockJobStatus s1)
  73{
  74    BlockJobStatus s0 = job->status;
  75    assert(s1 >= 0 && s1 <= BLOCK_JOB_STATUS__MAX);
  76    trace_block_job_state_transition(job, job->ret, BlockJobSTT[s0][s1] ?
  77                                     "allowed" : "disallowed",
  78                                     BlockJobStatus_str(s0),
  79                                     BlockJobStatus_str(s1));
  80    assert(BlockJobSTT[s0][s1]);
  81    job->status = s1;
  82}
  83
  84static int block_job_apply_verb(BlockJob *job, BlockJobVerb bv, Error **errp)
  85{
  86    assert(bv >= 0 && bv <= BLOCK_JOB_VERB__MAX);
  87    trace_block_job_apply_verb(job, BlockJobStatus_str(job->status),
  88                               BlockJobVerb_str(bv),
  89                               BlockJobVerbTable[bv][job->status] ?
  90                               "allowed" : "prohibited");
  91    if (BlockJobVerbTable[bv][job->status]) {
  92        return 0;
  93    }
  94    error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
  95               job->id, BlockJobStatus_str(job->status), BlockJobVerb_str(bv));
  96    return -EPERM;
  97}
  98
/* Acquire the file-wide block_job_mutex. */
static void block_job_lock(void)
{
    qemu_mutex_lock(&block_job_mutex);
}
 103
/* Release the file-wide block_job_mutex. */
static void block_job_unlock(void)
{
    qemu_mutex_unlock(&block_job_mutex);
}
 108
/* Constructor (runs via __attribute__((__constructor__))): initializes
 * block_job_mutex. */
static void __attribute__((__constructor__)) block_job_init(void)
{
    qemu_mutex_init(&block_job_mutex);
}
 113
/* Forward declarations of static helpers that are referenced in this file
 * before the point where they are defined. */
static void block_job_event_cancelled(BlockJob *job);
static void block_job_event_completed(BlockJob *job, const char *msg);
static int block_job_event_pending(BlockJob *job);
static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job));
 118
/* Transactional group of block jobs */
struct BlockJobTxn {

    /* Is this txn being cancelled?  Set by the first job that aborts the
     * transaction, so that later aborting jobs return early. */
    bool aborting;

    /* List of jobs (linked through BlockJob.txn_list) */
    QLIST_HEAD(, BlockJob) jobs;

    /* Reference count: starts at 1 in block_job_txn_new(); the structure is
     * freed when block_job_txn_unref() drops it to 0. */
    int refcnt;
};
 131
/* Global list of all block jobs (linked through BlockJob.job_list); jobs
 * are inserted by block_job_create() and removed by block_job_unref(). */
static QLIST_HEAD(, BlockJob) block_jobs = QLIST_HEAD_INITIALIZER(block_jobs);
 133
 134/*
 135 * The block job API is composed of two categories of functions.
 136 *
 137 * The first includes functions used by the monitor.  The monitor is
 138 * peculiar in that it accesses the block job list with block_job_get, and
 139 * therefore needs consistency across block_job_get and the actual operation
 140 * (e.g. block_job_set_speed).  The consistency is achieved with
 141 * aio_context_acquire/release.  These functions are declared in blockjob.h.
 142 *
 143 * The second includes functions used by the block job drivers and sometimes
 144 * by the core block layer.  These do not care about locking, because the
 145 * whole coroutine runs under the AioContext lock, and are declared in
 146 * blockjob_int.h.
 147 */
 148
 149BlockJob *block_job_next(BlockJob *job)
 150{
 151    if (!job) {
 152        return QLIST_FIRST(&block_jobs);
 153    }
 154    return QLIST_NEXT(job, job_list);
 155}
 156
 157BlockJob *block_job_get(const char *id)
 158{
 159    BlockJob *job;
 160
 161    QLIST_FOREACH(job, &block_jobs, job_list) {
 162        if (job->id && !strcmp(id, job->id)) {
 163            return job;
 164        }
 165    }
 166
 167    return NULL;
 168}
 169
 170BlockJobTxn *block_job_txn_new(void)
 171{
 172    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
 173    QLIST_INIT(&txn->jobs);
 174    txn->refcnt = 1;
 175    return txn;
 176}
 177
 178static void block_job_txn_ref(BlockJobTxn *txn)
 179{
 180    txn->refcnt++;
 181}
 182
 183void block_job_txn_unref(BlockJobTxn *txn)
 184{
 185    if (txn && --txn->refcnt == 0) {
 186        g_free(txn);
 187    }
 188}
 189
 190void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
 191{
 192    if (!txn) {
 193        return;
 194    }
 195
 196    assert(!job->txn);
 197    job->txn = txn;
 198
 199    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
 200    block_job_txn_ref(txn);
 201}
 202
 203static void block_job_txn_del_job(BlockJob *job)
 204{
 205    if (job->txn) {
 206        QLIST_REMOVE(job, txn_list);
 207        block_job_txn_unref(job->txn);
 208        job->txn = NULL;
 209    }
 210}
 211
 212static void block_job_pause(BlockJob *job)
 213{
 214    job->pause_count++;
 215}
 216
 217static void block_job_resume(BlockJob *job)
 218{
 219    assert(job->pause_count > 0);
 220    job->pause_count--;
 221    if (job->pause_count) {
 222        return;
 223    }
 224    block_job_enter(job);
 225}
 226
 227void block_job_ref(BlockJob *job)
 228{
 229    ++job->refcnt;
 230}
 231
/* Forward declarations: block_job_unref() below unregisters these AioContext
 * notifier callbacks before their definitions appear. */
static void block_job_attached_aio_context(AioContext *new_context,
                                           void *opaque);
static void block_job_detach_aio_context(void *opaque);
 235
 236void block_job_unref(BlockJob *job)
 237{
 238    if (--job->refcnt == 0) {
 239        assert(job->status == BLOCK_JOB_STATUS_NULL);
 240        assert(!job->txn);
 241        BlockDriverState *bs = blk_bs(job->blk);
 242        QLIST_REMOVE(job, job_list);
 243        bs->job = NULL;
 244        block_job_remove_all_bdrv(job);
 245        blk_remove_aio_context_notifier(job->blk,
 246                                        block_job_attached_aio_context,
 247                                        block_job_detach_aio_context, job);
 248        blk_unref(job->blk);
 249        error_free(job->blocker);
 250        g_free(job->id);
 251        assert(!timer_pending(&job->sleep_timer));
 252        g_free(job);
 253    }
 254}
 255
 256static void block_job_attached_aio_context(AioContext *new_context,
 257                                           void *opaque)
 258{
 259    BlockJob *job = opaque;
 260
 261    if (job->driver->attached_aio_context) {
 262        job->driver->attached_aio_context(job, new_context);
 263    }
 264
 265    block_job_resume(job);
 266}
 267
 268static void block_job_drain(BlockJob *job)
 269{
 270    /* If job is !job->busy this kicks it into the next pause point. */
 271    block_job_enter(job);
 272
 273    blk_drain(job->blk);
 274    if (job->driver->drain) {
 275        job->driver->drain(job);
 276    }
 277}
 278
 279static void block_job_detach_aio_context(void *opaque)
 280{
 281    BlockJob *job = opaque;
 282
 283    /* In case the job terminates during aio_poll()... */
 284    block_job_ref(job);
 285
 286    block_job_pause(job);
 287
 288    while (!job->paused && !job->completed) {
 289        block_job_drain(job);
 290    }
 291
 292    block_job_unref(job);
 293}
 294
 295static char *child_job_get_parent_desc(BdrvChild *c)
 296{
 297    BlockJob *job = c->opaque;
 298    return g_strdup_printf("%s job '%s'",
 299                           BlockJobType_str(job->driver->job_type),
 300                           job->id);
 301}
 302
 303static void child_job_drained_begin(BdrvChild *c)
 304{
 305    BlockJob *job = c->opaque;
 306    block_job_pause(job);
 307}
 308
 309static void child_job_drained_end(BdrvChild *c)
 310{
 311    BlockJob *job = c->opaque;
 312    block_job_resume(job);
 313}
 314
/* Child role used for the nodes a job attaches in block_job_add_bdrv():
 * drain begin/end on such a node pauses/resumes the owning job, and the
 * parent description names the job. */
static const BdrvChildRole child_job = {
    .get_parent_desc    = child_job_get_parent_desc,
    .drained_begin      = child_job_drained_begin,
    .drained_end        = child_job_drained_end,
    .stay_at_node       = true,
};
 321
 322void block_job_remove_all_bdrv(BlockJob *job)
 323{
 324    GSList *l;
 325    for (l = job->nodes; l; l = l->next) {
 326        BdrvChild *c = l->data;
 327        bdrv_op_unblock_all(c->bs, job->blocker);
 328        bdrv_root_unref_child(c);
 329    }
 330    g_slist_free(job->nodes);
 331    job->nodes = NULL;
 332}
 333
 334int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
 335                       uint64_t perm, uint64_t shared_perm, Error **errp)
 336{
 337    BdrvChild *c;
 338
 339    c = bdrv_root_attach_child(bs, name, &child_job, perm, shared_perm,
 340                               job, errp);
 341    if (c == NULL) {
 342        return -EPERM;
 343    }
 344
 345    job->nodes = g_slist_prepend(job->nodes, c);
 346    bdrv_ref(bs);
 347    bdrv_op_block_all(bs, job->blocker);
 348
 349    return 0;
 350}
 351
 352bool block_job_is_internal(BlockJob *job)
 353{
 354    return (job->id == NULL);
 355}
 356
 357static bool block_job_started(BlockJob *job)
 358{
 359    return job->co;
 360}
 361
/**
 * Coroutine entry point of a block job (see block_job_start()).
 *
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 *
 * @opaque: the BlockJob to run; it must have a driver with a start callback.
 */
static void coroutine_fn block_job_co_entry(void *opaque)
{
    BlockJob *job = opaque;

    assert(job && job->driver && job->driver->start);
    /* Honour any pause request made before the job was started. */
    block_job_pause_point(job);
    job->driver->start(job);
}
 374
 375static void block_job_sleep_timer_cb(void *opaque)
 376{
 377    BlockJob *job = opaque;
 378
 379    block_job_enter(job);
 380}
 381
 382void block_job_start(BlockJob *job)
 383{
 384    assert(job && !block_job_started(job) && job->paused &&
 385           job->driver && job->driver->start);
 386    job->co = qemu_coroutine_create(block_job_co_entry, job);
 387    job->pause_count--;
 388    job->busy = true;
 389    job->paused = false;
 390    block_job_state_transition(job, BLOCK_JOB_STATUS_RUNNING);
 391    bdrv_coroutine_enter(blk_bs(job->blk), job->co);
 392}
 393
/*
 * Retire @job: flag it as completed, not busy, not paused and deferred to
 * the main loop, remove it from its transaction, move it to status NULL and
 * drop one reference.
 */
static void block_job_decommission(BlockJob *job)
{
    assert(job);
    job->completed = true;
    job->busy = false;
    job->paused = false;
    job->deferred_to_main_loop = true;
    block_job_txn_del_job(job);
    block_job_state_transition(job, BLOCK_JOB_STATUS_NULL);
    /* May free the job if this was the last reference. */
    block_job_unref(job);
}
 405
/* Dismiss @job; currently just a wrapper around block_job_decommission(). */
static void block_job_do_dismiss(BlockJob *job)
{
    block_job_decommission(job);
}
 410
 411static void block_job_conclude(BlockJob *job)
 412{
 413    block_job_state_transition(job, BLOCK_JOB_STATUS_CONCLUDED);
 414    if (job->auto_dismiss || !block_job_started(job)) {
 415        block_job_do_dismiss(job);
 416    }
 417}
 418
 419static void block_job_update_rc(BlockJob *job)
 420{
 421    if (!job->ret && block_job_is_cancelled(job)) {
 422        job->ret = -ECANCELED;
 423    }
 424    if (job->ret) {
 425        block_job_state_transition(job, BLOCK_JOB_STATUS_ABORTING);
 426    }
 427}
 428
 429static int block_job_prepare(BlockJob *job)
 430{
 431    if (job->ret == 0 && job->driver->prepare) {
 432        job->ret = job->driver->prepare(job);
 433    }
 434    return job->ret;
 435}
 436
 437static void block_job_commit(BlockJob *job)
 438{
 439    assert(!job->ret);
 440    if (job->driver->commit) {
 441        job->driver->commit(job);
 442    }
 443}
 444
 445static void block_job_abort(BlockJob *job)
 446{
 447    assert(job->ret);
 448    if (job->driver->abort) {
 449        job->driver->abort(job);
 450    }
 451}
 452
 453static void block_job_clean(BlockJob *job)
 454{
 455    if (job->driver->clean) {
 456        job->driver->clean(job);
 457    }
 458}
 459
 460static int block_job_finalize_single(BlockJob *job)
 461{
 462    assert(job->completed);
 463
 464    /* Ensure abort is called for late-transactional failures */
 465    block_job_update_rc(job);
 466
 467    if (!job->ret) {
 468        block_job_commit(job);
 469    } else {
 470        block_job_abort(job);
 471    }
 472    block_job_clean(job);
 473
 474    if (job->cb) {
 475        job->cb(job->opaque, job->ret);
 476    }
 477
 478    /* Emit events only if we actually started */
 479    if (block_job_started(job)) {
 480        if (block_job_is_cancelled(job)) {
 481            block_job_event_cancelled(job);
 482        } else {
 483            const char *msg = NULL;
 484            if (job->ret < 0) {
 485                msg = strerror(-job->ret);
 486            }
 487            block_job_event_completed(job, msg);
 488        }
 489    }
 490
 491    block_job_txn_del_job(job);
 492    block_job_conclude(job);
 493    return 0;
 494}
 495
 496static void block_job_cancel_async(BlockJob *job, bool force)
 497{
 498    if (job->iostatus != BLOCK_DEVICE_IO_STATUS_OK) {
 499        block_job_iostatus_reset(job);
 500    }
 501    if (job->user_paused) {
 502        /* Do not call block_job_enter here, the caller will handle it.  */
 503        job->user_paused = false;
 504        job->pause_count--;
 505    }
 506    job->cancelled = true;
 507    /* To prevent 'force == false' overriding a previous 'force == true' */
 508    job->force |= force;
 509}
 510
 511static int block_job_txn_apply(BlockJobTxn *txn, int fn(BlockJob *), bool lock)
 512{
 513    AioContext *ctx;
 514    BlockJob *job, *next;
 515    int rc = 0;
 516
 517    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
 518        if (lock) {
 519            ctx = blk_get_aio_context(job->blk);
 520            aio_context_acquire(ctx);
 521        }
 522        rc = fn(job);
 523        if (lock) {
 524            aio_context_release(ctx);
 525        }
 526        if (rc) {
 527            break;
 528        }
 529    }
 530    return rc;
 531}
 532
/*
 * Synchronously drive @job until job->completed is set.
 *
 * @finish, if non-NULL, is called first with a local Error.  If it reports
 * an error, that error is propagated to @errp and -EBUSY is returned
 * without waiting.
 *
 * A reference on @job is held for the duration of the call.
 *
 * Returns -ECANCELED if the job was cancelled while job->ret is 0,
 * otherwise job->ret.
 */
static int block_job_finish_sync(BlockJob *job,
                                 void (*finish)(BlockJob *, Error **errp),
                                 Error **errp)
{
    Error *local_err = NULL;
    int ret;

    assert(blk_bs(job->blk)->job == job);

    /* Keep the job alive until ret has been read below. */
    block_job_ref(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        block_job_unref(job);
        return -EBUSY;
    }
    /* block_job_drain calls block_job_enter, and it should be enough to
     * induce progress until the job completes or moves to the main thread.
    */
    while (!job->deferred_to_main_loop && !job->completed) {
        block_job_drain(job);
    }
    /* Once deferred to the main loop, poll the main AioContext instead. */
    while (!job->completed) {
        aio_poll(qemu_get_aio_context(), true);
    }
    ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
    block_job_unref(job);
    return ret;
}
 565
/*
 * Abort the transaction that @job belongs to.
 *
 * Only the first caller per transaction does the work (guarded by
 * txn->aborting).  It marks every other job in the transaction as
 * cancelled, waits for each job that has not completed yet, and finalizes
 * all jobs until the transaction's job list is empty.
 */
static void block_job_completed_txn_abort(BlockJob *job)
{
    AioContext *ctx;
    BlockJobTxn *txn = job->txn;
    BlockJob *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    /* Keep txn alive while its jobs are removed one by one below. */
    block_job_txn_ref(txn);

    /* We are the first failed job. Cancel other jobs. */
    /* First acquire the AioContext of every job in the transaction; each is
     * released again in the finalization loop below. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        ctx = blk_get_aio_context(other_job->blk);
        aio_context_acquire(ctx);
    }

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            block_job_cancel_async(other_job, false);
        }
    }
    /* block_job_finalize_single() removes the job from txn->jobs, so the
     * list shrinks on every iteration. */
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        ctx = blk_get_aio_context(other_job->blk);
        if (!other_job->completed) {
            assert(other_job->cancelled);
            block_job_finish_sync(other_job, NULL, NULL);
        }
        block_job_finalize_single(other_job);
        aio_context_release(ctx);
    }

    block_job_txn_unref(txn);
}
 608
 609static int block_job_needs_finalize(BlockJob *job)
 610{
 611    return !job->auto_finalize;
 612}
 613
 614static void block_job_do_finalize(BlockJob *job)
 615{
 616    int rc;
 617    assert(job && job->txn);
 618
 619    /* prepare the transaction to complete */
 620    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
 621    if (rc) {
 622        block_job_completed_txn_abort(job);
 623    } else {
 624        block_job_txn_apply(job->txn, block_job_finalize_single, true);
 625    }
 626}
 627
 628static void block_job_completed_txn_success(BlockJob *job)
 629{
 630    BlockJobTxn *txn = job->txn;
 631    BlockJob *other_job;
 632
 633    block_job_state_transition(job, BLOCK_JOB_STATUS_WAITING);
 634
 635    /*
 636     * Successful completion, see if there are other running jobs in this
 637     * txn.
 638     */
 639    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
 640        if (!other_job->completed) {
 641            return;
 642        }
 643        assert(other_job->ret == 0);
 644    }
 645
 646    block_job_txn_apply(txn, block_job_event_pending, false);
 647
 648    /* If no jobs need manual finalization, automatically do so */
 649    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
 650        block_job_do_finalize(job);
 651    }
 652}
 653
/* Returns true if the sleep timer of @job is pending.  Used as the
 * condition callback for block_job_enter_cond().
 * Assumes the block_job_mutex is held */
static bool block_job_timer_pending(BlockJob *job)
{
    return timer_pending(&job->sleep_timer);
}
 659
 660void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
 661{
 662    Error *local_err = NULL;
 663    int64_t old_speed = job->speed;
 664
 665    if (!job->driver->set_speed) {
 666        error_setg(errp, QERR_UNSUPPORTED);
 667        return;
 668    }
 669    if (block_job_apply_verb(job, BLOCK_JOB_VERB_SET_SPEED, errp)) {
 670        return;
 671    }
 672    job->driver->set_speed(job, speed, &local_err);
 673    if (local_err) {
 674        error_propagate(errp, local_err);
 675        return;
 676    }
 677
 678    job->speed = speed;
 679    if (speed && speed <= old_speed) {
 680        return;
 681    }
 682
 683    /* kick only if a timer is pending */
 684    block_job_enter_cond(job, block_job_timer_pending);
 685}
 686
 687void block_job_complete(BlockJob *job, Error **errp)
 688{
 689    /* Should not be reachable via external interface for internal jobs */
 690    assert(job->id);
 691    if (block_job_apply_verb(job, BLOCK_JOB_VERB_COMPLETE, errp)) {
 692        return;
 693    }
 694    if (job->pause_count || job->cancelled || !job->driver->complete) {
 695        error_setg(errp, "The active block job '%s' cannot be completed",
 696                   job->id);
 697        return;
 698    }
 699
 700    job->driver->complete(job, errp);
 701}
 702
 703void block_job_finalize(BlockJob *job, Error **errp)
 704{
 705    assert(job && job->id && job->txn);
 706    if (block_job_apply_verb(job, BLOCK_JOB_VERB_FINALIZE, errp)) {
 707        return;
 708    }
 709    block_job_do_finalize(job);
 710}
 711
 712void block_job_dismiss(BlockJob **jobptr, Error **errp)
 713{
 714    BlockJob *job = *jobptr;
 715    /* similarly to _complete, this is QMP-interface only. */
 716    assert(job->id);
 717    if (block_job_apply_verb(job, BLOCK_JOB_VERB_DISMISS, errp)) {
 718        return;
 719    }
 720
 721    block_job_do_dismiss(job);
 722    *jobptr = NULL;
 723}
 724
 725void block_job_user_pause(BlockJob *job, Error **errp)
 726{
 727    if (block_job_apply_verb(job, BLOCK_JOB_VERB_PAUSE, errp)) {
 728        return;
 729    }
 730    if (job->user_paused) {
 731        error_setg(errp, "Job is already paused");
 732        return;
 733    }
 734    job->user_paused = true;
 735    block_job_pause(job);
 736}
 737
/* Returns whether @job is currently paused by a user request (the
 * user_paused flag set by block_job_user_pause()). */
bool block_job_user_paused(BlockJob *job)
{
    return job->user_paused;
}
 742
 743void block_job_user_resume(BlockJob *job, Error **errp)
 744{
 745    assert(job);
 746    if (!job->user_paused || job->pause_count <= 0) {
 747        error_setg(errp, "Can't resume a job that was not paused");
 748        return;
 749    }
 750    if (block_job_apply_verb(job, BLOCK_JOB_VERB_RESUME, errp)) {
 751        return;
 752    }
 753    block_job_iostatus_reset(job);
 754    job->user_paused = false;
 755    block_job_resume(job);
 756}
 757
 758void block_job_cancel(BlockJob *job, bool force)
 759{
 760    if (job->status == BLOCK_JOB_STATUS_CONCLUDED) {
 761        block_job_do_dismiss(job);
 762        return;
 763    }
 764    block_job_cancel_async(job, force);
 765    if (!block_job_started(job)) {
 766        block_job_completed(job, -ECANCELED);
 767    } else if (job->deferred_to_main_loop) {
 768        block_job_completed_txn_abort(job);
 769    } else {
 770        block_job_enter(job);
 771    }
 772}
 773
 774void block_job_user_cancel(BlockJob *job, bool force, Error **errp)
 775{
 776    if (block_job_apply_verb(job, BLOCK_JOB_VERB_CANCEL, errp)) {
 777        return;
 778    }
 779    block_job_cancel(job, force);
 780}
 781
/* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
 * used with block_job_finish_sync() without the need for (rather nasty)
 * function pointer casts there.  The cancel is non-forced and @errp is
 * never set. */
static void block_job_cancel_err(BlockJob *job, Error **errp)
{
    block_job_cancel(job, false);
}
 789
 790int block_job_cancel_sync(BlockJob *job)
 791{
 792    return block_job_finish_sync(job, &block_job_cancel_err, NULL);
 793}
 794
 795void block_job_cancel_sync_all(void)
 796{
 797    BlockJob *job;
 798    AioContext *aio_context;
 799
 800    while ((job = QLIST_FIRST(&block_jobs))) {
 801        aio_context = blk_get_aio_context(job->blk);
 802        aio_context_acquire(aio_context);
 803        block_job_cancel_sync(job);
 804        aio_context_release(aio_context);
 805    }
 806}
 807
 808int block_job_complete_sync(BlockJob *job, Error **errp)
 809{
 810    return block_job_finish_sync(job, &block_job_complete, errp);
 811}
 812
 813BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
 814{
 815    BlockJobInfo *info;
 816
 817    if (block_job_is_internal(job)) {
 818        error_setg(errp, "Cannot query QEMU internal jobs");
 819        return NULL;
 820    }
 821    info = g_new0(BlockJobInfo, 1);
 822    info->type      = g_strdup(BlockJobType_str(job->driver->job_type));
 823    info->device    = g_strdup(job->id);
 824    info->len       = job->len;
 825    info->busy      = atomic_read(&job->busy);
 826    info->paused    = job->pause_count > 0;
 827    info->offset    = job->offset;
 828    info->speed     = job->speed;
 829    info->io_status = job->iostatus;
 830    info->ready     = job->ready;
 831    info->status    = job->status;
 832    info->auto_finalize = job->auto_finalize;
 833    info->auto_dismiss  = job->auto_dismiss;
 834    info->has_error = job->ret != 0;
 835    info->error     = job->ret ? g_strdup(strerror(-job->ret)) : NULL;
 836    return info;
 837}
 838
 839static void block_job_iostatus_set_err(BlockJob *job, int error)
 840{
 841    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
 842        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
 843                                          BLOCK_DEVICE_IO_STATUS_FAILED;
 844    }
 845}
 846
 847static void block_job_event_cancelled(BlockJob *job)
 848{
 849    if (block_job_is_internal(job)) {
 850        return;
 851    }
 852
 853    qapi_event_send_block_job_cancelled(job->driver->job_type,
 854                                        job->id,
 855                                        job->len,
 856                                        job->offset,
 857                                        job->speed,
 858                                        &error_abort);
 859}
 860
 861static void block_job_event_completed(BlockJob *job, const char *msg)
 862{
 863    if (block_job_is_internal(job)) {
 864        return;
 865    }
 866
 867    qapi_event_send_block_job_completed(job->driver->job_type,
 868                                        job->id,
 869                                        job->len,
 870                                        job->offset,
 871                                        job->speed,
 872                                        !!msg,
 873                                        msg,
 874                                        &error_abort);
 875}
 876
 877static int block_job_event_pending(BlockJob *job)
 878{
 879    block_job_state_transition(job, BLOCK_JOB_STATUS_PENDING);
 880    if (!job->auto_finalize && !block_job_is_internal(job)) {
 881        qapi_event_send_block_job_pending(job->driver->job_type,
 882                                          job->id,
 883                                          &error_abort);
 884    }
 885    return 0;
 886}
 887
 888/*
 889 * API for block job drivers and the block layer.  These functions are
 890 * declared in blockjob_int.h.
 891 */
 892
/*
 * Create a new block job on node @bs.
 *
 * @job_id: the job ID; if NULL and the job is not internal, the device name
 *          of @bs is used (and must be non-empty).  Must be NULL for
 *          internal jobs.
 * @driver: job driver; driver->instance_size bytes are allocated for the job.
 * @txn: transaction to join; if NULL a private single-job transaction is
 *       created.
 * @bs: the main node of the job; it must not already have a job.
 * @perm, @shared_perm: permissions for the job's BlockBackend.
 * @speed: initial speed; only applied when non-zero.
 * @flags: BLOCK_JOB_INTERNAL, BLOCK_JOB_MANUAL_FINALIZE and
 *         BLOCK_JOB_MANUAL_DISMISS are evaluated here.
 * @cb, @opaque: completion callback and its argument, stored in the job.
 * @errp: error object.
 *
 * Returns the new job (created paused with pause_count 1, in status
 * CREATED), or NULL with @errp set on failure.
 */
void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       BlockJobTxn *txn, BlockDriverState *bs, uint64_t perm,
                       uint64_t shared_perm, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockBackend *blk;
    BlockJob *job;
    int ret;

    /* Only one job per node at a time. */
    if (bs->job) {
        error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
        return NULL;
    }

    /* Non-internal jobs default their ID to the device name of @bs. */
    if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
        if (!*job_id) {
            error_setg(errp, "An explicit job ID is required for this node");
            return NULL;
        }
    }

    if (job_id) {
        if (flags & BLOCK_JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal block job");
            return NULL;
        }

        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }

        if (block_job_get(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    }

    blk = blk_new(perm, shared_perm);
    ret = blk_insert_bs(blk, bs, errp);
    if (ret < 0) {
        blk_unref(blk);
        return NULL;
    }

    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->blk           = blk;
    job->cb            = cb;
    job->opaque        = opaque;
    job->busy          = false;
    /* Jobs start out paused; block_job_start() lifts this pause. */
    job->paused        = true;
    job->pause_count   = 1;
    job->refcnt        = 1;
    job->auto_finalize = !(flags & BLOCK_JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & BLOCK_JOB_MANUAL_DISMISS);
    block_job_state_transition(job, BLOCK_JOB_STATUS_CREATED);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   block_job_sleep_timer_cb, job);

    error_setg(&job->blocker, "block device is in use by block job: %s",
               BlockJobType_str(driver->job_type));
    block_job_add_bdrv(job, "main node", bs, 0, BLK_PERM_ALL, &error_abort);
    bs->job = job;

    /* block_job_add_bdrv() blocked all operations; re-allow dataplane. */
    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    QLIST_INSERT_HEAD(&block_jobs, job, job_list);

    blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
                                 block_job_detach_aio_context, job);

    /* Only set speed when necessary to avoid NotSupported error */
    if (speed != 0) {
        Error *local_err = NULL;

        block_job_set_speed(job, speed, &local_err);
        if (local_err) {
            block_job_early_fail(job);
            error_propagate(errp, local_err);
            return NULL;
        }
    }

    /* Single jobs are modeled as single-job transactions for sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = block_job_txn_new();
        block_job_txn_add_job(txn, job);
        /* The job now holds the only reference to the private txn. */
        block_job_txn_unref(txn);
    } else {
        block_job_txn_add_job(txn, job);
    }

    return job;
}
 992
 993void block_job_pause_all(void)
 994{
 995    BlockJob *job = NULL;
 996    while ((job = block_job_next(job))) {
 997        AioContext *aio_context = blk_get_aio_context(job->blk);
 998
 999        aio_context_acquire(aio_context);
1000        block_job_ref(job);
1001        block_job_pause(job);
1002        aio_context_release(aio_context);
1003    }
1004}
1005
1006void block_job_early_fail(BlockJob *job)
1007{
1008    assert(job->status == BLOCK_JOB_STATUS_CREATED);
1009    block_job_decommission(job);
1010}
1011
1012void block_job_completed(BlockJob *job, int ret)
1013{
1014    assert(job && job->txn && !job->completed);
1015    assert(blk_bs(job->blk)->job == job);
1016    job->completed = true;
1017    job->ret = ret;
1018    block_job_update_rc(job);
1019    trace_block_job_completed(job, ret, job->ret);
1020    if (job->ret) {
1021        block_job_completed_txn_abort(job);
1022    } else {
1023        block_job_completed_txn_success(job);
1024    }
1025}
1026
1027static bool block_job_should_pause(BlockJob *job)
1028{
1029    return job->pause_count > 0;
1030}
1031
1032/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
1033 * Reentering the job coroutine with block_job_enter() before the timer has
1034 * expired is allowed and cancels the timer.
1035 *
1036 * If @ns is (uint64_t) -1, no timer is scheduled and block_job_enter() must be
1037 * called explicitly. */
1038static void block_job_do_yield(BlockJob *job, uint64_t ns)
1039{
1040    block_job_lock();
1041    if (ns != -1) {
1042        timer_mod(&job->sleep_timer, ns);
1043    }
1044    job->busy = false;
1045    block_job_unlock();
1046    qemu_coroutine_yield();
1047
1048    /* Set by block_job_enter before re-entering the coroutine.  */
1049    assert(job->busy);
1050}
1051
1052void coroutine_fn block_job_pause_point(BlockJob *job)
1053{
1054    assert(job && block_job_started(job));
1055
1056    if (!block_job_should_pause(job)) {
1057        return;
1058    }
1059    if (block_job_is_cancelled(job)) {
1060        return;
1061    }
1062
1063    if (job->driver->pause) {
1064        job->driver->pause(job);
1065    }
1066
1067    if (block_job_should_pause(job) && !block_job_is_cancelled(job)) {
1068        BlockJobStatus status = job->status;
1069        block_job_state_transition(job, status == BLOCK_JOB_STATUS_READY ? \
1070                                   BLOCK_JOB_STATUS_STANDBY :           \
1071                                   BLOCK_JOB_STATUS_PAUSED);
1072        job->paused = true;
1073        block_job_do_yield(job, -1);
1074        job->paused = false;
1075        block_job_state_transition(job, status);
1076    }
1077
1078    if (job->driver->resume) {
1079        job->driver->resume(job);
1080    }
1081}
1082
1083void block_job_resume_all(void)
1084{
1085    BlockJob *job, *next;
1086
1087    QLIST_FOREACH_SAFE(job, &block_jobs, job_list, next) {
1088        AioContext *aio_context = blk_get_aio_context(job->blk);
1089
1090        aio_context_acquire(aio_context);
1091        block_job_resume(job);
1092        block_job_unref(job);
1093        aio_context_release(aio_context);
1094    }
1095}
1096
1097/*
1098 * Conditionally enter a block_job pending a call to fn() while
1099 * under the block_job_lock critical section.
1100 */
1101static void block_job_enter_cond(BlockJob *job, bool(*fn)(BlockJob *job))
1102{
1103    if (!block_job_started(job)) {
1104        return;
1105    }
1106    if (job->deferred_to_main_loop) {
1107        return;
1108    }
1109
1110    block_job_lock();
1111    if (job->busy) {
1112        block_job_unlock();
1113        return;
1114    }
1115
1116    if (fn && !fn(job)) {
1117        block_job_unlock();
1118        return;
1119    }
1120
1121    assert(!job->deferred_to_main_loop);
1122    timer_del(&job->sleep_timer);
1123    job->busy = true;
1124    block_job_unlock();
1125    aio_co_wake(job->co);
1126}
1127
1128void block_job_enter(BlockJob *job)
1129{
1130    block_job_enter_cond(job, NULL);
1131}
1132
1133bool block_job_is_cancelled(BlockJob *job)
1134{
1135    return job->cancelled;
1136}
1137
1138void block_job_sleep_ns(BlockJob *job, int64_t ns)
1139{
1140    assert(job->busy);
1141
1142    /* Check cancellation *before* setting busy = false, too!  */
1143    if (block_job_is_cancelled(job)) {
1144        return;
1145    }
1146
1147    if (!block_job_should_pause(job)) {
1148        block_job_do_yield(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
1149    }
1150
1151    block_job_pause_point(job);
1152}
1153
1154void block_job_yield(BlockJob *job)
1155{
1156    assert(job->busy);
1157
1158    /* Check cancellation *before* setting busy = false, too!  */
1159    if (block_job_is_cancelled(job)) {
1160        return;
1161    }
1162
1163    if (!block_job_should_pause(job)) {
1164        block_job_do_yield(job, -1);
1165    }
1166
1167    block_job_pause_point(job);
1168}
1169
1170void block_job_iostatus_reset(BlockJob *job)
1171{
1172    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
1173        return;
1174    }
1175    assert(job->user_paused && job->pause_count > 0);
1176    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
1177}
1178
1179void block_job_event_ready(BlockJob *job)
1180{
1181    block_job_state_transition(job, BLOCK_JOB_STATUS_READY);
1182    job->ready = true;
1183
1184    if (block_job_is_internal(job)) {
1185        return;
1186    }
1187
1188    qapi_event_send_block_job_ready(job->driver->job_type,
1189                                    job->id,
1190                                    job->len,
1191                                    job->offset,
1192                                    job->speed, &error_abort);
1193}
1194
1195BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
1196                                        int is_read, int error)
1197{
1198    BlockErrorAction action;
1199
1200    switch (on_err) {
1201    case BLOCKDEV_ON_ERROR_ENOSPC:
1202    case BLOCKDEV_ON_ERROR_AUTO:
1203        action = (error == ENOSPC) ?
1204                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
1205        break;
1206    case BLOCKDEV_ON_ERROR_STOP:
1207        action = BLOCK_ERROR_ACTION_STOP;
1208        break;
1209    case BLOCKDEV_ON_ERROR_REPORT:
1210        action = BLOCK_ERROR_ACTION_REPORT;
1211        break;
1212    case BLOCKDEV_ON_ERROR_IGNORE:
1213        action = BLOCK_ERROR_ACTION_IGNORE;
1214        break;
1215    default:
1216        abort();
1217    }
1218    if (!block_job_is_internal(job)) {
1219        qapi_event_send_block_job_error(job->id,
1220                                        is_read ? IO_OPERATION_TYPE_READ :
1221                                        IO_OPERATION_TYPE_WRITE,
1222                                        action, &error_abort);
1223    }
1224    if (action == BLOCK_ERROR_ACTION_STOP) {
1225        block_job_pause(job);
1226        /* make the pause user visible, which will be resumed from QMP. */
1227        job->user_paused = true;
1228        block_job_iostatus_set_err(job, error);
1229    }
1230    return action;
1231}
1232
/*
 * Argument bundle carried from block_job_defer_to_main_loop() to the
 * bottom half block_job_defer_to_main_loop_bh(), which frees it.
 */
typedef struct {
    BlockJob *job;                  /* job the deferred call is made for */
    AioContext *aio_context;        /* AioContext of job->blk when deferred */
    BlockJobDeferToMainLoopFn *fn;  /* callback invoked from the bottom half */
    void *opaque;                   /* passed to fn unchanged */
} BlockJobDeferToMainLoopData;
1239
1240static void block_job_defer_to_main_loop_bh(void *opaque)
1241{
1242    BlockJobDeferToMainLoopData *data = opaque;
1243    AioContext *aio_context;
1244
1245    /* Prevent race with block_job_defer_to_main_loop() */
1246    aio_context_acquire(data->aio_context);
1247
1248    /* Fetch BDS AioContext again, in case it has changed */
1249    aio_context = blk_get_aio_context(data->job->blk);
1250    if (aio_context != data->aio_context) {
1251        aio_context_acquire(aio_context);
1252    }
1253
1254    data->fn(data->job, data->opaque);
1255
1256    if (aio_context != data->aio_context) {
1257        aio_context_release(aio_context);
1258    }
1259
1260    aio_context_release(data->aio_context);
1261
1262    g_free(data);
1263}
1264
1265void block_job_defer_to_main_loop(BlockJob *job,
1266                                  BlockJobDeferToMainLoopFn *fn,
1267                                  void *opaque)
1268{
1269    BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data));
1270    data->job = job;
1271    data->aio_context = blk_get_aio_context(job->blk);
1272    data->fn = fn;
1273    data->opaque = opaque;
1274    job->deferred_to_main_loop = true;
1275
1276    aio_bh_schedule_oneshot(qemu_get_aio_context(),
1277                            block_job_defer_to_main_loop_bh, data);
1278}
1279