/* qemu/job.c */
   1/*
   2 * Background jobs (long-running operations)
   3 *
   4 * Copyright (c) 2011 IBM Corp.
   5 * Copyright (c) 2012, 2018 Red Hat, Inc.
   6 *
   7 * Permission is hereby granted, free of charge, to any person obtaining a copy
   8 * of this software and associated documentation files (the "Software"), to deal
   9 * in the Software without restriction, including without limitation the rights
  10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 * copies of the Software, and to permit persons to whom the Software is
  12 * furnished to do so, subject to the following conditions:
  13 *
  14 * The above copyright notice and this permission notice shall be included in
  15 * all copies or substantial portions of the Software.
  16 *
  17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  23 * THE SOFTWARE.
  24 */
  25
  26#include "qemu/osdep.h"
  27#include "qapi/error.h"
  28#include "qemu/job.h"
  29#include "qemu/id.h"
  30#include "qemu/main-loop.h"
  31#include "block/aio-wait.h"
  32#include "trace/trace-root.h"
  33#include "qapi/qapi-events-job.h"
  34
  35/*
  36 * The job API is composed of two categories of functions.
  37 *
  38 * The first includes functions used by the monitor.  The monitor is
  39 * peculiar in that it accesses the job list with job_get, and
  40 * therefore needs consistency across job_get and the actual operation
  41 * (e.g. job_user_cancel). To achieve this consistency, the caller
  42 * calls job_lock/job_unlock itself around the whole operation.
  43 *
  44 *
  45 * The second includes functions used by the job drivers and sometimes
  46 * by the core block layer. These delegate the locking to the callee instead.
  47 */
  48
  49/*
  50 * job_mutex protects the jobs list, but also makes the
  51 * struct job fields thread-safe.
  52 */
  53QemuMutex job_mutex;
  54
  55/* Protected by job_mutex */
  56static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);
  57
/*
 * Job State Transition Table: JobSTT[from][to] is nonzero iff the
 * transition from state 'from' (row) to state 'to' (column) is allowed.
 * Enforced by job_state_transition_locked().
 */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
  73
/*
 * JobVerbTable[verb][status] is nonzero iff the user-issued verb is
 * permitted while the job is in that status.  Checked by
 * job_apply_verb_locked().
 */
bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
    [JOB_VERB_CHANGE]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
};
  85
/*
 * Transactional group of jobs.  All fields are protected by job_mutex.
 * Every job belongs to exactly one transaction; standalone jobs get a
 * private single-job transaction in job_create().
 */
struct JobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, Job) jobs;

    /* Reference count */
    int refcnt;
};
  98
/* Acquire the global job_mutex. */
void job_lock(void)
{
    qemu_mutex_lock(&job_mutex);
}
 103
/* Release the global job_mutex. */
void job_unlock(void)
{
    qemu_mutex_unlock(&job_mutex);
}
 108
/* Initialise job_mutex before main() runs (ELF constructor). */
static void __attribute__((__constructor__)) job_init(void)
{
    qemu_mutex_init(&job_mutex);
}
 113
 114JobTxn *job_txn_new(void)
 115{
 116    JobTxn *txn = g_new0(JobTxn, 1);
 117    QLIST_INIT(&txn->jobs);
 118    txn->refcnt = 1;
 119    return txn;
 120}
 121
/* Take a reference on @txn.  Called with job_mutex held. */
static void job_txn_ref_locked(JobTxn *txn)
{
    txn->refcnt++;
}
 127
 128void job_txn_unref_locked(JobTxn *txn)
 129{
 130    if (txn && --txn->refcnt == 0) {
 131        g_free(txn);
 132    }
 133}
 134
/* Like job_txn_unref_locked(), but takes job_mutex itself. */
void job_txn_unref(JobTxn *txn)
{
    JOB_LOCK_GUARD();
    job_txn_unref_locked(txn);
}
 140
 141/**
 142 * @txn: The transaction (may be NULL)
 143 * @job: Job to add to the transaction
 144 *
 145 * Add @job to the transaction.  The @job must not already be in a transaction.
 146 * The caller must call either job_txn_unref() or job_completed() to release
 147 * the reference that is automatically grabbed here.
 148 *
 149 * If @txn is NULL, the function does nothing.
 150 *
 151 * Called with job_mutex held.
 152 */
 153static void job_txn_add_job_locked(JobTxn *txn, Job *job)
 154{
 155    if (!txn) {
 156        return;
 157    }
 158
 159    assert(!job->txn);
 160    job->txn = txn;
 161
 162    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
 163    job_txn_ref_locked(txn);
 164}
 165
 166/* Called with job_mutex held. */
 167static void job_txn_del_job_locked(Job *job)
 168{
 169    if (job->txn) {
 170        QLIST_REMOVE(job, txn_list);
 171        job_txn_unref_locked(job->txn);
 172        job->txn = NULL;
 173    }
 174}
 175
/*
 * Apply @fn to every job in @job's transaction, stopping at the first
 * nonzero return value, which is propagated to the caller.
 *
 * Called with job_mutex held, but releases it temporarily (through @fn).
 */
static int job_txn_apply_locked(Job *job, int fn(Job *))
{
    Job *other_job, *next;
    JobTxn *txn = job->txn;
    int rc = 0;

    /*
     * Pin @job across the loop: @fn may drop job_mutex internally (see the
     * *_locked helpers passed here, which release it temporarily), during
     * which jobs of the transaction can be finalized and freed.
     * NOTE(review): an earlier version of this comment referred to an
     * "outer_ctx" parameter that no longer exists.
     */
    job_ref_locked(job);

    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        rc = fn(other_job);
        if (rc) {
            break;
        }
    }

    job_unref_locked(job);
    return rc;
}
 201
 202bool job_is_internal(Job *job)
 203{
 204    return (job->id == NULL);
 205}
 206
/*
 * Move @job to state @s1, asserting the transition is legal per JobSTT,
 * and emit a JOB_STATUS_CHANGE QMP event for non-internal jobs when the
 * state actually changed.  Called with job_mutex held.
 */
static void job_state_transition_locked(Job *job, JobStatus s1)
{
    JobStatus s0 = job->status;
    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    trace_job_state_transition(job, job->ret,
                               JobSTT[s0][s1] ? "allowed" : "disallowed",
                               JobStatus_str(s0), JobStatus_str(s1));
    assert(JobSTT[s0][s1]);
    job->status = s1;

    if (!job_is_internal(job) && s1 != s0) {
        qapi_event_send_job_status_change(job->id, job->status);
    }
}
 222
 223int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
 224{
 225    JobStatus s0 = job->status;
 226    assert(verb >= 0 && verb < JOB_VERB__MAX);
 227    trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
 228                         JobVerbTable[verb][s0] ? "allowed" : "prohibited");
 229    if (JobVerbTable[verb][s0]) {
 230        return 0;
 231    }
 232    error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
 233               job->id, JobStatus_str(s0), JobVerb_str(verb));
 234    return -EPERM;
 235}
 236
/* Return the driver-declared type of @job. */
JobType job_type(const Job *job)
{
    return job->driver->job_type;
}
 241
/* Return the string name of @job's type. */
const char *job_type_str(const Job *job)
{
    return JobType_str(job_type(job));
}
 246
/*
 * True only when the job is being *force*-cancelled; a soft cancel request
 * (job->cancelled without force) is reported by job_cancel_requested()
 * instead.  Called with job_mutex held.
 */
bool job_is_cancelled_locked(Job *job)
{
    /* force_cancel may be true only if cancelled is true, too */
    assert(job->cancelled || !job->force_cancel);
    return job->force_cancel;
}
 253
/* Return whether @job is currently paused, taking job_mutex. */
bool job_is_paused(Job *job)
{
    JOB_LOCK_GUARD();
    return job->paused;
}
 259
/* Lock-taking wrapper around job_is_cancelled_locked(). */
bool job_is_cancelled(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_cancelled_locked(job);
}
 265
/*
 * True when any cancel (soft or forced) has been requested.
 * Called with job_mutex held.
 */
static bool job_cancel_requested_locked(Job *job)
{
    return job->cancelled;
}
 271
/* Lock-taking wrapper around job_cancel_requested_locked(). */
bool job_cancel_requested(Job *job)
{
    JOB_LOCK_GUARD();
    return job_cancel_requested_locked(job);
}
 277
 278bool job_is_ready_locked(Job *job)
 279{
 280    switch (job->status) {
 281    case JOB_STATUS_UNDEFINED:
 282    case JOB_STATUS_CREATED:
 283    case JOB_STATUS_RUNNING:
 284    case JOB_STATUS_PAUSED:
 285    case JOB_STATUS_WAITING:
 286    case JOB_STATUS_PENDING:
 287    case JOB_STATUS_ABORTING:
 288    case JOB_STATUS_CONCLUDED:
 289    case JOB_STATUS_NULL:
 290        return false;
 291    case JOB_STATUS_READY:
 292    case JOB_STATUS_STANDBY:
 293        return true;
 294    default:
 295        g_assert_not_reached();
 296    }
 297    return false;
 298}
 299
/* Lock-taking wrapper around job_is_ready_locked(). */
bool job_is_ready(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_ready_locked(job);
}
 305
 306bool job_is_completed_locked(Job *job)
 307{
 308    switch (job->status) {
 309    case JOB_STATUS_UNDEFINED:
 310    case JOB_STATUS_CREATED:
 311    case JOB_STATUS_RUNNING:
 312    case JOB_STATUS_PAUSED:
 313    case JOB_STATUS_READY:
 314    case JOB_STATUS_STANDBY:
 315        return false;
 316    case JOB_STATUS_WAITING:
 317    case JOB_STATUS_PENDING:
 318    case JOB_STATUS_ABORTING:
 319    case JOB_STATUS_CONCLUDED:
 320    case JOB_STATUS_NULL:
 321        return true;
 322    default:
 323        g_assert_not_reached();
 324    }
 325    return false;
 326}
 327
/* Lock-taking wrapper around job_is_completed_locked(). */
static bool job_is_completed(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_completed_locked(job);
}
 333
/*
 * A job counts as started once its coroutine exists (set by job_start()).
 * Implicit pointer-to-bool conversion.  Called with job_mutex held.
 */
static bool job_started_locked(Job *job)
{
    return job->co;
}
 338
/* True when at least one pause request is outstanding.  job_mutex held. */
static bool job_should_pause_locked(Job *job)
{
    return job->pause_count > 0;
}
 344
 345Job *job_next_locked(Job *job)
 346{
 347    if (!job) {
 348        return QLIST_FIRST(&jobs);
 349    }
 350    return QLIST_NEXT(job, job_list);
 351}
 352
/* Lock-taking wrapper around job_next_locked(). */
Job *job_next(Job *job)
{
    JOB_LOCK_GUARD();
    return job_next_locked(job);
}
 358
 359Job *job_get_locked(const char *id)
 360{
 361    Job *job;
 362
 363    QLIST_FOREACH(job, &jobs, job_list) {
 364        if (job->id && !strcmp(id, job->id)) {
 365            return job;
 366        }
 367    }
 368
 369    return NULL;
 370}
 371
/*
 * Switch @job to a new AioContext.  The job must be quiescent (paused or
 * completed) so no coroutine is concurrently reading aio_context.
 */
void job_set_aio_context(Job *job, AioContext *ctx)
{
    /* protect against read in job_finish_sync_locked and job_start */
    GLOBAL_STATE_CODE();
    /* protect against read in job_do_yield_locked */
    JOB_LOCK_GUARD();
    /* ensure the job is quiescent while the AioContext is changed */
    assert(job->paused || job_is_completed_locked(job));
    job->aio_context = ctx;
}
 382
/*
 * Expiry callback for job->sleep_timer: wake the job coroutine.
 * Called with job_mutex *not* held.
 */
static void job_sleep_timer_cb(void *opaque)
{
    Job *job = opaque;

    job_enter(job);
}
 390
/*
 * Allocate and register a new job of driver->instance_size bytes.
 *
 * @job_id must be well-formed and unique, and must be NULL exactly when
 * JOB_INTERNAL is set in @flags; otherwise @errp is set and NULL returned.
 * The job starts in the CREATED state, paused (pause_count == 1) and not
 * busy, and is attached to @txn — or to a fresh single-job transaction
 * when @txn is NULL.  The caller owns the initial job reference.
 */
void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
                 void *opaque, Error **errp)
{
    Job *job;

    JOB_LOCK_GUARD();

    if (job_id) {
        if (flags & JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal job");
            return NULL;
        }
        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }
        if (job_get_locked(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    } else if (!(flags & JOB_INTERNAL)) {
        error_setg(errp, "An explicit job ID is required");
        return NULL;
    }

    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->refcnt        = 1;
    job->aio_context   = ctx;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    job->cb            = cb;
    job->opaque        = opaque;

    progress_init(&job->progress);

    notifier_list_init(&job->on_finalize_cancelled);
    notifier_list_init(&job->on_finalize_completed);
    notifier_list_init(&job->on_pending);
    notifier_list_init(&job->on_ready);
    notifier_list_init(&job->on_idle);

    job_state_transition_locked(job, JOB_STATUS_CREATED);
    /* Sleep timer lives in the main-loop AioContext regardless of @ctx */
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   job_sleep_timer_cb, job);

    QLIST_INSERT_HEAD(&jobs, job, job_list);

    /* Single jobs are modeled as single-job transactions for sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = job_txn_new();
        job_txn_add_job_locked(txn, job);
        job_txn_unref_locked(txn);
    } else {
        job_txn_add_job_locked(txn, job);
    }

    return job;
}
 457
/* Take a reference on @job.  Called with job_mutex held. */
void job_ref_locked(Job *job)
{
    ++job->refcnt;
}
 462
/*
 * Drop a reference on @job; on the last reference the job must already be
 * in the NULL state with no pending timer or transaction, and is freed.
 * job_mutex is dropped around driver->free() since it may do arbitrary work.
 */
void job_unref_locked(Job *job)
{
    GLOBAL_STATE_CODE();

    if (--job->refcnt == 0) {
        assert(job->status == JOB_STATUS_NULL);
        assert(!timer_pending(&job->sleep_timer));
        assert(!job->txn);

        if (job->driver->free) {
            job_unlock();
            job->driver->free(job);
            job_lock();
        }

        QLIST_REMOVE(job, job_list);

        progress_destroy(&job->progress);
        error_free(job->err);
        g_free(job->id);
        g_free(job);
    }
}
 486
/* Record @done units of additional completed work on @job's progress. */
void job_progress_update(Job *job, uint64_t done)
{
    progress_work_done(&job->progress, done);
}
 491
/* Set the amount of work remaining on @job's progress meter. */
void job_progress_set_remaining(Job *job, uint64_t remaining)
{
    progress_set_remaining(&job->progress, remaining);
}
 496
/* Add @delta to the remaining work on @job's progress meter. */
void job_progress_increase_remaining(Job *job, uint64_t delta)
{
    progress_increase_remaining(&job->progress, delta);
}
 501
/**
 * To be called when a cancelled job is finalised.
 * Called with job_mutex held.
 */
static void job_event_cancelled_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_cancelled, job);
}
 510
/**
 * To be called when a successfully completed job is finalised.
 * Called with job_mutex held.
 */
static void job_event_completed_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_completed, job);
}
 519
/* Fire on_pending notifiers (job reached PENDING).  job_mutex held. */
static void job_event_pending_locked(Job *job)
{
    notifier_list_notify(&job->on_pending, job);
}
 525
/* Fire on_ready notifiers (job reached READY).  job_mutex held. */
static void job_event_ready_locked(Job *job)
{
    notifier_list_notify(&job->on_ready, job);
}
 531
/* Fire on_idle notifiers (job went quiescent).  job_mutex held. */
static void job_event_idle_locked(Job *job)
{
    notifier_list_notify(&job->on_idle, job);
}
 537
/*
 * Conditionally wake the job coroutine: do nothing if the job has not
 * started, has deferred to the main loop, is already busy, or when the
 * optional predicate @fn returns false.  Cancels any pending sleep timer
 * and drops job_mutex around aio_co_wake().
 */
void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
{
    if (!job_started_locked(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    if (job->busy) {
        return;
    }

    if (fn && !fn(job)) {
        return;
    }

    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    /* Mark busy before unlocking so no one else wakes the coroutine too */
    job->busy = true;
    job_unlock();
    aio_co_wake(job->co);
    job_lock();
}
 562
/* Unconditionally attempt to wake @job, taking job_mutex. */
void job_enter(Job *job)
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, NULL);
}
 568
/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with job_enter() before the timer has expired
 * is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
 * called explicitly.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
    AioContext *next_aio_context;

    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    job_event_idle_locked(job);
    job_unlock();
    qemu_coroutine_yield();
    job_lock();

    next_aio_context = job->aio_context;
    /*
     * Coroutine has resumed, but in the meanwhile the job AioContext
     * might have changed via bdrv_try_change_aio_context(), so we need to move
     * the coroutine too in the new aiocontext.
     */
    while (qemu_get_current_aio_context() != next_aio_context) {
        job_unlock();
        aio_co_reschedule_self(next_aio_context);
        job_lock();
        /* Re-read: the context may have changed again while rescheduling */
        next_aio_context = job->aio_context;
    }

    /* Set by job_enter_cond_locked() before re-entering the coroutine.  */
    assert(job->busy);
}
 607
/*
 * Cooperative pause point: if a pause is requested (and the job is not
 * force-cancelled), notify the driver via .pause/.resume and yield until
 * resumed.  State moves to STANDBY when pausing from READY, PAUSED
 * otherwise, and is restored afterwards.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_pause_point_locked(Job *job)
{
    assert(job && job_started_locked(job));

    if (!job_should_pause_locked(job)) {
        return;
    }
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (job->driver->pause) {
        job_unlock();
        job->driver->pause(job);
        job_lock();
    }

    /* Re-check: the request may have gone away while the lock was dropped */
    if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
        JobStatus status = job->status;
        job_state_transition_locked(job, status == JOB_STATUS_READY
                                    ? JOB_STATUS_STANDBY
                                    : JOB_STATUS_PAUSED);
        job->paused = true;
        job_do_yield_locked(job, -1);
        job->paused = false;
        job_state_transition_locked(job, status);
    }

    if (job->driver->resume) {
        job_unlock();
        job->driver->resume(job);
        job_lock();
    }
}
 643
/* Lock-taking wrapper around job_pause_point_locked(). */
void coroutine_fn job_pause_point(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_point_locked(job);
}
 649
/*
 * Yield the job coroutine until job_enter() is called, honouring pause
 * requests on the way out.  Returns immediately when force-cancelled.
 */
void coroutine_fn job_yield(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, -1);
    }

    job_pause_point_locked(job);
}
 666
/*
 * Sleep for @ns nanoseconds (real time), waking early on job_enter().
 * Honours pause requests; returns immediately when force-cancelled.
 */
void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    job_pause_point_locked(job);
}
 683
/*
 * Predicate for job_enter_cond_locked(): only wake when no sleep timer is
 * armed.  Assumes the job_mutex is held.
 */
static bool job_timer_not_pending_locked(Job *job)
{
    return !timer_pending(&job->sleep_timer);
}
 689
/*
 * Request a pause; the job honours it at its next pause point.  The kick
 * lets a running job reach that pause point promptly.  job_mutex held.
 */
void job_pause_locked(Job *job)
{
    job->pause_count++;
    if (!job->paused) {
        job_enter_cond_locked(job, NULL);
    }
}
 697
/* Lock-taking wrapper around job_pause_locked(). */
void job_pause(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_locked(job);
}
 703
 704void job_resume_locked(Job *job)
 705{
 706    assert(job->pause_count > 0);
 707    job->pause_count--;
 708    if (job->pause_count) {
 709        return;
 710    }
 711
 712    /* kick only if no timer is pending */
 713    job_enter_cond_locked(job, job_timer_not_pending_locked);
 714}
 715
/* Lock-taking wrapper around job_resume_locked(). */
void job_resume(Job *job)
{
    JOB_LOCK_GUARD();
    job_resume_locked(job);
}
 721
/*
 * User-initiated pause (QMP job-pause): rejects the verb in invalid states
 * and double pauses via @errp, then behaves like job_pause_locked().
 */
void job_user_pause_locked(Job *job, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    job_pause_locked(job);
}
 734
/* Whether the job was paused by the user.  job_mutex held. */
bool job_user_paused_locked(Job *job)
{
    return job->user_paused;
}
 739
/*
 * User-initiated resume (QMP job-resume).  Fails via @errp when the job
 * was not user-paused or the verb is invalid.  job_mutex is dropped
 * around the driver's user_resume callback.
 */
void job_user_resume_locked(Job *job, Error **errp)
{
    assert(job);
    GLOBAL_STATE_CODE();
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
        return;
    }
    if (job->driver->user_resume) {
        job_unlock();
        job->driver->user_resume(job);
        job_lock();
    }
    job->user_paused = false;
    job_resume_locked(job);
}
 759
/*
 * Move @job to the NULL state, detach it from its transaction, and drop
 * the list's reference (which may free it via job_unref_locked).
 * Called with job_mutex held, but releases it temporarily.
 */
static void job_do_dismiss_locked(Job *job)
{
    assert(job);
    job->busy = false;
    job->paused = false;
    /* Keep job_enter_cond_locked() from ever waking the coroutine again */
    job->deferred_to_main_loop = true;

    job_txn_del_job_locked(job);

    job_state_transition_locked(job, JOB_STATUS_NULL);
    job_unref_locked(job);
}
 773
/*
 * QMP job-dismiss: remove a CONCLUDED job.  Clears *jobptr so the caller
 * does not keep a dangling pointer.  job_mutex held.
 */
void job_dismiss_locked(Job **jobptr, Error **errp)
{
    Job *job = *jobptr;
    /* similarly to _complete, this is QMP-interface only. */
    assert(job->id);
    if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    job_do_dismiss_locked(job);
    *jobptr = NULL;
}
 786
/* Tear down a job that failed before ever leaving the CREATED state. */
void job_early_fail(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->status == JOB_STATUS_CREATED);
    job_do_dismiss_locked(job);
}
 793
/*
 * Transition to CONCLUDED; auto-dismissing jobs (and jobs that never
 * started) are immediately dismissed as well.  job_mutex held.
 */
static void job_conclude_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !job_started_locked(job)) {
        job_do_dismiss_locked(job);
    }
}
 802
/*
 * Normalise the job result: force-cancelled jobs without an error get
 * -ECANCELED, and any nonzero ret gets an Error object (if missing) plus
 * a transition to ABORTING.  job_mutex held.
 */
static void job_update_rc_locked(Job *job)
{
    if (!job->ret && job_is_cancelled_locked(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        if (!job->err) {
            error_setg(&job->err, "%s", strerror(-job->ret));
        }
        job_state_transition_locked(job, JOB_STATUS_ABORTING);
    }
}
 816
/* Run the driver's commit callback for a successful job (ret == 0). */
static void job_commit(Job *job)
{
    assert(!job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}
 825
/* Run the driver's abort callback for a failed job (ret != 0). */
static void job_abort(Job *job)
{
    assert(job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}
 834
/* Run the driver's clean callback (invoked after commit or abort). */
static void job_clean(Job *job)
{
    GLOBAL_STATE_CODE();
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}
 842
/*
 * Finalise one completed job: run commit or abort (by job->ret), then
 * clean, invoke the completion callback, emit the cancelled/completed
 * notifier, detach from the transaction and conclude.  Always returns 0
 * (suitable as a job_txn_apply_locked callback).
 *
 * Called with job_mutex held, but releases it temporarily around the
 * driver callbacks and job->cb.
 */
static int job_finalize_single_locked(Job *job)
{
    int job_ret;

    assert(job_is_completed_locked(job));

    /* Ensure abort is called for late-transactional failures */
    job_update_rc_locked(job);

    /* Snapshot ret while locked; callbacks below run unlocked */
    job_ret = job->ret;
    job_unlock();

    if (!job_ret) {
        job_commit(job);
    } else {
        job_abort(job);
    }
    job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job_ret);
    }

    job_lock();

    /* Emit events only if we actually started */
    if (job_started_locked(job)) {
        if (job_is_cancelled_locked(job)) {
            job_event_cancelled_locked(job);
        } else {
            job_event_completed_locked(job);
        }
    }

    job_txn_del_job_locked(job);
    job_conclude_locked(job);
    return 0;
}
 884
/*
 * Asynchronously request cancellation.  The driver's .cancel callback may
 * upgrade/downgrade @force; absence of .cancel means force-cancel.  A
 * user-paused job is resumed first so it can observe the request.
 *
 * Called with job_mutex held, but releases it temporarily around driver
 * callbacks.
 */
static void job_cancel_async_locked(Job *job, bool force)
{
    GLOBAL_STATE_CODE();
    if (job->driver->cancel) {
        job_unlock();
        force = job->driver->cancel(job, force);
        job_lock();
    } else {
        /* No .cancel() means the job will behave as if force-cancelled */
        force = true;
    }

    if (job->user_paused) {
        /* Do not call job_enter here, the caller will handle it.  */
        if (job->driver->user_resume) {
            job_unlock();
            job->driver->user_resume(job);
            job_lock();
        }
        job->user_paused = false;
        assert(job->pause_count > 0);
        job->pause_count--;
    }

    /*
     * Ignore soft cancel requests after the job is already done
     * (We will still invoke job->driver->cancel() above, but if the
     * job driver supports soft cancelling and the job is done, that
     * should be a no-op, too.  We still call it so it can override
     * @force.)
     */
    if (force || !job->deferred_to_main_loop) {
        job->cancelled = true;
        /* To prevent 'force == false' overriding a previous 'force == true' */
        job->force_cancel |= force;
    }
}
 925
/*
 * Abort the whole transaction because @job failed: force-cancel every
 * other member, wait for them to complete, and finalise all of them.
 * Re-entrant aborts (txn->aborting already set) return immediately.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void job_completed_txn_abort_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    /* Keep txn and job alive while members are finalized and unref'd */
    job_txn_ref_locked(txn);

    job_ref_locked(job);

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            /*
             * This is a transaction: If one job failed, no result will matter.
             * Therefore, pass force=true to terminate all other jobs as quickly
             * as possible.
             */
            job_cancel_async_locked(other_job, true);
        }
    }
    /* job_finalize_single_locked() removes each job from txn->jobs */
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        if (!job_is_completed_locked(other_job)) {
            assert(job_cancel_requested_locked(other_job));
            job_finish_sync_locked(other_job, NULL, NULL);
        }
        job_finalize_single_locked(other_job);
    }

    job_unref_locked(job);
    job_txn_unref_locked(txn);
}
 970
/*
 * Run the driver's .prepare callback (only for so-far-successful jobs) and
 * fold its result into job->ret.  Returns the updated job->ret.
 * Called with job_mutex held, but releases it temporarily.
 */
static int job_prepare_locked(Job *job)
{
    int ret;

    GLOBAL_STATE_CODE();

    if (job->ret == 0 && job->driver->prepare) {
        job_unlock();
        ret = job->driver->prepare(job);
        job_lock();
        job->ret = ret;
        job_update_rc_locked(job);
    }

    return job->ret;
}
 988
/*
 * Nonzero when the job requires an explicit job-finalize from the user
 * (JOB_MANUAL_FINALIZE was set).  Called with job_mutex held.
 */
static int job_needs_finalize_locked(Job *job)
{
    return !job->auto_finalize;
}
 994
/*
 * Finalise @job's whole transaction: prepare every member, then either
 * finalise all on success or abort the transaction on the first failure.
 * Called with job_mutex held (released temporarily via the helpers).
 */
static void job_do_finalize_locked(Job *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = job_txn_apply_locked(job, job_prepare_locked);
    if (rc) {
        job_completed_txn_abort_locked(job);
    } else {
        job_txn_apply_locked(job, job_finalize_single_locked);
    }
}
1009
/*
 * QMP job-finalize: verb-check then finalise the transaction.
 * Only valid for user-visible jobs (must have an ID).  job_mutex held.
 */
void job_finalize_locked(Job *job, Error **errp)
{
    assert(job && job->id);
    if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    job_do_finalize_locked(job);
}
1018
/*
 * Move @job to PENDING, notifying listeners only when the user must act
 * (manual finalize).  Returns 0 so it can serve as a job_txn_apply_locked
 * callback.  Called with job_mutex held.
 */
static int job_transition_to_pending_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_PENDING);
    if (!job->auto_finalize) {
        job_event_pending_locked(job);
    }
    return 0;
}
1028
/* Move @job to READY and fire its on_ready notifiers; takes job_mutex. */
void job_transition_to_ready(Job *job)
{
    JOB_LOCK_GUARD();
    job_state_transition_locked(job, JOB_STATUS_READY);
    job_event_ready_locked(job);
}
1035
/*
 * Handle successful completion of @job: enter WAITING, and once every
 * member of the transaction has completed (all with ret == 0), move the
 * transaction to PENDING and auto-finalise it unless any member requires
 * manual finalization.  Called with job_mutex held.
 */
static void job_completed_txn_success_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    job_state_transition_locked(job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!job_is_completed_locked(other_job)) {
            return;
        }
        assert(other_job->ret == 0);
    }

    job_txn_apply_locked(job, job_transition_to_pending_locked);

    /* If no jobs need manual finalization, automatically do so */
    if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
        job_do_finalize_locked(job);
    }
}
1062
/*
 * Dispatch completion of @job: normalise the result, then take the
 * transaction's success or abort path.  Called with job_mutex held.
 */
static void job_completed_locked(Job *job)
{
    assert(job && job->txn && !job_is_completed_locked(job));

    job_update_rc_locked(job);
    trace_job_completed(job, job->ret);
    if (job->ret) {
        job_completed_txn_abort_locked(job);
    } else {
        job_completed_txn_success_locked(job);
    }
}
1076
/**
 * Main-loop bottom half run after the job coroutine returns: marks the
 * job idle and drives completion.  Useful only as a type shim for
 * aio_bh_schedule_oneshot.
 * Called with job_mutex *not* held.
 */
static void job_exit(void *opaque)
{
    Job *job = (Job *)opaque;
    JOB_LOCK_GUARD();
    /* Keep the job alive across completion, which can drop references */
    job_ref_locked(job);

    /* This is a lie, we're not quiescent, but still doing the completion
     * callbacks. However, completion callbacks tend to involve operations that
     * drain block nodes, and if .drained_poll still returned true, we would
     * deadlock. */
    job->busy = false;
    job_event_idle_locked(job);

    job_completed_locked(job);
    job_unref_locked(job);
}
1097
/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 *
 * Coroutine entry point for every job; wraps the driver's .run() callback
 * and defers completion handling to job_exit() in the main loop.
 */
static void coroutine_fn job_co_entry(void *opaque)
{
    Job *job = opaque;
    int ret;

    assert(job && job->driver && job->driver->run);
    WITH_JOB_LOCK_GUARD() {
        assert(job->aio_context == qemu_get_current_aio_context());
        /* Honor any pause that was requested before the job started. */
        job_pause_point_locked(job);
    }
    /* The driver's main loop runs with job_mutex released. */
    ret = job->driver->run(job, &job->err);
    WITH_JOB_LOCK_GUARD() {
        job->ret = ret;
        /* From this point on, completion is driven from the main loop. */
        job->deferred_to_main_loop = true;
        /* Stay busy until job_exit() has run the completion callbacks. */
        job->busy = true;
    }
    /* job_exit() runs in the main loop context, outside this coroutine. */
    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}
1120
void job_start(Job *job)
{
    assert(qemu_in_main_thread());

    WITH_JOB_LOCK_GUARD() {
        assert(job && !job_started_locked(job) && job->paused &&
            job->driver && job->driver->run);
        job->co = qemu_coroutine_create(job_co_entry, job);
        /* Undo the initial pause the job was created with. */
        job->pause_count--;
        job->busy = true;
        job->paused = false;
        job_state_transition_locked(job, JOB_STATUS_RUNNING);
    }
    /* Enter the coroutine with job_mutex released; it locks as needed. */
    aio_co_enter(job->aio_context, job->co);
}
1136
void job_cancel_locked(Job *job, bool force)
{
    if (job->status == JOB_STATUS_CONCLUDED) {
        /* Cancelling an already-concluded job just dismisses it. */
        job_do_dismiss_locked(job);
        return;
    }
    job_cancel_async_locked(job, force);
    if (!job_started_locked(job)) {
        /* The job never ran; complete it (as cancelled) immediately. */
        job_completed_locked(job);
    } else if (job->deferred_to_main_loop) {
        /*
         * job_cancel_async() ignores soft-cancel requests for jobs
         * that are already done (i.e. deferred to the main loop).  We
         * have to check again whether the job is really cancelled.
         * (job_cancel_requested() and job_is_cancelled() are equivalent
         * here, because job_cancel_async() will make soft-cancel
         * requests no-ops when deferred_to_main_loop is true.  We
         * choose to call job_is_cancelled() to show that we invoke
         * job_completed_txn_abort() only for force-cancelled jobs.)
         */
        if (job_is_cancelled_locked(job)) {
            job_completed_txn_abort_locked(job);
        }
    } else {
        /* Wake the running job so it notices the cancel request. */
        job_enter_cond_locked(job, NULL);
    }
}
1164
1165void job_user_cancel_locked(Job *job, bool force, Error **errp)
1166{
1167    if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
1168        return;
1169    }
1170    job_cancel_locked(job, force);
1171}
1172
1173/* A wrapper around job_cancel_locked() taking an Error ** parameter so it may
1174 * be used with job_finish_sync_locked() without the need for (rather nasty)
1175 * function pointer casts there.
1176 *
1177 * Called with job_mutex held.
1178 */
1179static void job_cancel_err_locked(Job *job, Error **errp)
1180{
1181    job_cancel_locked(job, false);
1182}
1183
1184/**
1185 * Same as job_cancel_err(), but force-cancel.
1186 * Called with job_mutex held.
1187 */
1188static void job_force_cancel_err_locked(Job *job, Error **errp)
1189{
1190    job_cancel_locked(job, true);
1191}
1192
1193int job_cancel_sync_locked(Job *job, bool force)
1194{
1195    if (force) {
1196        return job_finish_sync_locked(job, &job_force_cancel_err_locked, NULL);
1197    } else {
1198        return job_finish_sync_locked(job, &job_cancel_err_locked, NULL);
1199    }
1200}
1201
1202int job_cancel_sync(Job *job, bool force)
1203{
1204    JOB_LOCK_GUARD();
1205    return job_cancel_sync_locked(job, force);
1206}
1207
1208void job_cancel_sync_all(void)
1209{
1210    Job *job;
1211    JOB_LOCK_GUARD();
1212
1213    while ((job = job_next_locked(NULL))) {
1214        job_cancel_sync_locked(job, true);
1215    }
1216}
1217
1218int job_complete_sync_locked(Job *job, Error **errp)
1219{
1220    return job_finish_sync_locked(job, job_complete_locked, errp);
1221}
1222
void job_complete_locked(Job *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    GLOBAL_STATE_CODE();
    if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    /* A job being (soft- or force-)cancelled can no longer be completed. */
    if (job_cancel_requested_locked(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    /*
     * The driver's .complete() callback runs with job_mutex released;
     * presumably it may take the lock itself or block — TODO confirm
     * against the JobDriver contract in include/qemu/job.h.
     */
    job_unlock();
    job->driver->complete(job, errp);
    job_lock();
}
1241
int job_finish_sync_locked(Job *job,
                           void (*finish)(Job *, Error **errp),
                           Error **errp)
{
    Error *local_err = NULL;
    int ret;
    GLOBAL_STATE_CODE();

    /* Keep the job alive while we wait for it to finish. */
    job_ref_locked(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref_locked(job);
        return -EBUSY;
    }

    /*
     * Poll with job_mutex released: keep kicking the job and wait until
     * it reports completion.  The comma expression re-enters the job on
     * every poll iteration before testing for completion.
     */
    job_unlock();
    AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
                            (job_enter(job), !job_is_completed(job)));
    job_lock();

    /* A cancelled job that never set an error code reports -ECANCELED. */
    ret = (job_is_cancelled_locked(job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref_locked(job);
    return ret;
}
1271