qemu/job.c
/*
 * Background jobs (long-running operations)
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012, 2018 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "qapi/error.h"
#include "qemu/job.h"
#include "qemu/id.h"
#include "qemu/main-loop.h"
#include "block/aio-wait.h"
#include "trace/trace-root.h"
#include "qapi/qapi-events-job.h"

/*
 * The job API is composed of two categories of functions.
 *
 * The first includes functions used by the monitor.  The monitor is
 * peculiar in that it accesses the job list with job_get, and
 * therefore needs consistency across job_get and the actual operation
 * (e.g. job_user_cancel). To achieve this consistency, the caller
 * calls job_lock/job_unlock itself around the whole operation.
 *
 * The second includes functions used by the job drivers and sometimes
 * by the core block layer. These delegate the locking to the callee instead.
 */
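
/*
 * For instance, a monitor-style caller (hypothetical sketch, using only
 * functions defined in this file) would wrap the lookup and the operation
 * in a single critical section:
 *
 *     job_lock();
 *     job = job_get_locked(id);
 *     if (job) {
 *         job_user_cancel_locked(job, force, errp);
 *     }
 *     job_unlock();
 */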

/*
 * job_mutex protects the jobs list, but also makes the
 * struct job fields thread-safe.
 */
QemuMutex job_mutex;

/* Protected by job_mutex */
static QLIST_HEAD(, Job) jobs = QLIST_HEAD_INITIALIZER(jobs);

/* Job State Transition Table */
bool JobSTT[JOB_STATUS__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    /* U: */ [JOB_STATUS_UNDEFINED] = {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0},
    /* C: */ [JOB_STATUS_CREATED]   = {0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1},
    /* R: */ [JOB_STATUS_RUNNING]   = {0, 0, 0, 1, 1, 0, 1, 0, 1, 0, 0},
    /* P: */ [JOB_STATUS_PAUSED]    = {0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0},
    /* Y: */ [JOB_STATUS_READY]     = {0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0},
    /* S: */ [JOB_STATUS_STANDBY]   = {0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0},
    /* W: */ [JOB_STATUS_WAITING]   = {0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 0},
    /* D: */ [JOB_STATUS_PENDING]   = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* X: */ [JOB_STATUS_ABORTING]  = {0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0},
    /* E: */ [JOB_STATUS_CONCLUDED] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1},
    /* N: */ [JOB_STATUS_NULL]      = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
};
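
/*
 * The table reads as JobSTT[from][to]; for example,
 * JobSTT[JOB_STATUS_CREATED][JOB_STATUS_RUNNING] == 1 is the C -> R
 * transition that job_start() below performs via
 * job_state_transition_locked().
 */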

bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
                                    /* U, C, R, P, Y, S, W, D, X, E, N */
    [JOB_VERB_CANCEL]               = {0, 1, 1, 1, 1, 1, 1, 1, 0, 0, 0},
    [JOB_VERB_PAUSE]                = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_RESUME]               = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_SET_SPEED]            = {0, 1, 1, 1, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_COMPLETE]             = {0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0},
    [JOB_VERB_FINALIZE]             = {0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0},
    [JOB_VERB_DISMISS]              = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
};
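
/*
 * Read as JobVerbTable[verb][current status]; for example, the COMPLETE row
 * permits job_complete_locked() only while the job is READY or STANDBY, and
 * job_apply_verb_locked() below rejects the verb with -EPERM otherwise.
 */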

/* Transactional group of jobs */
struct JobTxn {

    /* Is this txn being cancelled? */
    bool aborting;

    /* List of jobs */
    QLIST_HEAD(, Job) jobs;

    /* Reference count */
    int refcnt;
};
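
/*
 * Typical lifetime (hypothetical caller): job_txn_new() returns a
 * transaction with refcnt 1; each job added to it takes its own reference
 * via job_txn_add_job_locked(), so the creator drops the initial reference
 * with job_txn_unref() once all jobs have been added.
 */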

void job_lock(void)
{
    qemu_mutex_lock(&job_mutex);
}

void job_unlock(void)
{
    qemu_mutex_unlock(&job_mutex);
}

static void __attribute__((__constructor__)) job_init(void)
{
    qemu_mutex_init(&job_mutex);
}

JobTxn *job_txn_new(void)
{
    JobTxn *txn = g_new0(JobTxn, 1);
    QLIST_INIT(&txn->jobs);
    txn->refcnt = 1;
    return txn;
}

/* Called with job_mutex held. */
static void job_txn_ref_locked(JobTxn *txn)
{
    txn->refcnt++;
}

void job_txn_unref_locked(JobTxn *txn)
{
    if (txn && --txn->refcnt == 0) {
        g_free(txn);
    }
}

void job_txn_unref(JobTxn *txn)
{
    JOB_LOCK_GUARD();
    job_txn_unref_locked(txn);
}

/**
 * @txn: The transaction (may be NULL)
 * @job: Job to add to the transaction
 *
 * Add @job to the transaction.  The @job must not already be in a transaction.
 * The caller must call either job_txn_unref() or job_completed() to release
 * the reference that is automatically grabbed here.
 *
 * If @txn is NULL, the function does nothing.
 *
 * Called with job_mutex held.
 */
static void job_txn_add_job_locked(JobTxn *txn, Job *job)
{
    if (!txn) {
        return;
    }

    assert(!job->txn);
    job->txn = txn;

    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
    job_txn_ref_locked(txn);
}

/* Called with job_mutex held. */
static void job_txn_del_job_locked(Job *job)
{
    if (job->txn) {
        QLIST_REMOVE(job, txn_list);
        job_txn_unref_locked(job->txn);
        job->txn = NULL;
    }
}

/* Called with job_mutex held, but releases it temporarily. */
static int job_txn_apply_locked(Job *job, int fn(Job *))
{
    Job *other_job, *next;
    JobTxn *txn = job->txn;
    int rc = 0;

    /*
     * Take a reference to @job: fn() may drop job references (e.g. when it
     * completes a job), and keeping @job alive also keeps its transaction
     * alive until we are done iterating over it.
     */
    job_ref_locked(job);

    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
        rc = fn(other_job);
        if (rc) {
            break;
        }
    }

    job_unref_locked(job);
    return rc;
}

bool job_is_internal(Job *job)
{
    return (job->id == NULL);
}

/* Called with job_mutex held. */
static void job_state_transition_locked(Job *job, JobStatus s1)
{
    JobStatus s0 = job->status;
    assert(s1 >= 0 && s1 < JOB_STATUS__MAX);
    trace_job_state_transition(job, job->ret,
                               JobSTT[s0][s1] ? "allowed" : "disallowed",
                               JobStatus_str(s0), JobStatus_str(s1));
    assert(JobSTT[s0][s1]);
    job->status = s1;

    if (!job_is_internal(job) && s1 != s0) {
        qapi_event_send_job_status_change(job->id, job->status);
    }
}

int job_apply_verb_locked(Job *job, JobVerb verb, Error **errp)
{
    JobStatus s0 = job->status;
    assert(verb >= 0 && verb < JOB_VERB__MAX);
    trace_job_apply_verb(job, JobStatus_str(s0), JobVerb_str(verb),
                         JobVerbTable[verb][s0] ? "allowed" : "prohibited");
    if (JobVerbTable[verb][s0]) {
        return 0;
    }
    error_setg(errp, "Job '%s' in state '%s' cannot accept command verb '%s'",
               job->id, JobStatus_str(s0), JobVerb_str(verb));
    return -EPERM;
}

JobType job_type(const Job *job)
{
    return job->driver->job_type;
}

const char *job_type_str(const Job *job)
{
    return JobType_str(job_type(job));
}

bool job_is_cancelled_locked(Job *job)
{
    /* force_cancel may be true only if cancelled is true, too */
    assert(job->cancelled || !job->force_cancel);
    return job->force_cancel;
}

bool job_is_cancelled(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_cancelled_locked(job);
}

/* Called with job_mutex held. */
static bool job_cancel_requested_locked(Job *job)
{
    return job->cancelled;
}

bool job_cancel_requested(Job *job)
{
    JOB_LOCK_GUARD();
    return job_cancel_requested_locked(job);
}

bool job_is_ready_locked(Job *job)
{
    switch (job->status) {
    case JOB_STATUS_UNDEFINED:
    case JOB_STATUS_CREATED:
    case JOB_STATUS_RUNNING:
    case JOB_STATUS_PAUSED:
    case JOB_STATUS_WAITING:
    case JOB_STATUS_PENDING:
    case JOB_STATUS_ABORTING:
    case JOB_STATUS_CONCLUDED:
    case JOB_STATUS_NULL:
        return false;
    case JOB_STATUS_READY:
    case JOB_STATUS_STANDBY:
        return true;
    default:
        g_assert_not_reached();
    }
    return false;
}

bool job_is_ready(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_ready_locked(job);
}

bool job_is_completed_locked(Job *job)
{
    switch (job->status) {
    case JOB_STATUS_UNDEFINED:
    case JOB_STATUS_CREATED:
    case JOB_STATUS_RUNNING:
    case JOB_STATUS_PAUSED:
    case JOB_STATUS_READY:
    case JOB_STATUS_STANDBY:
        return false;
    case JOB_STATUS_WAITING:
    case JOB_STATUS_PENDING:
    case JOB_STATUS_ABORTING:
    case JOB_STATUS_CONCLUDED:
    case JOB_STATUS_NULL:
        return true;
    default:
        g_assert_not_reached();
    }
    return false;
}

static bool job_is_completed(Job *job)
{
    JOB_LOCK_GUARD();
    return job_is_completed_locked(job);
}

static bool job_started_locked(Job *job)
{
    return job->co;
}

/* Called with job_mutex held. */
static bool job_should_pause_locked(Job *job)
{
    return job->pause_count > 0;
}

Job *job_next_locked(Job *job)
{
    if (!job) {
        return QLIST_FIRST(&jobs);
    }
    return QLIST_NEXT(job, job_list);
}

Job *job_next(Job *job)
{
    JOB_LOCK_GUARD();
    return job_next_locked(job);
}

Job *job_get_locked(const char *id)
{
    Job *job;

    QLIST_FOREACH(job, &jobs, job_list) {
        if (job->id && !strcmp(id, job->id)) {
            return job;
        }
    }

    return NULL;
}

void job_set_aio_context(Job *job, AioContext *ctx)
{
    /* protect against read in job_finish_sync_locked and job_start */
    GLOBAL_STATE_CODE();
    /* protect against read in job_do_yield_locked */
    JOB_LOCK_GUARD();
    /* ensure the job is quiescent while the AioContext is changed */
    assert(job->paused || job_is_completed_locked(job));
    job->aio_context = ctx;
}

/* Called with job_mutex *not* held. */
static void job_sleep_timer_cb(void *opaque)
{
    Job *job = opaque;

    job_enter(job);
}

void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
                 void *opaque, Error **errp)
{
    Job *job;

    JOB_LOCK_GUARD();

    if (job_id) {
        if (flags & JOB_INTERNAL) {
            error_setg(errp, "Cannot specify job ID for internal job");
            return NULL;
        }
        if (!id_wellformed(job_id)) {
            error_setg(errp, "Invalid job ID '%s'", job_id);
            return NULL;
        }
        if (job_get_locked(job_id)) {
            error_setg(errp, "Job ID '%s' already in use", job_id);
            return NULL;
        }
    } else if (!(flags & JOB_INTERNAL)) {
        error_setg(errp, "An explicit job ID is required");
        return NULL;
    }

    job = g_malloc0(driver->instance_size);
    job->driver        = driver;
    job->id            = g_strdup(job_id);
    job->refcnt        = 1;
    job->aio_context   = ctx;
    job->busy          = false;
    job->paused        = true;
    job->pause_count   = 1;
    job->auto_finalize = !(flags & JOB_MANUAL_FINALIZE);
    job->auto_dismiss  = !(flags & JOB_MANUAL_DISMISS);
    job->cb            = cb;
    job->opaque        = opaque;

    progress_init(&job->progress);

    notifier_list_init(&job->on_finalize_cancelled);
    notifier_list_init(&job->on_finalize_completed);
    notifier_list_init(&job->on_pending);
    notifier_list_init(&job->on_ready);
    notifier_list_init(&job->on_idle);

    job_state_transition_locked(job, JOB_STATUS_CREATED);
    aio_timer_init(qemu_get_aio_context(), &job->sleep_timer,
                   QEMU_CLOCK_REALTIME, SCALE_NS,
                   job_sleep_timer_cb, job);

    QLIST_INSERT_HEAD(&jobs, job, job_list);

    /* Single jobs are modeled as single-job transactions for the sake of
     * consolidating the job management logic */
    if (!txn) {
        txn = job_txn_new();
        job_txn_add_job_locked(txn, job);
        job_txn_unref_locked(txn);
    } else {
        job_txn_add_job_locked(txn, job);
    }

    return job;
}
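
/*
 * Example (hypothetical caller; &example_driver stands in for a real
 * JobDriver): create a job in the main loop context that must be finalized
 * explicitly (cf. job_finalize_locked() below), then start it:
 *
 *     job = job_create("job0", &example_driver, NULL, qemu_get_aio_context(),
 *                      JOB_MANUAL_FINALIZE, NULL, NULL, errp);
 *     if (job) {
 *         job_start(job);
 *     }
 */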

void job_ref_locked(Job *job)
{
    ++job->refcnt;
}

void job_unref_locked(Job *job)
{
    GLOBAL_STATE_CODE();

    if (--job->refcnt == 0) {
        assert(job->status == JOB_STATUS_NULL);
        assert(!timer_pending(&job->sleep_timer));
        assert(!job->txn);

        if (job->driver->free) {
            AioContext *aio_context = job->aio_context;
            job_unlock();
            /* FIXME: aiocontext lock is required because cb calls blk_unref */
            aio_context_acquire(aio_context);
            job->driver->free(job);
            aio_context_release(aio_context);
            job_lock();
        }

        QLIST_REMOVE(job, job_list);

        progress_destroy(&job->progress);
        error_free(job->err);
        g_free(job->id);
        g_free(job);
    }
}

void job_progress_update(Job *job, uint64_t done)
{
    progress_work_done(&job->progress, done);
}

void job_progress_set_remaining(Job *job, uint64_t remaining)
{
    progress_set_remaining(&job->progress, remaining);
}

void job_progress_increase_remaining(Job *job, uint64_t delta)
{
    progress_increase_remaining(&job->progress, delta);
}

/**
 * To be called when a cancelled job is finalised.
 * Called with job_mutex held.
 */
static void job_event_cancelled_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_cancelled, job);
}

/**
 * To be called when a successfully completed job is finalised.
 * Called with job_mutex held.
 */
static void job_event_completed_locked(Job *job)
{
    notifier_list_notify(&job->on_finalize_completed, job);
}

/* Called with job_mutex held. */
static void job_event_pending_locked(Job *job)
{
    notifier_list_notify(&job->on_pending, job);
}

/* Called with job_mutex held. */
static void job_event_ready_locked(Job *job)
{
    notifier_list_notify(&job->on_ready, job);
}

/* Called with job_mutex held. */
static void job_event_idle_locked(Job *job)
{
    notifier_list_notify(&job->on_idle, job);
}

void job_enter_cond_locked(Job *job, bool(*fn)(Job *job))
{
    if (!job_started_locked(job)) {
        return;
    }
    if (job->deferred_to_main_loop) {
        return;
    }

    if (job->busy) {
        return;
    }

    if (fn && !fn(job)) {
        return;
    }

    assert(!job->deferred_to_main_loop);
    timer_del(&job->sleep_timer);
    job->busy = true;
    job_unlock();
    aio_co_wake(job->co);
    job_lock();
}

void job_enter(Job *job)
{
    JOB_LOCK_GUARD();
    job_enter_cond_locked(job, NULL);
}

/* Yield, and schedule a timer to reenter the coroutine after @ns nanoseconds.
 * Reentering the job coroutine with job_enter() before the timer has expired
 * is allowed and cancels the timer.
 *
 * If @ns is (uint64_t) -1, no timer is scheduled and job_enter() must be
 * called explicitly.
 *
 * Called with job_mutex held, but releases it temporarily.
 */
static void coroutine_fn job_do_yield_locked(Job *job, uint64_t ns)
{
    AioContext *next_aio_context;

    if (ns != -1) {
        timer_mod(&job->sleep_timer, ns);
    }
    job->busy = false;
    job_event_idle_locked(job);
    job_unlock();
    qemu_coroutine_yield();
    job_lock();

    next_aio_context = job->aio_context;
    /*
     * The coroutine has resumed, but in the meantime the job's AioContext
     * may have been changed via bdrv_try_change_aio_context(), so move the
     * coroutine to the new AioContext, too.
     */
    while (qemu_get_current_aio_context() != next_aio_context) {
        job_unlock();
        aio_co_reschedule_self(next_aio_context);
        job_lock();
        next_aio_context = job->aio_context;
    }

    /* Set by job_enter_cond_locked() before re-entering the coroutine.  */
    assert(job->busy);
}

/* Called with job_mutex held, but releases it temporarily. */
static void coroutine_fn job_pause_point_locked(Job *job)
{
    assert(job && job_started_locked(job));

    if (!job_should_pause_locked(job)) {
        return;
    }
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (job->driver->pause) {
        job_unlock();
        job->driver->pause(job);
        job_lock();
    }

    if (job_should_pause_locked(job) && !job_is_cancelled_locked(job)) {
        JobStatus status = job->status;
        job_state_transition_locked(job, status == JOB_STATUS_READY
                                    ? JOB_STATUS_STANDBY
                                    : JOB_STATUS_PAUSED);
        job->paused = true;
        job_do_yield_locked(job, -1);
        job->paused = false;
        job_state_transition_locked(job, status);
    }

    if (job->driver->resume) {
        job_unlock();
        job->driver->resume(job);
        job_lock();
    }
}

void coroutine_fn job_pause_point(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_point_locked(job);
}

void coroutine_fn job_yield(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, -1);
    }

    job_pause_point_locked(job);
}

void coroutine_fn job_sleep_ns(Job *job, int64_t ns)
{
    JOB_LOCK_GUARD();
    assert(job->busy);

    /* Check cancellation *before* setting busy = false, too!  */
    if (job_is_cancelled_locked(job)) {
        return;
    }

    if (!job_should_pause_locked(job)) {
        job_do_yield_locked(job, qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + ns);
    }

    job_pause_point_locked(job);
}
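
/*
 * Taken together, a driver's ->run() callback typically uses the functions
 * above as cooperative scheduling points.  A hypothetical sketch, where
 * work_left(), do_some_work() and delay_ns are stand-ins rather than real
 * helpers; job_sleep_ns() both sleeps and acts as a pause point:
 *
 *     static int coroutine_fn example_run(Job *job, Error **errp)
 *     {
 *         while (work_left(job)) {
 *             if (job_is_cancelled(job)) {
 *                 return -ECANCELED;
 *             }
 *             do_some_work(job);
 *             job_sleep_ns(job, delay_ns);
 *         }
 *         return 0;
 *     }
 */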

/* Assumes the job_mutex is held */
static bool job_timer_not_pending_locked(Job *job)
{
    return !timer_pending(&job->sleep_timer);
}

void job_pause_locked(Job *job)
{
    job->pause_count++;
    if (!job->paused) {
        job_enter_cond_locked(job, NULL);
    }
}

void job_pause(Job *job)
{
    JOB_LOCK_GUARD();
    job_pause_locked(job);
}

void job_resume_locked(Job *job)
{
    assert(job->pause_count > 0);
    job->pause_count--;
    if (job->pause_count) {
        return;
    }

    /* kick only if no timer is pending */
    job_enter_cond_locked(job, job_timer_not_pending_locked);
}

void job_resume(Job *job)
{
    JOB_LOCK_GUARD();
    job_resume_locked(job);
}

void job_user_pause_locked(Job *job, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_PAUSE, errp)) {
        return;
    }
    if (job->user_paused) {
        error_setg(errp, "Job is already paused");
        return;
    }
    job->user_paused = true;
    job_pause_locked(job);
}

bool job_user_paused_locked(Job *job)
{
    return job->user_paused;
}

void job_user_resume_locked(Job *job, Error **errp)
{
    assert(job);
    GLOBAL_STATE_CODE();
    if (!job->user_paused || job->pause_count <= 0) {
        error_setg(errp, "Can't resume a job that was not paused");
        return;
    }
    if (job_apply_verb_locked(job, JOB_VERB_RESUME, errp)) {
        return;
    }
    if (job->driver->user_resume) {
        job_unlock();
        job->driver->user_resume(job);
        job_lock();
    }
    job->user_paused = false;
    job_resume_locked(job);
}

/* Called with job_mutex held, but releases it temporarily. */
static void job_do_dismiss_locked(Job *job)
{
    assert(job);
    job->busy = false;
    job->paused = false;
    job->deferred_to_main_loop = true;

    job_txn_del_job_locked(job);

    job_state_transition_locked(job, JOB_STATUS_NULL);
    job_unref_locked(job);
}

void job_dismiss_locked(Job **jobptr, Error **errp)
{
    Job *job = *jobptr;
    /* similarly to _complete, this is QMP-interface only. */
    assert(job->id);
    if (job_apply_verb_locked(job, JOB_VERB_DISMISS, errp)) {
        return;
    }

    job_do_dismiss_locked(job);
    *jobptr = NULL;
}

void job_early_fail(Job *job)
{
    JOB_LOCK_GUARD();
    assert(job->status == JOB_STATUS_CREATED);
    job_do_dismiss_locked(job);
}

/* Called with job_mutex held. */
static void job_conclude_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_CONCLUDED);
    if (job->auto_dismiss || !job_started_locked(job)) {
        job_do_dismiss_locked(job);
    }
}

/* Called with job_mutex held. */
static void job_update_rc_locked(Job *job)
{
    if (!job->ret && job_is_cancelled_locked(job)) {
        job->ret = -ECANCELED;
    }
    if (job->ret) {
        if (!job->err) {
            error_setg(&job->err, "%s", strerror(-job->ret));
        }
        job_state_transition_locked(job, JOB_STATUS_ABORTING);
    }
}

static void job_commit(Job *job)
{
    assert(!job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->commit) {
        job->driver->commit(job);
    }
}

static void job_abort(Job *job)
{
    assert(job->ret);
    GLOBAL_STATE_CODE();
    if (job->driver->abort) {
        job->driver->abort(job);
    }
}

static void job_clean(Job *job)
{
    GLOBAL_STATE_CODE();
    if (job->driver->clean) {
        job->driver->clean(job);
    }
}

/*
 * Called with job_mutex held, but releases it temporarily.
 * Takes AioContext lock internally to invoke a job->driver callback.
 */
static int job_finalize_single_locked(Job *job)
{
    int job_ret;
    AioContext *ctx = job->aio_context;

    assert(job_is_completed_locked(job));

    /* Ensure abort is called for late-transactional failures */
    job_update_rc_locked(job);

    job_ret = job->ret;
    job_unlock();
    aio_context_acquire(ctx);

    if (!job_ret) {
        job_commit(job);
    } else {
        job_abort(job);
    }
    job_clean(job);

    if (job->cb) {
        job->cb(job->opaque, job_ret);
    }

    aio_context_release(ctx);
    job_lock();

    /* Emit events only if we actually started */
    if (job_started_locked(job)) {
        if (job_is_cancelled_locked(job)) {
            job_event_cancelled_locked(job);
        } else {
            job_event_completed_locked(job);
        }
    }

    job_txn_del_job_locked(job);
    job_conclude_locked(job);
    return 0;
}

/*
 * Called with job_mutex held, but releases it temporarily.
 * Takes AioContext lock internally to invoke a job->driver callback.
 */
static void job_cancel_async_locked(Job *job, bool force)
{
    AioContext *ctx = job->aio_context;
    GLOBAL_STATE_CODE();
    if (job->driver->cancel) {
        job_unlock();
        aio_context_acquire(ctx);
        force = job->driver->cancel(job, force);
        aio_context_release(ctx);
        job_lock();
    } else {
        /* No .cancel() means the job will behave as if force-cancelled */
        force = true;
    }

    if (job->user_paused) {
        /* Do not call job_enter here, the caller will handle it.  */
        if (job->driver->user_resume) {
            job_unlock();
            job->driver->user_resume(job);
            job_lock();
        }
        job->user_paused = false;
        assert(job->pause_count > 0);
        job->pause_count--;
    }

    /*
     * Ignore soft cancel requests after the job is already done
     * (We will still invoke job->driver->cancel() above, but if the
     * job driver supports soft cancelling and the job is done, that
     * should be a no-op, too.  We still call it so it can override
     * @force.)
     */
    if (force || !job->deferred_to_main_loop) {
        job->cancelled = true;
        /* To prevent 'force == false' overriding a previous 'force == true' */
        job->force_cancel |= force;
    }
}
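
/*
 * The soft/force distinction exists for drivers like the mirror block job,
 * where a non-forced cancel of a READY job is really a graceful completion
 * (the job drains and finishes at a consistent point instead of being torn
 * down).  Such a driver's ->cancel() callback returns false to keep the
 * request soft, or true to escalate it to a forced cancel, as seen above.
 */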

/*
 * Called with job_mutex held, but releases it temporarily.
 * Takes AioContext lock internally to invoke a job->driver callback.
 */
static void job_completed_txn_abort_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    if (txn->aborting) {
        /*
         * We are cancelled by another job, which will handle everything.
         */
        return;
    }
    txn->aborting = true;
    job_txn_ref_locked(txn);

    job_ref_locked(job);

    /* Other jobs are effectively cancelled by us, set the status for
     * them; this job, however, may or may not be cancelled, depending
     * on the caller, so leave it. */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (other_job != job) {
            /*
             * This is a transaction: If one job failed, no result will matter.
             * Therefore, pass force=true to terminate all other jobs as quickly
             * as possible.
             */
            job_cancel_async_locked(other_job, true);
        }
    }
    while (!QLIST_EMPTY(&txn->jobs)) {
        other_job = QLIST_FIRST(&txn->jobs);
        if (!job_is_completed_locked(other_job)) {
            assert(job_cancel_requested_locked(other_job));
            job_finish_sync_locked(other_job, NULL, NULL);
        }
        job_finalize_single_locked(other_job);
    }

    job_unref_locked(job);
    job_txn_unref_locked(txn);
}

/* Called with job_mutex held, but releases it temporarily */
static int job_prepare_locked(Job *job)
{
    int ret;
    AioContext *ctx = job->aio_context;

    GLOBAL_STATE_CODE();

    if (job->ret == 0 && job->driver->prepare) {
        job_unlock();
        aio_context_acquire(ctx);
        ret = job->driver->prepare(job);
        aio_context_release(ctx);
        job_lock();
        job->ret = ret;
        job_update_rc_locked(job);
    }

    return job->ret;
}

/* Called with job_mutex held */
static int job_needs_finalize_locked(Job *job)
{
    return !job->auto_finalize;
}

/* Called with job_mutex held */
static void job_do_finalize_locked(Job *job)
{
    int rc;
    assert(job && job->txn);

    /* prepare the transaction to complete */
    rc = job_txn_apply_locked(job, job_prepare_locked);
    if (rc) {
        job_completed_txn_abort_locked(job);
    } else {
        job_txn_apply_locked(job, job_finalize_single_locked);
    }
}

void job_finalize_locked(Job *job, Error **errp)
{
    assert(job && job->id);
    if (job_apply_verb_locked(job, JOB_VERB_FINALIZE, errp)) {
        return;
    }
    job_do_finalize_locked(job);
}

/* Called with job_mutex held. */
static int job_transition_to_pending_locked(Job *job)
{
    job_state_transition_locked(job, JOB_STATUS_PENDING);
    if (!job->auto_finalize) {
        job_event_pending_locked(job);
    }
    return 0;
}

void job_transition_to_ready(Job *job)
{
    JOB_LOCK_GUARD();
    job_state_transition_locked(job, JOB_STATUS_READY);
    job_event_ready_locked(job);
}

/* Called with job_mutex held. */
static void job_completed_txn_success_locked(Job *job)
{
    JobTxn *txn = job->txn;
    Job *other_job;

    job_state_transition_locked(job, JOB_STATUS_WAITING);

    /*
     * Successful completion, see if there are other running jobs in this
     * txn.
     */
    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
        if (!job_is_completed_locked(other_job)) {
            return;
        }
        assert(other_job->ret == 0);
    }

    job_txn_apply_locked(job, job_transition_to_pending_locked);

    /* If no jobs need manual finalization, automatically do so */
    if (job_txn_apply_locked(job, job_needs_finalize_locked) == 0) {
        job_do_finalize_locked(job);
    }
}

/* Called with job_mutex held. */
static void job_completed_locked(Job *job)
{
    assert(job && job->txn && !job_is_completed_locked(job));

    job_update_rc_locked(job);
    trace_job_completed(job, job->ret);
    if (job->ret) {
        job_completed_txn_abort_locked(job);
    } else {
        job_completed_txn_success_locked(job);
    }
}

/**
 * Useful only as a type shim for aio_bh_schedule_oneshot.
 * Called with job_mutex *not* held.
 */
static void job_exit(void *opaque)
{
    Job *job = (Job *)opaque;
    JOB_LOCK_GUARD();
    job_ref_locked(job);

    /* This is a lie, we're not quiescent, but still doing the completion
     * callbacks. However, completion callbacks tend to involve operations that
     * drain block nodes, and if .drained_poll still returned true, we would
     * deadlock. */
    job->busy = false;
    job_event_idle_locked(job);

    job_completed_locked(job);
    job_unref_locked(job);
}

/**
 * All jobs must allow a pause point before entering their job proper. This
 * ensures that jobs can be paused prior to being started, then resumed later.
 */
static void coroutine_fn job_co_entry(void *opaque)
{
    Job *job = opaque;
    int ret;

    assert(job && job->driver && job->driver->run);
    WITH_JOB_LOCK_GUARD() {
        assert(job->aio_context == qemu_get_current_aio_context());
        job_pause_point_locked(job);
    }
    ret = job->driver->run(job, &job->err);
    WITH_JOB_LOCK_GUARD() {
        job->ret = ret;
        job->deferred_to_main_loop = true;
        job->busy = true;
    }
    aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
}

void job_start(Job *job)
{
    assert(qemu_in_main_thread());

    WITH_JOB_LOCK_GUARD() {
        assert(job && !job_started_locked(job) && job->paused &&
            job->driver && job->driver->run);
        job->co = qemu_coroutine_create(job_co_entry, job);
        job->pause_count--;
        job->busy = true;
        job->paused = false;
        job_state_transition_locked(job, JOB_STATUS_RUNNING);
    }
    aio_co_enter(job->aio_context, job->co);
}

void job_cancel_locked(Job *job, bool force)
{
    if (job->status == JOB_STATUS_CONCLUDED) {
        job_do_dismiss_locked(job);
        return;
    }
    job_cancel_async_locked(job, force);
    if (!job_started_locked(job)) {
        job_completed_locked(job);
    } else if (job->deferred_to_main_loop) {
        /*
         * job_cancel_async() ignores soft-cancel requests for jobs
         * that are already done (i.e. deferred to the main loop).  We
         * have to check again whether the job is really cancelled.
         * (job_cancel_requested() and job_is_cancelled() are equivalent
         * here, because job_cancel_async() will make soft-cancel
         * requests no-ops when deferred_to_main_loop is true.  We
         * choose to call job_is_cancelled() to show that we invoke
         * job_completed_txn_abort() only for force-cancelled jobs.)
         */
        if (job_is_cancelled_locked(job)) {
            job_completed_txn_abort_locked(job);
        }
    } else {
        job_enter_cond_locked(job, NULL);
    }
}

void job_user_cancel_locked(Job *job, bool force, Error **errp)
{
    if (job_apply_verb_locked(job, JOB_VERB_CANCEL, errp)) {
        return;
    }
    job_cancel_locked(job, force);
}

/* A wrapper around job_cancel_locked() taking an Error ** parameter so it may
 * be used with job_finish_sync_locked() without the need for (rather nasty)
 * function pointer casts there.
 *
 * Called with job_mutex held.
 */
static void job_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, false);
}

/**
 * Same as job_cancel_err_locked(), but force-cancel.
 * Called with job_mutex held.
 */
static void job_force_cancel_err_locked(Job *job, Error **errp)
{
    job_cancel_locked(job, true);
}

int job_cancel_sync_locked(Job *job, bool force)
{
    if (force) {
        return job_finish_sync_locked(job, &job_force_cancel_err_locked, NULL);
    } else {
        return job_finish_sync_locked(job, &job_cancel_err_locked, NULL);
    }
}

int job_cancel_sync(Job *job, bool force)
{
    JOB_LOCK_GUARD();
    return job_cancel_sync_locked(job, force);
}

void job_cancel_sync_all(void)
{
    Job *job;
    JOB_LOCK_GUARD();

    while ((job = job_next_locked(NULL))) {
        job_cancel_sync_locked(job, true);
    }
}

int job_complete_sync_locked(Job *job, Error **errp)
{
    return job_finish_sync_locked(job, job_complete_locked, errp);
}

void job_complete_locked(Job *job, Error **errp)
{
    /* Should not be reachable via external interface for internal jobs */
    assert(job->id);
    GLOBAL_STATE_CODE();
    if (job_apply_verb_locked(job, JOB_VERB_COMPLETE, errp)) {
        return;
    }
    if (job_cancel_requested_locked(job) || !job->driver->complete) {
        error_setg(errp, "The active block job '%s' cannot be completed",
                   job->id);
        return;
    }

    job_unlock();
    job->driver->complete(job, errp);
    job_lock();
}

int job_finish_sync_locked(Job *job,
                           void (*finish)(Job *, Error **errp),
                           Error **errp)
{
    Error *local_err = NULL;
    int ret;
    GLOBAL_STATE_CODE();

    job_ref_locked(job);

    if (finish) {
        finish(job, &local_err);
    }
    if (local_err) {
        error_propagate(errp, local_err);
        job_unref_locked(job);
        return -EBUSY;
    }

    job_unlock();
    AIO_WAIT_WHILE_UNLOCKED(job->aio_context,
                            (job_enter(job), !job_is_completed(job)));
    job_lock();

    ret = (job_is_cancelled_locked(job) && job->ret == 0)
          ? -ECANCELED : job->ret;
    job_unref_locked(job);
    return ret;
}