qemu/blockjob.c
/*
 * QEMU System Emulator block driver
 *
 * Copyright (c) 2011 IBM Corp.
 * Copyright (c) 2012 Red Hat, Inc.
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include "qemu/osdep.h"
#include "block/aio-wait.h"
#include "block/block.h"
#include "block/blockjob_int.h"
#include "block/block_int.h"
#include "block/trace.h"
#include "sysemu/block-backend.h"
#include "qapi/error.h"
#include "qapi/qapi-events-block-core.h"
#include "qapi/qmp/qerror.h"
#include "qemu/main-loop.h"
#include "qemu/timer.h"

static bool is_block_job(Job *job)
{
    return job_type(job) == JOB_TYPE_BACKUP ||
           job_type(job) == JOB_TYPE_COMMIT ||
           job_type(job) == JOB_TYPE_MIRROR ||
           job_type(job) == JOB_TYPE_STREAM;
}

BlockJob *block_job_next_locked(BlockJob *bjob)
{
    Job *job = bjob ? &bjob->job : NULL;
    GLOBAL_STATE_CODE();

    do {
        job = job_next_locked(job);
    } while (job && !is_block_job(job));

    return job ? container_of(job, BlockJob, job) : NULL;
}

BlockJob *block_job_get_locked(const char *id)
{
    Job *job = job_get_locked(id);
    GLOBAL_STATE_CODE();

    if (job && is_block_job(job)) {
        return container_of(job, BlockJob, job);
    } else {
        return NULL;
    }
}

BlockJob *block_job_get(const char *id)
{
    JOB_LOCK_GUARD();
    return block_job_get_locked(id);
}

void block_job_free(Job *job)
{
    BlockJob *bjob = container_of(job, BlockJob, job);
    GLOBAL_STATE_CODE();

    block_job_remove_all_bdrv(bjob);
    ratelimit_destroy(&bjob->limit);
    error_free(bjob->blocker);
}

static char *child_job_get_parent_desc(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    return g_strdup_printf("%s job '%s'", job_type_str(&job->job), job->job.id);
}

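/*
 * The drained_begin/drained_poll/drained_end callbacks below implement drain
 * handling for the job as a parent of its nodes: while any node the job has
 * attached (via block_job_add_bdrv()) is drained, the job is paused so that
 * it does not submit new requests, and it is resumed again when the drained
 * section ends.
 */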
static void child_job_drained_begin(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    job_pause(&job->job);
}

static bool child_job_drained_poll(BdrvChild *c)
{
    BlockJob *bjob = c->opaque;
    Job *job = &bjob->job;
    const BlockJobDriver *drv = block_job_driver(bjob);

    /* An inactive or completed job doesn't have any pending requests. Jobs
     * with !job->busy are either already paused or have a pause point after
     * being reentered, so no job driver code will run before they pause. */
    WITH_JOB_LOCK_GUARD() {
        if (!job->busy || job_is_completed_locked(job)) {
            return false;
        }
    }

    /* Otherwise, assume that it isn't fully stopped yet, but allow the job to
     * override this assumption. */
    if (drv->drained_poll) {
        return drv->drained_poll(bjob);
    } else {
        return true;
    }
}

static void child_job_drained_end(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    job_resume(&job->job);
}

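/*
 * AioContext changes are applied transactionally: child_job_change_aio_ctx()
 * below recursively visits the job's other nodes and, on success, queues
 * child_job_set_aio_ctx_commit(), so job->job.aio_context is only updated
 * once the whole transaction commits.
 */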
typedef struct BdrvStateChildJobContext {
    AioContext *new_ctx;
    BlockJob *job;
} BdrvStateChildJobContext;

static void child_job_set_aio_ctx_commit(void *opaque)
{
    BdrvStateChildJobContext *s = opaque;
    BlockJob *job = s->job;

    job_set_aio_context(&job->job, s->new_ctx);
}

static TransactionActionDrv change_child_job_context = {
    .commit = child_job_set_aio_ctx_commit,
    .clean = g_free,
};

static bool child_job_change_aio_ctx(BdrvChild *c, AioContext *ctx,
                                     GHashTable *visited, Transaction *tran,
                                     Error **errp)
{
    BlockJob *job = c->opaque;
    BdrvStateChildJobContext *s;
    GSList *l;

    for (l = job->nodes; l; l = l->next) {
        BdrvChild *sibling = l->data;
        if (!bdrv_child_change_aio_context(sibling, ctx, visited,
                                           tran, errp)) {
            return false;
        }
    }

    s = g_new(BdrvStateChildJobContext, 1);
    *s = (BdrvStateChildJobContext) {
        .new_ctx = ctx,
        .job = job,
    };

    tran_add(tran, &change_child_job_context, s);
    return true;
}

static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
{
    BlockJob *job = c->opaque;
    IO_CODE();
    JOB_LOCK_GUARD();

    return job->job.aio_context;
}

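/*
 * BdrvChildClass for the BdrvChild references that a block job holds on the
 * nodes it operates on (see block_job_add_bdrv()). It forwards drain
 * notifications and AioContext changes to the job; .stay_at_node keeps these
 * references on the job's original node when a node is replaced in the graph.
 */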
static const BdrvChildClass child_job = {
    .get_parent_desc    = child_job_get_parent_desc,
    .drained_begin      = child_job_drained_begin,
    .drained_poll       = child_job_drained_poll,
    .drained_end        = child_job_drained_end,
    .change_aio_ctx     = child_job_change_aio_ctx,
    .stay_at_node       = true,
    .get_parent_aio_context = child_job_get_parent_aio_context,
};

void block_job_remove_all_bdrv(BlockJob *job)
{
    GLOBAL_STATE_CODE();
    /*
     * bdrv_root_unref_child() may reach child_job_change_aio_ctx(),
     * which will also traverse job->nodes, so consume the list one by
     * one to make sure that such a concurrent access does not attempt
     * to process an already freed BdrvChild.
     */
    while (job->nodes) {
        GSList *l = job->nodes;
        BdrvChild *c = l->data;

        job->nodes = l->next;

        bdrv_op_unblock_all(c->bs, job->blocker);
        bdrv_root_unref_child(c);

        g_slist_free_1(l);
    }
}

bool block_job_has_bdrv(BlockJob *job, BlockDriverState *bs)
{
    GSList *el;
    GLOBAL_STATE_CODE();

    for (el = job->nodes; el; el = el->next) {
        BdrvChild *c = el->data;
        if (c->bs == bs) {
            return true;
        }
    }

    return false;
}

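/*
 * Attach @bs to @job as a child via the child_job class above: take a
 * reference and the requested permissions, block all operation types on the
 * node with job->blocker, and record the new BdrvChild in job->nodes. If @bs
 * lives in a different AioContext than the job, that context is acquired
 * temporarily around bdrv_root_attach_child() (dropping the job's own
 * context first unless it is the main context).
 */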
int block_job_add_bdrv(BlockJob *job, const char *name, BlockDriverState *bs,
                       uint64_t perm, uint64_t shared_perm, Error **errp)
{
    BdrvChild *c;
    AioContext *ctx = bdrv_get_aio_context(bs);
    bool need_context_ops;
    GLOBAL_STATE_CODE();

    bdrv_ref(bs);

    need_context_ops = ctx != job->job.aio_context;

    if (need_context_ops) {
        if (job->job.aio_context != qemu_get_aio_context()) {
            aio_context_release(job->job.aio_context);
        }
        aio_context_acquire(ctx);
    }
    c = bdrv_root_attach_child(bs, name, &child_job, 0, perm, shared_perm, job,
                               errp);
    if (need_context_ops) {
        aio_context_release(ctx);
        if (job->job.aio_context != qemu_get_aio_context()) {
            aio_context_acquire(job->job.aio_context);
        }
    }
    if (c == NULL) {
        return -EPERM;
    }

    job->nodes = g_slist_prepend(job->nodes, c);
    bdrv_op_block_all(bs, job->blocker);

    return 0;
}

/* Called with job_mutex lock held. */
static void block_job_on_idle_locked(Notifier *n, void *opaque)
{
    aio_wait_kick();
}

bool block_job_is_internal(BlockJob *job)
{
    return (job->job.id == NULL);
}

const BlockJobDriver *block_job_driver(BlockJob *job)
{
    return container_of(job->job.driver, BlockJobDriver, job_driver);
}

/* Assumes the job_mutex is held */
static bool job_timer_pending(Job *job)
{
    return timer_pending(&job->sleep_timer);
}

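/*
 * block_job_set_speed_locked() validates the SET_SPEED verb against the
 * current job state, rejects negative values, updates the rate limit (and
 * the driver's own notion of speed via drv->set_speed, with the job lock
 * temporarily dropped), and, when the limit is raised or removed, kicks a
 * sleeping job so that the new limit takes effect immediately.
 */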
bool block_job_set_speed_locked(BlockJob *job, int64_t speed, Error **errp)
{
    const BlockJobDriver *drv = block_job_driver(job);
    int64_t old_speed = job->speed;

    GLOBAL_STATE_CODE();

    if (job_apply_verb_locked(&job->job, JOB_VERB_SET_SPEED, errp) < 0) {
        return false;
    }
    if (speed < 0) {
        error_setg(errp, QERR_INVALID_PARAMETER_VALUE, "speed",
                   "a non-negative value");
        return false;
    }

    ratelimit_set_speed(&job->limit, speed, BLOCK_JOB_SLICE_TIME);

    job->speed = speed;

    if (drv->set_speed) {
        job_unlock();
        drv->set_speed(job, speed);
        job_lock();
    }

    if (speed && speed <= old_speed) {
        return true;
    }

    /* kick only if a timer is pending */
    job_enter_cond_locked(&job->job, job_timer_pending);

    return true;
}

static bool block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
{
    JOB_LOCK_GUARD();
    return block_job_set_speed_locked(job, speed, errp);
}

void block_job_ratelimit_processed_bytes(BlockJob *job, uint64_t n)
{
    IO_CODE();
    ratelimit_calculate_delay(&job->limit, n);
}

void block_job_ratelimit_sleep(BlockJob *job)
{
    uint64_t delay_ns;

    /*
     * Sleep at least once. If the job is reentered early, keep waiting until
     * we've waited for the full time that is necessary to keep the job at the
     * right speed.
     *
     * Make sure to recalculate the delay after each (possibly interrupted)
     * sleep because the speed can change while the job has yielded.
     */
    do {
        delay_ns = ratelimit_calculate_delay(&job->limit, 0);
        job_sleep_ns(&job->job, delay_ns);
    } while (delay_ns && !job_is_cancelled(&job->job));
}
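
/*
 * Example (sketch only, not part of this file): a typical job main loop
 * combines the two rate-limiting helpers above roughly like this, where
 * s->common is the driver's embedded BlockJob and copy_one_chunk() is a
 * hypothetical per-driver helper:
 *
 *     for (offset = 0; offset < len; offset += n) {
 *         block_job_ratelimit_sleep(&s->common);
 *         n = copy_one_chunk(s, offset);
 *         block_job_ratelimit_processed_bytes(&s->common, n);
 *         job_progress_update(&s->common.job, n);
 *     }
 */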

BlockJobInfo *block_job_query_locked(BlockJob *job, Error **errp)
{
    BlockJobInfo *info;
    uint64_t progress_current, progress_total;

    GLOBAL_STATE_CODE();

    if (block_job_is_internal(job)) {
        error_setg(errp, "Cannot query QEMU internal jobs");
        return NULL;
    }

    progress_get_snapshot(&job->job.progress, &progress_current,
                          &progress_total);

    info = g_new0(BlockJobInfo, 1);
    info->type      = g_strdup(job_type_str(&job->job));
    info->device    = g_strdup(job->job.id);
    info->busy      = job->job.busy;
    info->paused    = job->job.pause_count > 0;
    info->offset    = progress_current;
    info->len       = progress_total;
    info->speed     = job->speed;
    info->io_status = job->iostatus;
    info->ready     = job_is_ready_locked(&job->job);
    info->status    = job->job.status;
    info->auto_finalize = job->job.auto_finalize;
    info->auto_dismiss  = job->job.auto_dismiss;
    if (job->job.ret) {
        info->error = job->job.err ?
                        g_strdup(error_get_pretty(job->job.err)) :
                        g_strdup(strerror(-job->job.ret));
    }
    return info;
}

/* Called with job lock held */
static void block_job_iostatus_set_err_locked(BlockJob *job, int error)
{
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
                                          BLOCK_DEVICE_IO_STATUS_FAILED;
    }
}

/* Called with job_mutex lock held. */
static void block_job_event_cancelled_locked(Notifier *n, void *opaque)
{
    BlockJob *job = opaque;
    uint64_t progress_current, progress_total;

    if (block_job_is_internal(job)) {
        return;
    }

    progress_get_snapshot(&job->job.progress, &progress_current,
                          &progress_total);

    qapi_event_send_block_job_cancelled(job_type(&job->job),
                                        job->job.id,
                                        progress_total,
                                        progress_current,
                                        job->speed);
}

/* Called with job_mutex lock held. */
static void block_job_event_completed_locked(Notifier *n, void *opaque)
{
    BlockJob *job = opaque;
    const char *msg = NULL;
    uint64_t progress_current, progress_total;

    if (block_job_is_internal(job)) {
        return;
    }

    if (job->job.ret < 0) {
        msg = error_get_pretty(job->job.err);
    }

    progress_get_snapshot(&job->job.progress, &progress_current,
                          &progress_total);

    qapi_event_send_block_job_completed(job_type(&job->job),
                                        job->job.id,
                                        progress_total,
                                        progress_current,
                                        job->speed,
                                        msg);
}

/* Called with job_mutex lock held. */
static void block_job_event_pending_locked(Notifier *n, void *opaque)
{
    BlockJob *job = opaque;

    if (block_job_is_internal(job)) {
        return;
    }

    qapi_event_send_block_job_pending(job_type(&job->job),
                                      job->job.id);
}

/* Called with job_mutex lock held. */
static void block_job_event_ready_locked(Notifier *n, void *opaque)
{
    BlockJob *job = opaque;
    uint64_t progress_current, progress_total;

    if (block_job_is_internal(job)) {
        return;
    }

    progress_get_snapshot(&job->job.progress, &progress_current,
                          &progress_total);

    qapi_event_send_block_job_ready(job_type(&job->job),
                                    job->job.id,
                                    progress_total,
                                    progress_current,
                                    job->speed);
}


void *block_job_create(const char *job_id, const BlockJobDriver *driver,
                       JobTxn *txn, BlockDriverState *bs, uint64_t perm,
                       uint64_t shared_perm, int64_t speed, int flags,
                       BlockCompletionFunc *cb, void *opaque, Error **errp)
{
    BlockJob *job;
    int ret;
    GLOBAL_STATE_CODE();

    if (job_id == NULL && !(flags & JOB_INTERNAL)) {
        job_id = bdrv_get_device_name(bs);
    }

    job = job_create(job_id, &driver->job_driver, txn, bdrv_get_aio_context(bs),
                     flags, cb, opaque, errp);
    if (job == NULL) {
        return NULL;
    }

    assert(is_block_job(&job->job));
    assert(job->job.driver->free == &block_job_free);
    assert(job->job.driver->user_resume == &block_job_user_resume);

    ratelimit_init(&job->limit);

    job->finalize_cancelled_notifier.notify = block_job_event_cancelled_locked;
    job->finalize_completed_notifier.notify = block_job_event_completed_locked;
    job->pending_notifier.notify = block_job_event_pending_locked;
    job->ready_notifier.notify = block_job_event_ready_locked;
    job->idle_notifier.notify = block_job_on_idle_locked;

    WITH_JOB_LOCK_GUARD() {
        notifier_list_add(&job->job.on_finalize_cancelled,
                          &job->finalize_cancelled_notifier);
        notifier_list_add(&job->job.on_finalize_completed,
                          &job->finalize_completed_notifier);
        notifier_list_add(&job->job.on_pending, &job->pending_notifier);
        notifier_list_add(&job->job.on_ready, &job->ready_notifier);
        notifier_list_add(&job->job.on_idle, &job->idle_notifier);
    }

    error_setg(&job->blocker, "block device is in use by block job: %s",
               job_type_str(&job->job));

    ret = block_job_add_bdrv(job, "main node", bs, perm, shared_perm, errp);
    if (ret < 0) {
        goto fail;
    }

    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);

    if (!block_job_set_speed(job, speed, errp)) {
        goto fail;
    }

    return job;

fail:
    job_early_fail(&job->job);
    return NULL;
}
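
/*
 * Example (sketch only, not part of this file): a job driver's start
 * function typically creates the job, fills in its own state, optionally
 * adds further nodes with block_job_add_bdrv(), and then starts the job
 * coroutine, roughly:
 *
 *     s = block_job_create(job_id, &my_job_driver, NULL, bs, perm,
 *                          shared_perm, speed, creation_flags,
 *                          NULL, NULL, errp);
 *     if (!s) {
 *         return;
 *     }
 *     ... initialize driver-specific fields of s ...
 *     job_start(&s->common.job);
 *
 * my_job_driver and the fields of s are hypothetical; see the stream,
 * commit, mirror and backup drivers for real callers.
 */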

void block_job_iostatus_reset_locked(BlockJob *job)
{
    GLOBAL_STATE_CODE();
    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
        return;
    }
    assert(job->job.user_paused && job->job.pause_count > 0);
    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
}

static void block_job_iostatus_reset(BlockJob *job)
{
    JOB_LOCK_GUARD();
    block_job_iostatus_reset_locked(job);
}

void block_job_user_resume(Job *job)
{
    BlockJob *bjob = container_of(job, BlockJob, job);
    GLOBAL_STATE_CODE();
    block_job_iostatus_reset(bjob);
}

BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
                                        int is_read, int error)
{
    BlockErrorAction action;
    IO_CODE();

    switch (on_err) {
    case BLOCKDEV_ON_ERROR_ENOSPC:
    case BLOCKDEV_ON_ERROR_AUTO:
        action = (error == ENOSPC) ?
                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_STOP:
        action = BLOCK_ERROR_ACTION_STOP;
        break;
    case BLOCKDEV_ON_ERROR_REPORT:
        action = BLOCK_ERROR_ACTION_REPORT;
        break;
    case BLOCKDEV_ON_ERROR_IGNORE:
        action = BLOCK_ERROR_ACTION_IGNORE;
        break;
    default:
        abort();
    }
    if (!block_job_is_internal(job)) {
        qapi_event_send_block_job_error(job->job.id,
                                        is_read ? IO_OPERATION_TYPE_READ :
                                        IO_OPERATION_TYPE_WRITE,
                                        action);
    }
    if (action == BLOCK_ERROR_ACTION_STOP) {
        WITH_JOB_LOCK_GUARD() {
            if (!job->job.user_paused) {
                job_pause_locked(&job->job);
                /*
                 * Make the pause user-visible so that it can be
                 * resumed from QMP.
                 */
                job->job.user_paused = true;
            }
            block_job_iostatus_set_err_locked(job, error);
        }
    }
    return action;
}
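
/*
 * Example (sketch only, not part of this file): a job driver usually calls
 * block_job_error_action() when an I/O request fails and decides how to
 * proceed based on the returned action, roughly:
 *
 *     BlockErrorAction action =
 *         block_job_error_action(&s->common, s->on_error, is_read, -ret);
 *     if (action == BLOCK_ERROR_ACTION_REPORT) {
 *         return ret;    // fail the job with this error
 *     }
 *     // BLOCK_ERROR_ACTION_STOP has already paused the job above;
 *     // BLOCK_ERROR_ACTION_IGNORE means retry or skip the request.
 *
 * s->on_error and the surrounding variables are hypothetical driver state.
 */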

AioContext *block_job_get_aio_context(BlockJob *job)
{
    GLOBAL_STATE_CODE();
    return job->job.aio_context;
}