qemu/blockjob.c
<<
>>
Prefs
   1/*
   2 * QEMU System Emulator block driver
   3 *
   4 * Copyright (c) 2011 IBM Corp.
   5 * Copyright (c) 2012 Red Hat, Inc.
   6 *
   7 * Permission is hereby granted, free of charge, to any person obtaining a copy
   8 * of this software and associated documentation files (the "Software"), to deal
   9 * in the Software without restriction, including without limitation the rights
  10 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  11 * copies of the Software, and to permit persons to whom the Software is
  12 * furnished to do so, subject to the following conditions:
  13 *
  14 * The above copyright notice and this permission notice shall be included in
  15 * all copies or substantial portions of the Software.
  16 *
  17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
  20 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  22 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  23 * THE SOFTWARE.
  24 */
  25
  26#include "qemu/osdep.h"
  27#include "qemu-common.h"
  28#include "trace.h"
  29#include "block/block.h"
  30#include "block/blockjob_int.h"
  31#include "block/block_int.h"
  32#include "sysemu/block-backend.h"
  33#include "qapi/qmp/qerror.h"
  34#include "qapi/qmp/qjson.h"
  35#include "qemu/coroutine.h"
  36#include "qemu/id.h"
  37#include "qmp-commands.h"
  38#include "qemu/timer.h"
  39#include "qapi-event.h"
  40
  41static void block_job_event_cancelled(BlockJob *job);
  42static void block_job_event_completed(BlockJob *job, const char *msg);
  43
  44/* Transactional group of block jobs */
  45struct BlockJobTxn {
  46
  47    /* Is this txn being cancelled? */
  48    bool aborting;
  49
  50    /* List of jobs */
  51    QLIST_HEAD(, BlockJob) jobs;
  52
  53    /* Reference count */
  54    int refcnt;
  55};
  56
  57static QLIST_HEAD(, BlockJob) block_jobs = QLIST_HEAD_INITIALIZER(block_jobs);
  58
  59BlockJob *block_job_next(BlockJob *job)
  60{
  61    if (!job) {
  62        return QLIST_FIRST(&block_jobs);
  63    }
  64    return QLIST_NEXT(job, job_list);
  65}
  66
  67BlockJob *block_job_get(const char *id)
  68{
  69    BlockJob *job;
  70
  71    QLIST_FOREACH(job, &block_jobs, job_list) {
  72        if (job->id && !strcmp(id, job->id)) {
  73            return job;
  74        }
  75    }
  76
  77    return NULL;
  78}
  79
  80static void block_job_attached_aio_context(AioContext *new_context,
  81                                           void *opaque)
  82{
  83    BlockJob *job = opaque;
  84
  85    if (job->driver->attached_aio_context) {
  86        job->driver->attached_aio_context(job, new_context);
  87    }
  88
  89    block_job_resume(job);
  90}
  91
  92static void block_job_drain(BlockJob *job)
  93{
  94    /* If job is !job->busy this kicks it into the next pause point. */
  95    block_job_enter(job);
  96
  97    blk_drain(job->blk);
  98    if (job->driver->drain) {
  99        job->driver->drain(job);
 100    }
 101}
 102
 103static void block_job_detach_aio_context(void *opaque)
 104{
 105    BlockJob *job = opaque;
 106
 107    /* In case the job terminates during aio_poll()... */
 108    block_job_ref(job);
 109
 110    block_job_pause(job);
 111
 112    while (!job->paused && !job->completed) {
 113        block_job_drain(job);
 114    }
 115
 116    block_job_unref(job);
 117}
 118
 119void block_job_add_bdrv(BlockJob *job, BlockDriverState *bs)
 120{
 121    job->nodes = g_slist_prepend(job->nodes, bs);
 122    bdrv_ref(bs);
 123    bdrv_op_block_all(bs, job->blocker);
 124}
 125
 126void *block_job_create(const char *job_id, const BlockJobDriver *driver,
 127                       BlockDriverState *bs, int64_t speed, int flags,
 128                       BlockCompletionFunc *cb, void *opaque, Error **errp)
 129{
 130    BlockBackend *blk;
 131    BlockJob *job;
 132
 133    if (bs->job) {
 134        error_setg(errp, QERR_DEVICE_IN_USE, bdrv_get_device_name(bs));
 135        return NULL;
 136    }
 137
 138    if (job_id == NULL && !(flags & BLOCK_JOB_INTERNAL)) {
 139        job_id = bdrv_get_device_name(bs);
 140        if (!*job_id) {
 141            error_setg(errp, "An explicit job ID is required for this node");
 142            return NULL;
 143        }
 144    }
 145
 146    if (job_id) {
 147        if (flags & BLOCK_JOB_INTERNAL) {
 148            error_setg(errp, "Cannot specify job ID for internal block job");
 149            return NULL;
 150        }
 151
 152        if (!id_wellformed(job_id)) {
 153            error_setg(errp, "Invalid job ID '%s'", job_id);
 154            return NULL;
 155        }
 156
 157        if (block_job_get(job_id)) {
 158            error_setg(errp, "Job ID '%s' already in use", job_id);
 159            return NULL;
 160        }
 161    }
 162
 163    blk = blk_new();
 164    blk_insert_bs(blk, bs);
 165
 166    job = g_malloc0(driver->instance_size);
 167    error_setg(&job->blocker, "block device is in use by block job: %s",
 168               BlockJobType_lookup[driver->job_type]);
 169    block_job_add_bdrv(job, bs);
 170    bdrv_op_unblock(bs, BLOCK_OP_TYPE_DATAPLANE, job->blocker);
 171
 172    job->driver        = driver;
 173    job->id            = g_strdup(job_id);
 174    job->blk           = blk;
 175    job->cb            = cb;
 176    job->opaque        = opaque;
 177    job->busy          = false;
 178    job->paused        = true;
 179    job->pause_count   = 1;
 180    job->refcnt        = 1;
 181    bs->job = job;
 182
 183    QLIST_INSERT_HEAD(&block_jobs, job, job_list);
 184
 185    blk_add_aio_context_notifier(blk, block_job_attached_aio_context,
 186                                 block_job_detach_aio_context, job);
 187
 188    /* Only set speed when necessary to avoid NotSupported error */
 189    if (speed != 0) {
 190        Error *local_err = NULL;
 191
 192        block_job_set_speed(job, speed, &local_err);
 193        if (local_err) {
 194            block_job_unref(job);
 195            error_propagate(errp, local_err);
 196            return NULL;
 197        }
 198    }
 199    return job;
 200}
 201
 202bool block_job_is_internal(BlockJob *job)
 203{
 204    return (job->id == NULL);
 205}
 206
 207static bool block_job_started(BlockJob *job)
 208{
 209    return job->co;
 210}
 211
 212void block_job_start(BlockJob *job)
 213{
 214    assert(job && !block_job_started(job) && job->paused &&
 215           !job->busy && job->driver->start);
 216    job->co = qemu_coroutine_create(job->driver->start, job);
 217    if (--job->pause_count == 0) {
 218        job->paused = false;
 219        job->busy = true;
 220        qemu_coroutine_enter(job->co);
 221    }
 222}
 223
 224void block_job_ref(BlockJob *job)
 225{
 226    ++job->refcnt;
 227}
 228
 229void block_job_unref(BlockJob *job)
 230{
 231    if (--job->refcnt == 0) {
 232        GSList *l;
 233        BlockDriverState *bs = blk_bs(job->blk);
 234        bs->job = NULL;
 235        for (l = job->nodes; l; l = l->next) {
 236            bs = l->data;
 237            bdrv_op_unblock_all(bs, job->blocker);
 238            bdrv_unref(bs);
 239        }
 240        g_slist_free(job->nodes);
 241        blk_remove_aio_context_notifier(job->blk,
 242                                        block_job_attached_aio_context,
 243                                        block_job_detach_aio_context, job);
 244        blk_unref(job->blk);
 245        error_free(job->blocker);
 246        g_free(job->id);
 247        QLIST_REMOVE(job, job_list);
 248        g_free(job);
 249    }
 250}
 251
 252static void block_job_completed_single(BlockJob *job)
 253{
 254    if (!job->ret) {
 255        if (job->driver->commit) {
 256            job->driver->commit(job);
 257        }
 258    } else {
 259        if (job->driver->abort) {
 260            job->driver->abort(job);
 261        }
 262    }
 263    if (job->driver->clean) {
 264        job->driver->clean(job);
 265    }
 266
 267    if (job->cb) {
 268        job->cb(job->opaque, job->ret);
 269    }
 270
 271    /* Emit events only if we actually started */
 272    if (block_job_started(job)) {
 273        if (block_job_is_cancelled(job)) {
 274            block_job_event_cancelled(job);
 275        } else {
 276            const char *msg = NULL;
 277            if (job->ret < 0) {
 278                msg = strerror(-job->ret);
 279            }
 280            block_job_event_completed(job, msg);
 281        }
 282    }
 283
 284    if (job->txn) {
 285        QLIST_REMOVE(job, txn_list);
 286        block_job_txn_unref(job->txn);
 287    }
 288    block_job_unref(job);
 289}
 290
 291static void block_job_completed_txn_abort(BlockJob *job)
 292{
 293    AioContext *ctx;
 294    BlockJobTxn *txn = job->txn;
 295    BlockJob *other_job, *next;
 296
 297    if (txn->aborting) {
 298        /*
 299         * We are cancelled by another job, which will handle everything.
 300         */
 301        return;
 302    }
 303    txn->aborting = true;
 304    /* We are the first failed job. Cancel other jobs. */
 305    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
 306        ctx = blk_get_aio_context(other_job->blk);
 307        aio_context_acquire(ctx);
 308    }
 309    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
 310        if (other_job == job || other_job->completed) {
 311            /* Other jobs are "effectively" cancelled by us, set the status for
 312             * them; this job, however, may or may not be cancelled, depending
 313             * on the caller, so leave it. */
 314            if (other_job != job) {
 315                other_job->cancelled = true;
 316            }
 317            continue;
 318        }
 319        block_job_cancel_sync(other_job);
 320        assert(other_job->completed);
 321    }
 322    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
 323        ctx = blk_get_aio_context(other_job->blk);
 324        block_job_completed_single(other_job);
 325        aio_context_release(ctx);
 326    }
 327}
 328
 329static void block_job_completed_txn_success(BlockJob *job)
 330{
 331    AioContext *ctx;
 332    BlockJobTxn *txn = job->txn;
 333    BlockJob *other_job, *next;
 334    /*
 335     * Successful completion, see if there are other running jobs in this
 336     * txn.
 337     */
 338    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
 339        if (!other_job->completed) {
 340            return;
 341        }
 342    }
 343    /* We are the last completed job, commit the transaction. */
 344    QLIST_FOREACH_SAFE(other_job, &txn->jobs, txn_list, next) {
 345        ctx = blk_get_aio_context(other_job->blk);
 346        aio_context_acquire(ctx);
 347        assert(other_job->ret == 0);
 348        block_job_completed_single(other_job);
 349        aio_context_release(ctx);
 350    }
 351}
 352
 353void block_job_completed(BlockJob *job, int ret)
 354{
 355    assert(blk_bs(job->blk)->job == job);
 356    assert(!job->completed);
 357    job->completed = true;
 358    job->ret = ret;
 359    if (!job->txn) {
 360        block_job_completed_single(job);
 361    } else if (ret < 0 || block_job_is_cancelled(job)) {
 362        block_job_completed_txn_abort(job);
 363    } else {
 364        block_job_completed_txn_success(job);
 365    }
 366}
 367
 368void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp)
 369{
 370    Error *local_err = NULL;
 371
 372    if (!job->driver->set_speed) {
 373        error_setg(errp, QERR_UNSUPPORTED);
 374        return;
 375    }
 376    job->driver->set_speed(job, speed, &local_err);
 377    if (local_err) {
 378        error_propagate(errp, local_err);
 379        return;
 380    }
 381
 382    job->speed = speed;
 383}
 384
 385void block_job_complete(BlockJob *job, Error **errp)
 386{
 387    /* Should not be reachable via external interface for internal jobs */
 388    assert(job->id);
 389    if (job->pause_count || job->cancelled ||
 390        !block_job_started(job) || !job->driver->complete) {
 391        error_setg(errp, "The active block job '%s' cannot be completed",
 392                   job->id);
 393        return;
 394    }
 395
 396    job->driver->complete(job, errp);
 397}
 398
 399void block_job_pause(BlockJob *job)
 400{
 401    job->pause_count++;
 402}
 403
 404void block_job_user_pause(BlockJob *job)
 405{
 406    job->user_paused = true;
 407    block_job_pause(job);
 408}
 409
 410static bool block_job_should_pause(BlockJob *job)
 411{
 412    return job->pause_count > 0;
 413}
 414
 415bool block_job_user_paused(BlockJob *job)
 416{
 417    return job ? job->user_paused : 0;
 418}
 419
 420void coroutine_fn block_job_pause_point(BlockJob *job)
 421{
 422    assert(job && block_job_started(job));
 423
 424    if (!block_job_should_pause(job)) {
 425        return;
 426    }
 427    if (block_job_is_cancelled(job)) {
 428        return;
 429    }
 430
 431    if (job->driver->pause) {
 432        job->driver->pause(job);
 433    }
 434
 435    if (block_job_should_pause(job) && !block_job_is_cancelled(job)) {
 436        job->paused = true;
 437        job->busy = false;
 438        qemu_coroutine_yield(); /* wait for block_job_resume() */
 439        job->busy = true;
 440        job->paused = false;
 441    }
 442
 443    if (job->driver->resume) {
 444        job->driver->resume(job);
 445    }
 446}
 447
 448void block_job_resume(BlockJob *job)
 449{
 450    assert(job->pause_count > 0);
 451    job->pause_count--;
 452    if (job->pause_count) {
 453        return;
 454    }
 455    block_job_enter(job);
 456}
 457
 458void block_job_user_resume(BlockJob *job)
 459{
 460    if (job && job->user_paused && job->pause_count > 0) {
 461        job->user_paused = false;
 462        block_job_resume(job);
 463    }
 464}
 465
 466void block_job_enter(BlockJob *job)
 467{
 468    if (job->co && !job->busy) {
 469        qemu_coroutine_enter(job->co);
 470    }
 471}
 472
 473void block_job_cancel(BlockJob *job)
 474{
 475    if (block_job_started(job)) {
 476        job->cancelled = true;
 477        block_job_iostatus_reset(job);
 478        block_job_enter(job);
 479    } else {
 480        block_job_completed(job, -ECANCELED);
 481    }
 482}
 483
 484bool block_job_is_cancelled(BlockJob *job)
 485{
 486    return job->cancelled;
 487}
 488
 489void block_job_iostatus_reset(BlockJob *job)
 490{
 491    job->iostatus = BLOCK_DEVICE_IO_STATUS_OK;
 492    if (job->driver->iostatus_reset) {
 493        job->driver->iostatus_reset(job);
 494    }
 495}
 496
 497static int block_job_finish_sync(BlockJob *job,
 498                                 void (*finish)(BlockJob *, Error **errp),
 499                                 Error **errp)
 500{
 501    Error *local_err = NULL;
 502    int ret;
 503
 504    assert(blk_bs(job->blk)->job == job);
 505
 506    block_job_ref(job);
 507
 508    finish(job, &local_err);
 509    if (local_err) {
 510        error_propagate(errp, local_err);
 511        block_job_unref(job);
 512        return -EBUSY;
 513    }
 514    /* block_job_drain calls block_job_enter, and it should be enough to
 515     * induce progress until the job completes or moves to the main thread.
 516    */
 517    while (!job->deferred_to_main_loop && !job->completed) {
 518        block_job_drain(job);
 519    }
 520    while (!job->completed) {
 521        aio_poll(qemu_get_aio_context(), true);
 522    }
 523    ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
 524    block_job_unref(job);
 525    return ret;
 526}
 527
 528/* A wrapper around block_job_cancel() taking an Error ** parameter so it may be
 529 * used with block_job_finish_sync() without the need for (rather nasty)
 530 * function pointer casts there. */
 531static void block_job_cancel_err(BlockJob *job, Error **errp)
 532{
 533    block_job_cancel(job);
 534}
 535
 536int block_job_cancel_sync(BlockJob *job)
 537{
 538    return block_job_finish_sync(job, &block_job_cancel_err, NULL);
 539}
 540
 541void block_job_cancel_sync_all(void)
 542{
 543    BlockJob *job;
 544    AioContext *aio_context;
 545
 546    while ((job = QLIST_FIRST(&block_jobs))) {
 547        aio_context = blk_get_aio_context(job->blk);
 548        aio_context_acquire(aio_context);
 549        block_job_cancel_sync(job);
 550        aio_context_release(aio_context);
 551    }
 552}
 553
 554int block_job_complete_sync(BlockJob *job, Error **errp)
 555{
 556    return block_job_finish_sync(job, &block_job_complete, errp);
 557}
 558
 559void block_job_sleep_ns(BlockJob *job, QEMUClockType type, int64_t ns)
 560{
 561    assert(job->busy);
 562
 563    /* Check cancellation *before* setting busy = false, too!  */
 564    if (block_job_is_cancelled(job)) {
 565        return;
 566    }
 567
 568    job->busy = false;
 569    if (!block_job_should_pause(job)) {
 570        co_aio_sleep_ns(blk_get_aio_context(job->blk), type, ns);
 571    }
 572    job->busy = true;
 573
 574    block_job_pause_point(job);
 575}
 576
 577void block_job_yield(BlockJob *job)
 578{
 579    assert(job->busy);
 580
 581    /* Check cancellation *before* setting busy = false, too!  */
 582    if (block_job_is_cancelled(job)) {
 583        return;
 584    }
 585
 586    job->busy = false;
 587    if (!block_job_should_pause(job)) {
 588        qemu_coroutine_yield();
 589    }
 590    job->busy = true;
 591
 592    block_job_pause_point(job);
 593}
 594
 595BlockJobInfo *block_job_query(BlockJob *job, Error **errp)
 596{
 597    BlockJobInfo *info;
 598
 599    if (block_job_is_internal(job)) {
 600        error_setg(errp, "Cannot query QEMU internal jobs");
 601        return NULL;
 602    }
 603    info = g_new0(BlockJobInfo, 1);
 604    info->type      = g_strdup(BlockJobType_lookup[job->driver->job_type]);
 605    info->device    = g_strdup(job->id);
 606    info->len       = job->len;
 607    info->busy      = job->busy;
 608    info->paused    = job->pause_count > 0;
 609    info->offset    = job->offset;
 610    info->speed     = job->speed;
 611    info->io_status = job->iostatus;
 612    info->ready     = job->ready;
 613    return info;
 614}
 615
 616static void block_job_iostatus_set_err(BlockJob *job, int error)
 617{
 618    if (job->iostatus == BLOCK_DEVICE_IO_STATUS_OK) {
 619        job->iostatus = error == ENOSPC ? BLOCK_DEVICE_IO_STATUS_NOSPACE :
 620                                          BLOCK_DEVICE_IO_STATUS_FAILED;
 621    }
 622}
 623
 624static void block_job_event_cancelled(BlockJob *job)
 625{
 626    if (block_job_is_internal(job)) {
 627        return;
 628    }
 629
 630    qapi_event_send_block_job_cancelled(job->driver->job_type,
 631                                        job->id,
 632                                        job->len,
 633                                        job->offset,
 634                                        job->speed,
 635                                        &error_abort);
 636}
 637
 638static void block_job_event_completed(BlockJob *job, const char *msg)
 639{
 640    if (block_job_is_internal(job)) {
 641        return;
 642    }
 643
 644    qapi_event_send_block_job_completed(job->driver->job_type,
 645                                        job->id,
 646                                        job->len,
 647                                        job->offset,
 648                                        job->speed,
 649                                        !!msg,
 650                                        msg,
 651                                        &error_abort);
 652}
 653
 654void block_job_event_ready(BlockJob *job)
 655{
 656    job->ready = true;
 657
 658    if (block_job_is_internal(job)) {
 659        return;
 660    }
 661
 662    qapi_event_send_block_job_ready(job->driver->job_type,
 663                                    job->id,
 664                                    job->len,
 665                                    job->offset,
 666                                    job->speed, &error_abort);
 667}
 668
 669BlockErrorAction block_job_error_action(BlockJob *job, BlockdevOnError on_err,
 670                                        int is_read, int error)
 671{
 672    BlockErrorAction action;
 673
 674    switch (on_err) {
 675    case BLOCKDEV_ON_ERROR_ENOSPC:
 676    case BLOCKDEV_ON_ERROR_AUTO:
 677        action = (error == ENOSPC) ?
 678                 BLOCK_ERROR_ACTION_STOP : BLOCK_ERROR_ACTION_REPORT;
 679        break;
 680    case BLOCKDEV_ON_ERROR_STOP:
 681        action = BLOCK_ERROR_ACTION_STOP;
 682        break;
 683    case BLOCKDEV_ON_ERROR_REPORT:
 684        action = BLOCK_ERROR_ACTION_REPORT;
 685        break;
 686    case BLOCKDEV_ON_ERROR_IGNORE:
 687        action = BLOCK_ERROR_ACTION_IGNORE;
 688        break;
 689    default:
 690        abort();
 691    }
 692    if (!block_job_is_internal(job)) {
 693        qapi_event_send_block_job_error(job->id,
 694                                        is_read ? IO_OPERATION_TYPE_READ :
 695                                        IO_OPERATION_TYPE_WRITE,
 696                                        action, &error_abort);
 697    }
 698    if (action == BLOCK_ERROR_ACTION_STOP) {
 699        /* make the pause user visible, which will be resumed from QMP. */
 700        block_job_user_pause(job);
 701        block_job_iostatus_set_err(job, error);
 702    }
 703    return action;
 704}
 705
 706typedef struct {
 707    BlockJob *job;
 708    AioContext *aio_context;
 709    BlockJobDeferToMainLoopFn *fn;
 710    void *opaque;
 711} BlockJobDeferToMainLoopData;
 712
 713static void block_job_defer_to_main_loop_bh(void *opaque)
 714{
 715    BlockJobDeferToMainLoopData *data = opaque;
 716    AioContext *aio_context;
 717
 718    /* Prevent race with block_job_defer_to_main_loop() */
 719    aio_context_acquire(data->aio_context);
 720
 721    /* Fetch BDS AioContext again, in case it has changed */
 722    aio_context = blk_get_aio_context(data->job->blk);
 723    aio_context_acquire(aio_context);
 724
 725    data->job->deferred_to_main_loop = false;
 726    data->fn(data->job, data->opaque);
 727
 728    aio_context_release(aio_context);
 729
 730    aio_context_release(data->aio_context);
 731
 732    g_free(data);
 733}
 734
 735void block_job_defer_to_main_loop(BlockJob *job,
 736                                  BlockJobDeferToMainLoopFn *fn,
 737                                  void *opaque)
 738{
 739    BlockJobDeferToMainLoopData *data = g_malloc(sizeof(*data));
 740    data->job = job;
 741    data->aio_context = blk_get_aio_context(job->blk);
 742    data->fn = fn;
 743    data->opaque = opaque;
 744    job->deferred_to_main_loop = true;
 745
 746    aio_bh_schedule_oneshot(qemu_get_aio_context(),
 747                            block_job_defer_to_main_loop_bh, data);
 748}
 749
 750BlockJobTxn *block_job_txn_new(void)
 751{
 752    BlockJobTxn *txn = g_new0(BlockJobTxn, 1);
 753    QLIST_INIT(&txn->jobs);
 754    txn->refcnt = 1;
 755    return txn;
 756}
 757
 758static void block_job_txn_ref(BlockJobTxn *txn)
 759{
 760    txn->refcnt++;
 761}
 762
 763void block_job_txn_unref(BlockJobTxn *txn)
 764{
 765    if (txn && --txn->refcnt == 0) {
 766        g_free(txn);
 767    }
 768}
 769
 770void block_job_txn_add_job(BlockJobTxn *txn, BlockJob *job)
 771{
 772    if (!txn) {
 773        return;
 774    }
 775
 776    assert(!job->txn);
 777    job->txn = txn;
 778
 779    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
 780    block_job_txn_ref(txn);
 781}
 782