qemu/block/throttle-groups.c
/*
 * QEMU block throttling group infrastructure
 *
 * Copyright (C) Nodalink, EURL. 2014
 * Copyright (C) Igalia, S.L. 2015
 *
 * Authors:
 *   Benoît Canet <benoit.canet@nodalink.com>
 *   Alberto Garcia <berto@igalia.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License as
 * published by the Free Software Foundation; either version 2 or
 * (at your option) version 3 of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, see <http://www.gnu.org/licenses/>.
 */

#include "qemu/osdep.h"
#include "sysemu/block-backend.h"
#include "block/throttle-groups.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "sysemu/qtest.h"

/* The ThrottleGroup structure (with its ThrottleState) is shared
 * among different BlockBackends and it's independent from
 * AioContext, so in order to use it from different threads it needs
 * its own locking.
 *
 * This locking is however handled internally in this file, so it's
 * transparent to outside users.
 *
 * The whole ThrottleGroup structure is private and invisible to
 * outside users, who only use it through its ThrottleState.
 *
 * In addition to the ThrottleGroup structure, BlockBackendPublic has
 * fields that need to be accessed by other members of the group and
 * therefore also need to be protected by this lock. Once a
 * BlockBackend is registered in a group those fields can be accessed
 * by other threads at any time.
 *
 * Again, all this is handled internally and is mostly transparent to
 * the outside. The 'throttle_timers' field however has an additional
 * constraint because it may be temporarily invalid (see for example
 * bdrv_set_aio_context()). Therefore in this file a thread will
 * access some other BlockBackend's timers only after verifying that
 * that BlockBackend has throttled requests in the queue.
 */
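
/* An illustrative sketch of the access pattern described above; the helper
 * name is hypothetical and not part of this file. Cross-member accesses
 * take the group lock, and another member's timers are only touched after
 * checking that it has requests queued:
 *
 *     static void example_poke_member(BlockBackend *blk, bool is_write)
 *     {
 *         BlockBackendPublic *blkp = blk_get_public(blk);
 *         ThrottleGroup *tg = container_of(blkp->throttle_state,
 *                                          ThrottleGroup, ts);
 *
 *         qemu_mutex_lock(&tg->lock);
 *         if (blkp->pending_reqs[is_write]) {
 *             // blkp->throttle_timers is known to be valid here
 *         }
 *         qemu_mutex_unlock(&tg->lock);
 *     }
 */
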
typedef struct ThrottleGroup {
    char *name; /* This is constant during the lifetime of the group */

    QemuMutex lock; /* This lock protects the following four fields */
    ThrottleState ts;
    QLIST_HEAD(, BlockBackendPublic) head;
    BlockBackend *tokens[2];
    bool any_timer_armed[2];

    /* These two are protected by the global throttle_groups_lock */
    unsigned refcount;
    QTAILQ_ENTRY(ThrottleGroup) list;
} ThrottleGroup;
  69
  70static QemuMutex throttle_groups_lock;
  71static QTAILQ_HEAD(, ThrottleGroup) throttle_groups =
  72    QTAILQ_HEAD_INITIALIZER(throttle_groups);
  73
/* Increments the reference count of a ThrottleGroup given its name.
 *
 * If no ThrottleGroup is found with the given name a new one is
 * created.
 *
 * @name: the name of the ThrottleGroup
 * @ret:  the ThrottleState member of the ThrottleGroup
 */
ThrottleState *throttle_group_incref(const char *name)
{
    ThrottleGroup *tg = NULL;
    ThrottleGroup *iter;

    qemu_mutex_lock(&throttle_groups_lock);

    /* Look for an existing group with that name */
    QTAILQ_FOREACH(iter, &throttle_groups, list) {
        if (!strcmp(name, iter->name)) {
            tg = iter;
            break;
        }
    }

    /* Create a new one if not found */
    if (!tg) {
        tg = g_new0(ThrottleGroup, 1);
        tg->name = g_strdup(name);
        qemu_mutex_init(&tg->lock);
        throttle_init(&tg->ts);
        QLIST_INIT(&tg->head);

        QTAILQ_INSERT_TAIL(&throttle_groups, tg, list);
    }

    tg->refcount++;

    qemu_mutex_unlock(&throttle_groups_lock);

    return &tg->ts;
}

/* Decrements the reference count of a ThrottleGroup.
 *
 * When the reference count reaches zero the ThrottleGroup is
 * destroyed.
 *
 * @ts:  The ThrottleGroup to unref, given by its ThrottleState member
 */
void throttle_group_unref(ThrottleState *ts)
{
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);

    qemu_mutex_lock(&throttle_groups_lock);
    if (--tg->refcount == 0) {
        QTAILQ_REMOVE(&throttle_groups, tg, list);
        qemu_mutex_destroy(&tg->lock);
        g_free(tg->name);
        g_free(tg);
    }
    qemu_mutex_unlock(&throttle_groups_lock);
}
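
/* A minimal usage sketch for the two functions above (hypothetical caller,
 * not part of this file): every reference taken with
 * throttle_group_incref() must be paired with a throttle_group_unref()
 * on the same group:
 *
 *     ThrottleState *ts = throttle_group_incref("mygroup");
 *     // ... use the group through its ThrottleState ...
 *     throttle_group_unref(ts);
 */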

/* Get the name from a BlockBackend's ThrottleGroup. The name (and the pointer)
 * is guaranteed to remain constant during the lifetime of the group.
 *
 * @blk:  a BlockBackend that is a member of a throttling group
 * @ret:  the name of the group.
 */
const char *throttle_group_get_name(BlockBackend *blk)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    return tg->name;
}

/* Return the next BlockBackend in the round-robin sequence, simulating a
 * circular list.
 *
 * This assumes that tg->lock is held.
 *
 * @blk: the current BlockBackend
 * @ret: the next BlockBackend in the sequence
 */
static BlockBackend *throttle_group_next_blk(BlockBackend *blk)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = blkp->throttle_state;
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    BlockBackendPublic *next = QLIST_NEXT(blkp, round_robin);

    if (!next) {
        next = QLIST_FIRST(&tg->head);
    }

    return blk_by_public(next);
}

/* Return the next BlockBackend in the round-robin sequence with pending I/O
 * requests.
 *
 * This assumes that tg->lock is held.
 *
 * @blk:       the current BlockBackend
 * @is_write:  the type of operation (read/write)
 * @ret:       the next BlockBackend with pending requests, or blk if there is
 *             none.
 */
static BlockBackend *next_throttle_token(BlockBackend *blk, bool is_write)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    BlockBackend *token, *start;

    start = token = tg->tokens[is_write];

    /* get the next blk in round-robin style, checking the token's own
     * pending requests, not the caller's */
    token = throttle_group_next_blk(token);
    while (token != start && !blk_get_public(token)->pending_reqs[is_write]) {
        token = throttle_group_next_blk(token);
    }

    /* If no I/O is queued for scheduling on the next round robin token
     * then make the token the current blk, because chances are
     * the current blk gets the next request queued.
     */
    if (token == start && !blk_get_public(token)->pending_reqs[is_write]) {
        token = blk;
    }

    return token;
}
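
/* For illustration: with group members A, B and C registered in that order
 * and tg->tokens[is_write] == B, the walk above probes C, then A, then B,
 * and returns the first of those with pending_reqs[is_write] != 0; if none
 * of them has queued requests, the caller's blk is returned instead.
 */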

/* Check if the next I/O request for a BlockBackend needs to be throttled or
 * not. If there's no timer set in this group, set one and update the token
 * accordingly.
 *
 * This assumes that tg->lock is held.
 *
 * @blk:        the current BlockBackend
 * @is_write:   the type of operation (read/write)
 * @ret:        whether the I/O request needs to be throttled or not
 */
static bool throttle_group_schedule_timer(BlockBackend *blk, bool is_write)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = blkp->throttle_state;
    ThrottleTimers *tt = &blkp->throttle_timers;
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    bool must_wait;

    if (blkp->io_limits_disabled) {
        return false;
    }

    /* Check if any of the timers in this group is already armed */
    if (tg->any_timer_armed[is_write]) {
        return true;
    }

    must_wait = throttle_schedule_timer(ts, tt, is_write);

    /* If a timer just got armed, set blk as the current token */
    if (must_wait) {
        tg->tokens[is_write] = blk;
        tg->any_timer_armed[is_write] = true;
    }

    return must_wait;
}

/* Look for the next pending I/O request and schedule it.
 *
 * This assumes that tg->lock is held.
 *
 * @blk:       the current BlockBackend
 * @is_write:  the type of operation (read/write)
 */
static void schedule_next_request(BlockBackend *blk, bool is_write)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    bool must_wait;
    BlockBackend *token;

    /* Check if the token has any pending request to schedule next */
    token = next_throttle_token(blk, is_write);
    if (!blk_get_public(token)->pending_reqs[is_write]) {
        return;
    }

    /* Set a timer for the request if it needs to be throttled */
    must_wait = throttle_group_schedule_timer(token, is_write);

    /* If it doesn't have to wait, queue it for immediate execution */
    if (!must_wait) {
        /* Give preference to requests from the current blk */
        if (qemu_in_coroutine() &&
            qemu_co_queue_next(&blkp->throttled_reqs[is_write])) {
            token = blk;
        } else {
            ThrottleTimers *tt = &blk_get_public(token)->throttle_timers;
            int64_t now = qemu_clock_get_ns(tt->clock_type);
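            /* Arm the token's own request timer so that it fires (almost)
             * immediately in the token's AioContext and wakes up its
             * throttled request via timer_cb(). Touching another
             * BlockBackend's timers is safe here because we verified at
             * the top of this function that the token has requests
             * pending. */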
            timer_mod(tt->timers[is_write], now + 1);
            tg->any_timer_armed[is_write] = true;
        }
        tg->tokens[is_write] = token;
    }
}

/* Check if an I/O request needs to be throttled, wait and set a timer
 * if necessary, and schedule the next request using a round robin
 * algorithm.
 *
 * @blk:       the current BlockBackend
 * @bytes:     the number of bytes for this I/O
 * @is_write:  the type of operation (read/write)
 */
void coroutine_fn throttle_group_co_io_limits_intercept(BlockBackend *blk,
                                                        unsigned int bytes,
                                                        bool is_write)
{
    bool must_wait;
    BlockBackend *token;

    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    qemu_mutex_lock(&tg->lock);

    /* First we check if this I/O has to be throttled. */
    token = next_throttle_token(blk, is_write);
    must_wait = throttle_group_schedule_timer(token, is_write);

    /* Wait if there's a timer set or queued requests of this type */
    if (must_wait || blkp->pending_reqs[is_write]) {
        blkp->pending_reqs[is_write]++;
        qemu_mutex_unlock(&tg->lock);
        qemu_co_queue_wait(&blkp->throttled_reqs[is_write]);
        qemu_mutex_lock(&tg->lock);
        blkp->pending_reqs[is_write]--;
    }

    /* The I/O will be executed, so do the accounting */
    throttle_account(blkp->throttle_state, is_write, bytes);

    /* Schedule the next request */
    schedule_next_request(blk, is_write);

    qemu_mutex_unlock(&tg->lock);
}
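
/* A sketch of how a caller is expected to use the function above
 * (modelled on the read path in block-backend.c; simplified):
 *
 *     // inside a coroutine_fn, before issuing the actual read
 *     if (blk_get_public(blk)->throttle_state) {
 *         throttle_group_co_io_limits_intercept(blk, bytes, false);
 *     }
 *     // ... now perform the read of 'bytes' bytes ...
 */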
 323
 324void throttle_group_restart_blk(BlockBackend *blk)
 325{
 326    BlockBackendPublic *blkp = blk_get_public(blk);
 327    int i;
 328
 329    for (i = 0; i < 2; i++) {
 330        while (qemu_co_enter_next(&blkp->throttled_reqs[i])) {
 331            ;
 332        }
 333    }
 334}
 335
 336/* Update the throttle configuration for a particular group. Similar
 337 * to throttle_config(), but guarantees atomicity within the
 338 * throttling group.
 339 *
 340 * @blk: a BlockBackend that is a member of the group
 341 * @cfg: the configuration to set
 342 */
 343void throttle_group_config(BlockBackend *blk, ThrottleConfig *cfg)
 344{
 345    BlockBackendPublic *blkp = blk_get_public(blk);
 346    ThrottleTimers *tt = &blkp->throttle_timers;
 347    ThrottleState *ts = blkp->throttle_state;
 348    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
 349    qemu_mutex_lock(&tg->lock);
 350    /* throttle_config() cancels the timers */
 351    if (timer_pending(tt->timers[0])) {
 352        tg->any_timer_armed[0] = false;
 353    }
 354    if (timer_pending(tt->timers[1])) {
 355        tg->any_timer_armed[1] = false;
 356    }
 357    throttle_config(ts, tt, cfg);
 358    qemu_mutex_unlock(&tg->lock);
 359
 360    qemu_co_enter_next(&blkp->throttled_reqs[0]);
 361    qemu_co_enter_next(&blkp->throttled_reqs[1]);
 362}
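
/* A minimal sketch of building a configuration for the function above,
 * assuming the ThrottleConfig layout from include/qemu/throttle.h
 * (limits left at zero mean "unlimited"):
 *
 *     ThrottleConfig cfg;
 *
 *     memset(&cfg, 0, sizeof(cfg));
 *     cfg.buckets[THROTTLE_BPS_TOTAL].avg = 1024 * 1024; // 1 MB/s
 *     cfg.buckets[THROTTLE_OPS_TOTAL].avg = 100;         // 100 IOPS
 *     throttle_group_config(blk, &cfg);
 */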

/* Get the throttle configuration from a particular group. Similar to
 * throttle_get_config(), but guarantees atomicity within the
 * throttling group.
 *
 * @blk: a BlockBackend that is a member of the group
 * @cfg: the configuration will be written here
 */
void throttle_group_get_config(BlockBackend *blk, ThrottleConfig *cfg)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = blkp->throttle_state;
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    qemu_mutex_lock(&tg->lock);
    throttle_get_config(ts, cfg);
    qemu_mutex_unlock(&tg->lock);
}

/* ThrottleTimers callback. This wakes up a request that was waiting
 * because it had been throttled.
 *
 * @blk:       the BlockBackend whose request had been throttled
 * @is_write:  the type of operation (read/write)
 */
static void timer_cb(BlockBackend *blk, bool is_write)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = blkp->throttle_state;
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    bool empty_queue;

    /* The timer has just fired, so we can update the flag */
    qemu_mutex_lock(&tg->lock);
    tg->any_timer_armed[is_write] = false;
    qemu_mutex_unlock(&tg->lock);

    /* Run the request that was waiting for this timer */
    empty_queue = !qemu_co_enter_next(&blkp->throttled_reqs[is_write]);

    /* If the request queue was empty then we have to take care of
     * scheduling the next one */
    if (empty_queue) {
        qemu_mutex_lock(&tg->lock);
        schedule_next_request(blk, is_write);
        qemu_mutex_unlock(&tg->lock);
    }
}

static void read_timer_cb(void *opaque)
{
    timer_cb(opaque, false);
}

static void write_timer_cb(void *opaque)
{
    timer_cb(opaque, true);
}

/* Register a BlockBackend in the throttling group, also initializing its
 * timers and updating its throttle_state pointer to point to it. If a
 * throttling group with that name does not exist yet, it will be created.
 *
 * @blk:       the BlockBackend to insert
 * @groupname: the name of the group
 */
void throttle_group_register_blk(BlockBackend *blk, const char *groupname)
{
    int i;
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleState *ts = throttle_group_incref(groupname);
    ThrottleGroup *tg = container_of(ts, ThrottleGroup, ts);
    int clock_type = QEMU_CLOCK_REALTIME;

    if (qtest_enabled()) {
        /* For testing block IO throttling only */
        clock_type = QEMU_CLOCK_VIRTUAL;
    }

    blkp->throttle_state = ts;

    qemu_mutex_lock(&tg->lock);
    /* If the ThrottleGroup is new, set this BlockBackend as the token */
    for (i = 0; i < 2; i++) {
        if (!tg->tokens[i]) {
            tg->tokens[i] = blk;
        }
    }

    QLIST_INSERT_HEAD(&tg->head, blkp, round_robin);

    throttle_timers_init(&blkp->throttle_timers,
                         blk_get_aio_context(blk),
                         clock_type,
                         read_timer_cb,
                         write_timer_cb,
                         blk);

    qemu_mutex_unlock(&tg->lock);
}
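
/* A usage sketch for registration (hypothetical caller; error handling
 * omitted): once registered, I/O through the BlockBackend is subject to
 * the group's limits until the backend is unregistered again:
 *
 *     throttle_group_register_blk(blk, "mygroup");
 *     // ... throttled I/O through blk ...
 *     // drain pending throttled requests, then:
 *     throttle_group_unregister_blk(blk);
 */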

/* Unregister a BlockBackend from its group, removing it from the list,
 * destroying the timers and setting the throttle_state pointer to NULL.
 *
 * The BlockBackend must not have pending throttled requests, so the caller has
 * to drain them first.
 *
 * The group will be destroyed if it's empty after this operation.
 *
 * @blk: the BlockBackend to remove
 */
void throttle_group_unregister_blk(BlockBackend *blk)
{
    BlockBackendPublic *blkp = blk_get_public(blk);
    ThrottleGroup *tg = container_of(blkp->throttle_state, ThrottleGroup, ts);
    int i;

    assert(blkp->pending_reqs[0] == 0 && blkp->pending_reqs[1] == 0);
    assert(qemu_co_queue_empty(&blkp->throttled_reqs[0]));
    assert(qemu_co_queue_empty(&blkp->throttled_reqs[1]));

    qemu_mutex_lock(&tg->lock);
    for (i = 0; i < 2; i++) {
        if (tg->tokens[i] == blk) {
            BlockBackend *token = throttle_group_next_blk(blk);
            /* Take care of the case where this is the last blk in the group */
            if (token == blk) {
                token = NULL;
            }
            tg->tokens[i] = token;
        }
    }

    /* remove the current blk from the list */
    QLIST_REMOVE(blkp, round_robin);
    throttle_timers_destroy(&blkp->throttle_timers);
    qemu_mutex_unlock(&tg->lock);

    throttle_group_unref(&tg->ts);
    blkp->throttle_state = NULL;
}

static void throttle_groups_init(void)
{
    qemu_mutex_init(&throttle_groups_lock);
}

block_init(throttle_groups_init);