linux/drivers/md/dm-rq.c
<<
>>
Prefs
   1/*
   2 * Copyright (C) 2016 Red Hat, Inc. All rights reserved.
   3 *
   4 * This file is released under the GPL.
   5 */
   6
   7#include "dm-core.h"
   8#include "dm-rq.h"
   9
  10#include <linux/elevator.h> /* for rq_end_sector() */
  11#include <linux/blk-mq.h>
  12
  13#define DM_MSG_PREFIX "core-rq"
  14
  15#define DM_MQ_NR_HW_QUEUES 1
  16#define DM_MQ_QUEUE_DEPTH 2048
  17static unsigned dm_mq_nr_hw_queues = DM_MQ_NR_HW_QUEUES;
  18static unsigned dm_mq_queue_depth = DM_MQ_QUEUE_DEPTH;
  19
  20/*
  21 * Request-based DM's mempools' reserved IOs set by the user.
  22 */
  23#define RESERVED_REQUEST_BASED_IOS      256
  24static unsigned reserved_rq_based_ios = RESERVED_REQUEST_BASED_IOS;
  25
  26static bool use_blk_mq = IS_ENABLED(CONFIG_DM_MQ_DEFAULT);
  27
  28bool dm_use_blk_mq_default(void)
  29{
  30        return use_blk_mq;
  31}
  32
  33bool dm_use_blk_mq(struct mapped_device *md)
  34{
  35        return md->use_blk_mq;
  36}
  37EXPORT_SYMBOL_GPL(dm_use_blk_mq);
  38
  39unsigned dm_get_reserved_rq_based_ios(void)
  40{
  41        return __dm_get_module_param(&reserved_rq_based_ios,
  42                                     RESERVED_REQUEST_BASED_IOS, DM_RESERVED_MAX_IOS);
  43}
  44EXPORT_SYMBOL_GPL(dm_get_reserved_rq_based_ios);
  45
  46static unsigned dm_get_blk_mq_nr_hw_queues(void)
  47{
  48        return __dm_get_module_param(&dm_mq_nr_hw_queues, 1, 32);
  49}
  50
  51static unsigned dm_get_blk_mq_queue_depth(void)
  52{
  53        return __dm_get_module_param(&dm_mq_queue_depth,
  54                                     DM_MQ_QUEUE_DEPTH, BLK_MQ_MAX_DEPTH);
  55}
  56
  57int dm_request_based(struct mapped_device *md)
  58{
  59        return blk_queue_stackable(md->queue);
  60}
  61
  62static void dm_old_start_queue(struct request_queue *q)
  63{
  64        unsigned long flags;
  65
  66        spin_lock_irqsave(q->queue_lock, flags);
  67        if (blk_queue_stopped(q))
  68                blk_start_queue(q);
  69        spin_unlock_irqrestore(q->queue_lock, flags);
  70}
  71
  72static void dm_mq_start_queue(struct request_queue *q)
  73{
  74        blk_mq_start_stopped_hw_queues(q, true);
  75        blk_mq_kick_requeue_list(q);
  76}
  77
  78void dm_start_queue(struct request_queue *q)
  79{
  80        if (!q->mq_ops)
  81                dm_old_start_queue(q);
  82        else
  83                dm_mq_start_queue(q);
  84}
  85
  86static void dm_old_stop_queue(struct request_queue *q)
  87{
  88        unsigned long flags;
  89
  90        spin_lock_irqsave(q->queue_lock, flags);
  91        if (!blk_queue_stopped(q))
  92                blk_stop_queue(q);
  93        spin_unlock_irqrestore(q->queue_lock, flags);
  94}
  95
  96static void dm_mq_stop_queue(struct request_queue *q)
  97{
  98        if (blk_mq_queue_stopped(q))
  99                return;
 100
 101        blk_mq_quiesce_queue(q);
 102}
 103
 104void dm_stop_queue(struct request_queue *q)
 105{
 106        if (!q->mq_ops)
 107                dm_old_stop_queue(q);
 108        else
 109                dm_mq_stop_queue(q);
 110}
 111
 112/*
 113 * Partial completion handling for request-based dm
 114 */
 115static void end_clone_bio(struct bio *clone)
 116{
 117        struct dm_rq_clone_bio_info *info =
 118                container_of(clone, struct dm_rq_clone_bio_info, clone);
 119        struct dm_rq_target_io *tio = info->tio;
 120        struct bio *bio = info->orig;
 121        unsigned int nr_bytes = info->orig->bi_iter.bi_size;
 122        int error = clone->bi_error;
 123
 124        bio_put(clone);
 125
 126        if (tio->error)
 127                /*
 128                 * An error has already been detected on the request.
 129                 * Once error occurred, just let clone->end_io() handle
 130                 * the remainder.
 131                 */
 132                return;
 133        else if (error) {
 134                /*
 135                 * Don't notice the error to the upper layer yet.
 136                 * The error handling decision is made by the target driver,
 137                 * when the request is completed.
 138                 */
 139                tio->error = error;
 140                return;
 141        }
 142
 143        /*
 144         * I/O for the bio successfully completed.
 145         * Notice the data completion to the upper layer.
 146         */
 147
 148        /*
 149         * bios are processed from the head of the list.
 150         * So the completing bio should always be rq->bio.
 151         * If it's not, something wrong is happening.
 152         */
 153        if (tio->orig->bio != bio)
 154                DMERR("bio completion is going in the middle of the request");
 155
 156        /*
 157         * Update the original request.
 158         * Do not use blk_end_request() here, because it may complete
 159         * the original request before the clone, and break the ordering.
 160         */
 161        blk_update_request(tio->orig, 0, nr_bytes);
 162}
 163
 164static struct dm_rq_target_io *tio_from_request(struct request *rq)
 165{
 166        return blk_mq_rq_to_pdu(rq);
 167}
 168
 169static void rq_end_stats(struct mapped_device *md, struct request *orig)
 170{
 171        if (unlikely(dm_stats_used(&md->stats))) {
 172                struct dm_rq_target_io *tio = tio_from_request(orig);
 173                tio->duration_jiffies = jiffies - tio->duration_jiffies;
 174                dm_stats_account_io(&md->stats, rq_data_dir(orig),
 175                                    blk_rq_pos(orig), tio->n_sectors, true,
 176                                    tio->duration_jiffies, &tio->stats_aux);
 177        }
 178}
 179
 180/*
 181 * Don't touch any member of the md after calling this function because
 182 * the md may be freed in dm_put() at the end of this function.
 183 * Or do dm_get() before calling this function and dm_put() later.
 184 */
 185static void rq_completed(struct mapped_device *md, int rw, bool run_queue)
 186{
 187        struct request_queue *q = md->queue;
 188        unsigned long flags;
 189
 190        atomic_dec(&md->pending[rw]);
 191
 192        /* nudge anyone waiting on suspend queue */
 193        if (!md_in_flight(md))
 194                wake_up(&md->wait);
 195
 196        /*
 197         * Run this off this callpath, as drivers could invoke end_io while
 198         * inside their request_fn (and holding the queue lock). Calling
 199         * back into ->request_fn() could deadlock attempting to grab the
 200         * queue lock again.
 201         */
 202        if (!q->mq_ops && run_queue) {
 203                spin_lock_irqsave(q->queue_lock, flags);
 204                blk_run_queue_async(q);
 205                spin_unlock_irqrestore(q->queue_lock, flags);
 206        }
 207
 208        /*
 209         * dm_put() must be at the end of this function. See the comment above
 210         */
 211        dm_put(md);
 212}
 213
 214/*
 215 * Complete the clone and the original request.
 216 * Must be called without clone's queue lock held,
 217 * see end_clone_request() for more details.
 218 */
 219static void dm_end_request(struct request *clone, int error)
 220{
 221        int rw = rq_data_dir(clone);
 222        struct dm_rq_target_io *tio = clone->end_io_data;
 223        struct mapped_device *md = tio->md;
 224        struct request *rq = tio->orig;
 225
 226        blk_rq_unprep_clone(clone);
 227        tio->ti->type->release_clone_rq(clone);
 228
 229        rq_end_stats(md, rq);
 230        if (!rq->q->mq_ops)
 231                blk_end_request_all(rq, error);
 232        else
 233                blk_mq_end_request(rq, error);
 234        rq_completed(md, rw, true);
 235}
 236
 237/*
 238 * Requeue the original request of a clone.
 239 */
 240static void dm_old_requeue_request(struct request *rq)
 241{
 242        struct request_queue *q = rq->q;
 243        unsigned long flags;
 244
 245        spin_lock_irqsave(q->queue_lock, flags);
 246        blk_requeue_request(q, rq);
 247        blk_run_queue_async(q);
 248        spin_unlock_irqrestore(q->queue_lock, flags);
 249}
 250
 251static void __dm_mq_kick_requeue_list(struct request_queue *q, unsigned long msecs)
 252{
 253        blk_mq_delay_kick_requeue_list(q, msecs);
 254}
 255
 256void dm_mq_kick_requeue_list(struct mapped_device *md)
 257{
 258        __dm_mq_kick_requeue_list(dm_get_md_queue(md), 0);
 259}
 260EXPORT_SYMBOL(dm_mq_kick_requeue_list);
 261
 262static void dm_mq_delay_requeue_request(struct request *rq, unsigned long msecs)
 263{
 264        blk_mq_requeue_request(rq, false);
 265        __dm_mq_kick_requeue_list(rq->q, msecs);
 266}
 267
 268static void dm_requeue_original_request(struct dm_rq_target_io *tio, bool delay_requeue)
 269{
 270        struct mapped_device *md = tio->md;
 271        struct request *rq = tio->orig;
 272        int rw = rq_data_dir(rq);
 273
 274        rq_end_stats(md, rq);
 275        if (tio->clone) {
 276                blk_rq_unprep_clone(tio->clone);
 277                tio->ti->type->release_clone_rq(tio->clone);
 278        }
 279
 280        if (!rq->q->mq_ops)
 281                dm_old_requeue_request(rq);
 282        else
 283                dm_mq_delay_requeue_request(rq, delay_requeue ? 5000 : 0);
 284
 285        rq_completed(md, rw, false);
 286}
 287
 288static void dm_done(struct request *clone, int error, bool mapped)
 289{
 290        int r = error;
 291        struct dm_rq_target_io *tio = clone->end_io_data;
 292        dm_request_endio_fn rq_end_io = NULL;
 293
 294        if (tio->ti) {
 295                rq_end_io = tio->ti->type->rq_end_io;
 296
 297                if (mapped && rq_end_io)
 298                        r = rq_end_io(tio->ti, clone, error, &tio->info);
 299        }
 300
 301        if (unlikely(r == -EREMOTEIO && (req_op(clone) == REQ_OP_WRITE_SAME) &&
 302                     !clone->q->limits.max_write_same_sectors))
 303                disable_write_same(tio->md);
 304
 305        if (r <= 0)
 306                /* The target wants to complete the I/O */
 307                dm_end_request(clone, r);
 308        else if (r == DM_ENDIO_INCOMPLETE)
 309                /* The target will handle the I/O */
 310                return;
 311        else if (r == DM_ENDIO_REQUEUE)
 312                /* The target wants to requeue the I/O */
 313                dm_requeue_original_request(tio, false);
 314        else {
 315                DMWARN("unimplemented target endio return value: %d", r);
 316                BUG();
 317        }
 318}
 319
 320/*
 321 * Request completion handler for request-based dm
 322 */
 323static void dm_softirq_done(struct request *rq)
 324{
 325        bool mapped = true;
 326        struct dm_rq_target_io *tio = tio_from_request(rq);
 327        struct request *clone = tio->clone;
 328        int rw;
 329
 330        if (!clone) {
 331                struct mapped_device *md = tio->md;
 332
 333                rq_end_stats(md, rq);
 334                rw = rq_data_dir(rq);
 335                if (!rq->q->mq_ops)
 336                        blk_end_request_all(rq, tio->error);
 337                else
 338                        blk_mq_end_request(rq, tio->error);
 339                rq_completed(md, rw, false);
 340                return;
 341        }
 342
 343        if (rq->rq_flags & RQF_FAILED)
 344                mapped = false;
 345
 346        dm_done(clone, tio->error, mapped);
 347}
 348
 349/*
 350 * Complete the clone and the original request with the error status
 351 * through softirq context.
 352 */
 353static void dm_complete_request(struct request *rq, int error)
 354{
 355        struct dm_rq_target_io *tio = tio_from_request(rq);
 356
 357        tio->error = error;
 358        if (!rq->q->mq_ops)
 359                blk_complete_request(rq);
 360        else
 361                blk_mq_complete_request(rq, error);
 362}
 363
 364/*
 365 * Complete the not-mapped clone and the original request with the error status
 366 * through softirq context.
 367 * Target's rq_end_io() function isn't called.
 368 * This may be used when the target's map_rq() or clone_and_map_rq() functions fail.
 369 */
 370static void dm_kill_unmapped_request(struct request *rq, int error)
 371{
 372        rq->rq_flags |= RQF_FAILED;
 373        dm_complete_request(rq, error);
 374}
 375
 376/*
 377 * Called with the clone's queue lock held (in the case of .request_fn)
 378 */
 379static void end_clone_request(struct request *clone, int error)
 380{
 381        struct dm_rq_target_io *tio = clone->end_io_data;
 382
 383        /*
 384         * Actual request completion is done in a softirq context which doesn't
 385         * hold the clone's queue lock.  Otherwise, deadlock could occur because:
 386         *     - another request may be submitted by the upper level driver
 387         *       of the stacking during the completion
 388         *     - the submission which requires queue lock may be done
 389         *       against this clone's queue
 390         */
 391        dm_complete_request(tio->orig, error);
 392}
 393
 394static void dm_dispatch_clone_request(struct request *clone, struct request *rq)
 395{
 396        int r;
 397
 398        if (blk_queue_io_stat(clone->q))
 399                clone->rq_flags |= RQF_IO_STAT;
 400
 401        clone->start_time = jiffies;
 402        r = blk_insert_cloned_request(clone->q, clone);
 403        if (r)
 404                /* must complete clone in terms of original request */
 405                dm_complete_request(rq, r);
 406}
 407
 408static int dm_rq_bio_constructor(struct bio *bio, struct bio *bio_orig,
 409                                 void *data)
 410{
 411        struct dm_rq_target_io *tio = data;
 412        struct dm_rq_clone_bio_info *info =
 413                container_of(bio, struct dm_rq_clone_bio_info, clone);
 414
 415        info->orig = bio_orig;
 416        info->tio = tio;
 417        bio->bi_end_io = end_clone_bio;
 418
 419        return 0;
 420}
 421
 422static int setup_clone(struct request *clone, struct request *rq,
 423                       struct dm_rq_target_io *tio, gfp_t gfp_mask)
 424{
 425        int r;
 426
 427        r = blk_rq_prep_clone(clone, rq, tio->md->bs, gfp_mask,
 428                              dm_rq_bio_constructor, tio);
 429        if (r)
 430                return r;
 431
 432        clone->end_io = end_clone_request;
 433        clone->end_io_data = tio;
 434
 435        tio->clone = clone;
 436
 437        return 0;
 438}
 439
 440static void map_tio_request(struct kthread_work *work);
 441
 442static void init_tio(struct dm_rq_target_io *tio, struct request *rq,
 443                     struct mapped_device *md)
 444{
 445        tio->md = md;
 446        tio->ti = NULL;
 447        tio->clone = NULL;
 448        tio->orig = rq;
 449        tio->error = 0;
 450        /*
 451         * Avoid initializing info for blk-mq; it passes
 452         * target-specific data through info.ptr
 453         * (see: dm_mq_init_request)
 454         */
 455        if (!md->init_tio_pdu)
 456                memset(&tio->info, 0, sizeof(tio->info));
 457        if (md->kworker_task)
 458                kthread_init_work(&tio->work, map_tio_request);
 459}
 460
 461/*
 462 * Returns:
 463 * DM_MAPIO_*       : the request has been processed as indicated
 464 * DM_MAPIO_REQUEUE : the original request needs to be immediately requeued
 465 * < 0              : the request was completed due to failure
 466 */
 467static int map_request(struct dm_rq_target_io *tio)
 468{
 469        int r;
 470        struct dm_target *ti = tio->ti;
 471        struct mapped_device *md = tio->md;
 472        struct request *rq = tio->orig;
 473        struct request *clone = NULL;
 474
 475        r = ti->type->clone_and_map_rq(ti, rq, &tio->info, &clone);
 476        switch (r) {
 477        case DM_MAPIO_SUBMITTED:
 478                /* The target has taken the I/O to submit by itself later */
 479                break;
 480        case DM_MAPIO_REMAPPED:
 481                if (setup_clone(clone, rq, tio, GFP_ATOMIC)) {
 482                        /* -ENOMEM */
 483                        ti->type->release_clone_rq(clone);
 484                        return DM_MAPIO_REQUEUE;
 485                }
 486
 487                /* The target has remapped the I/O so dispatch it */
 488                trace_block_rq_remap(clone->q, clone, disk_devt(dm_disk(md)),
 489                                     blk_rq_pos(rq));
 490                dm_dispatch_clone_request(clone, rq);
 491                break;
 492        case DM_MAPIO_REQUEUE:
 493                /* The target wants to requeue the I/O */
 494                break;
 495        case DM_MAPIO_DELAY_REQUEUE:
 496                /* The target wants to requeue the I/O after a delay */
 497                dm_requeue_original_request(tio, true);
 498                break;
 499        default:
 500                if (r > 0) {
 501                        DMWARN("unimplemented target map return value: %d", r);
 502                        BUG();
 503                }
 504
 505                /* The target wants to complete the I/O */
 506                dm_kill_unmapped_request(rq, r);
 507        }
 508
 509        return r;
 510}
 511
 512static void dm_start_request(struct mapped_device *md, struct request *orig)
 513{
 514        if (!orig->q->mq_ops)
 515                blk_start_request(orig);
 516        else
 517                blk_mq_start_request(orig);
 518        atomic_inc(&md->pending[rq_data_dir(orig)]);
 519
 520        if (md->seq_rq_merge_deadline_usecs) {
 521                md->last_rq_pos = rq_end_sector(orig);
 522                md->last_rq_rw = rq_data_dir(orig);
 523                md->last_rq_start_time = ktime_get();
 524        }
 525
 526        if (unlikely(dm_stats_used(&md->stats))) {
 527                struct dm_rq_target_io *tio = tio_from_request(orig);
 528                tio->duration_jiffies = jiffies;
 529                tio->n_sectors = blk_rq_sectors(orig);
 530                dm_stats_account_io(&md->stats, rq_data_dir(orig),
 531                                    blk_rq_pos(orig), tio->n_sectors, false, 0,
 532                                    &tio->stats_aux);
 533        }
 534
 535        /*
 536         * Hold the md reference here for the in-flight I/O.
 537         * We can't rely on the reference count by device opener,
 538         * because the device may be closed during the request completion
 539         * when all bios are completed.
 540         * See the comment in rq_completed() too.
 541         */
 542        dm_get(md);
 543}
 544
 545static int __dm_rq_init_rq(struct mapped_device *md, struct request *rq)
 546{
 547        struct dm_rq_target_io *tio = blk_mq_rq_to_pdu(rq);
 548
 549        /*
 550         * Must initialize md member of tio, otherwise it won't
 551         * be available in dm_mq_queue_rq.
 552         */
 553        tio->md = md;
 554
 555        if (md->init_tio_pdu) {
 556                /* target-specific per-io data is immediately after the tio */
 557                tio->info.ptr = tio + 1;
 558        }
 559
 560        return 0;
 561}
 562
 563static int dm_rq_init_rq(struct request_queue *q, struct request *rq, gfp_t gfp)
 564{
 565        return __dm_rq_init_rq(q->rq_alloc_data, rq);
 566}
 567
 568static void map_tio_request(struct kthread_work *work)
 569{
 570        struct dm_rq_target_io *tio = container_of(work, struct dm_rq_target_io, work);
 571
 572        if (map_request(tio) == DM_MAPIO_REQUEUE)
 573                dm_requeue_original_request(tio, false);
 574}
 575
 576ssize_t dm_attr_rq_based_seq_io_merge_deadline_show(struct mapped_device *md, char *buf)
 577{
 578        return sprintf(buf, "%u\n", md->seq_rq_merge_deadline_usecs);
 579}
 580
 581#define MAX_SEQ_RQ_MERGE_DEADLINE_USECS 100000
 582
 583ssize_t dm_attr_rq_based_seq_io_merge_deadline_store(struct mapped_device *md,
 584                                                     const char *buf, size_t count)
 585{
 586        unsigned deadline;
 587
 588        if (dm_get_md_type(md) != DM_TYPE_REQUEST_BASED)
 589                return count;
 590
 591        if (kstrtouint(buf, 10, &deadline))
 592                return -EINVAL;
 593
 594        if (deadline > MAX_SEQ_RQ_MERGE_DEADLINE_USECS)
 595                deadline = MAX_SEQ_RQ_MERGE_DEADLINE_USECS;
 596
 597        md->seq_rq_merge_deadline_usecs = deadline;
 598
 599        return count;
 600}
 601
 602static bool dm_old_request_peeked_before_merge_deadline(struct mapped_device *md)
 603{
 604        ktime_t kt_deadline;
 605
 606        if (!md->seq_rq_merge_deadline_usecs)
 607                return false;
 608
 609        kt_deadline = ns_to_ktime((u64)md->seq_rq_merge_deadline_usecs * NSEC_PER_USEC);
 610        kt_deadline = ktime_add_safe(md->last_rq_start_time, kt_deadline);
 611
 612        return !ktime_after(ktime_get(), kt_deadline);
 613}
 614
 615/*
 616 * q->request_fn for old request-based dm.
 617 * Called with the queue lock held.
 618 */
 619static void dm_old_request_fn(struct request_queue *q)
 620{
 621        struct mapped_device *md = q->queuedata;
 622        struct dm_target *ti = md->immutable_target;
 623        struct request *rq;
 624        struct dm_rq_target_io *tio;
 625        sector_t pos = 0;
 626
 627        if (unlikely(!ti)) {
 628                int srcu_idx;
 629                struct dm_table *map = dm_get_live_table(md, &srcu_idx);
 630
 631                if (unlikely(!map)) {
 632                        dm_put_live_table(md, srcu_idx);
 633                        return;
 634                }
 635                ti = dm_table_find_target(map, pos);
 636                dm_put_live_table(md, srcu_idx);
 637        }
 638
 639        /*
 640         * For suspend, check blk_queue_stopped() and increment
 641         * ->pending within a single queue_lock not to increment the
 642         * number of in-flight I/Os after the queue is stopped in
 643         * dm_suspend().
 644         */
 645        while (!blk_queue_stopped(q)) {
 646                rq = blk_peek_request(q);
 647                if (!rq)
 648                        return;
 649
 650                /* always use block 0 to find the target for flushes for now */
 651                pos = 0;
 652                if (req_op(rq) != REQ_OP_FLUSH)
 653                        pos = blk_rq_pos(rq);
 654
 655                if ((dm_old_request_peeked_before_merge_deadline(md) &&
 656                     md_in_flight(md) && rq->bio && !bio_multiple_segments(rq->bio) &&
 657                     md->last_rq_pos == pos && md->last_rq_rw == rq_data_dir(rq)) ||
 658                    (ti->type->busy && ti->type->busy(ti))) {
 659                        blk_delay_queue(q, 10);
 660                        return;
 661                }
 662
 663                dm_start_request(md, rq);
 664
 665                tio = tio_from_request(rq);
 666                init_tio(tio, rq, md);
 667                /* Establish tio->ti before queuing work (map_tio_request) */
 668                tio->ti = ti;
 669                kthread_queue_work(&md->kworker, &tio->work);
 670                BUG_ON(!irqs_disabled());
 671        }
 672}
 673
 674/*
 675 * Fully initialize a .request_fn request-based queue.
 676 */
 677int dm_old_init_request_queue(struct mapped_device *md, struct dm_table *t)
 678{
 679        struct dm_target *immutable_tgt;
 680
 681        /* Fully initialize the queue */
 682        md->queue->cmd_size = sizeof(struct dm_rq_target_io);
 683        md->queue->rq_alloc_data = md;
 684        md->queue->request_fn = dm_old_request_fn;
 685        md->queue->init_rq_fn = dm_rq_init_rq;
 686
 687        immutable_tgt = dm_table_get_immutable_target(t);
 688        if (immutable_tgt && immutable_tgt->per_io_data_size) {
 689                /* any target-specific per-io data is immediately after the tio */
 690                md->queue->cmd_size += immutable_tgt->per_io_data_size;
 691                md->init_tio_pdu = true;
 692        }
 693        if (blk_init_allocated_queue(md->queue) < 0)
 694                return -EINVAL;
 695
 696        /* disable dm_old_request_fn's merge heuristic by default */
 697        md->seq_rq_merge_deadline_usecs = 0;
 698
 699        dm_init_normal_md_queue(md);
 700        blk_queue_softirq_done(md->queue, dm_softirq_done);
 701
 702        /* Initialize the request-based DM worker thread */
 703        kthread_init_worker(&md->kworker);
 704        md->kworker_task = kthread_run(kthread_worker_fn, &md->kworker,
 705                                       "kdmwork-%s", dm_device_name(md));
 706        if (IS_ERR(md->kworker_task)) {
 707                int error = PTR_ERR(md->kworker_task);
 708                md->kworker_task = NULL;
 709                return error;
 710        }
 711
 712        elv_register_queue(md->queue);
 713
 714        return 0;
 715}
 716
 717static int dm_mq_init_request(void *data, struct request *rq,
 718                       unsigned int hctx_idx, unsigned int request_idx,
 719                       unsigned int numa_node)
 720{
 721        return __dm_rq_init_rq(data, rq);
 722}
 723
 724static int dm_mq_queue_rq(struct blk_mq_hw_ctx *hctx,
 725                          const struct blk_mq_queue_data *bd)
 726{
 727        struct request *rq = bd->rq;
 728        struct dm_rq_target_io *tio = blk_mq_rq_to_pdu(rq);
 729        struct mapped_device *md = tio->md;
 730        struct dm_target *ti = md->immutable_target;
 731
 732        if (unlikely(!ti)) {
 733                int srcu_idx;
 734                struct dm_table *map = dm_get_live_table(md, &srcu_idx);
 735
 736                ti = dm_table_find_target(map, 0);
 737                dm_put_live_table(md, srcu_idx);
 738        }
 739
 740        if (ti->type->busy && ti->type->busy(ti))
 741                return BLK_MQ_RQ_QUEUE_BUSY;
 742
 743        dm_start_request(md, rq);
 744
 745        /* Init tio using md established in .init_request */
 746        init_tio(tio, rq, md);
 747
 748        /*
 749         * Establish tio->ti before calling map_request().
 750         */
 751        tio->ti = ti;
 752
 753        /* Direct call is fine since .queue_rq allows allocations */
 754        if (map_request(tio) == DM_MAPIO_REQUEUE) {
 755                /* Undo dm_start_request() before requeuing */
 756                rq_end_stats(md, rq);
 757                rq_completed(md, rq_data_dir(rq), false);
 758                blk_mq_delay_run_hw_queue(hctx, 100/*ms*/);
 759                return BLK_MQ_RQ_QUEUE_BUSY;
 760        }
 761
 762        return BLK_MQ_RQ_QUEUE_OK;
 763}
 764
 765static struct blk_mq_ops dm_mq_ops = {
 766        .queue_rq = dm_mq_queue_rq,
 767        .complete = dm_softirq_done,
 768        .init_request = dm_mq_init_request,
 769};
 770
 771int dm_mq_init_request_queue(struct mapped_device *md, struct dm_table *t)
 772{
 773        struct request_queue *q;
 774        struct dm_target *immutable_tgt;
 775        int err;
 776
 777        if (!dm_table_all_blk_mq_devices(t)) {
 778                DMERR("request-based dm-mq may only be stacked on blk-mq device(s)");
 779                return -EINVAL;
 780        }
 781
 782        md->tag_set = kzalloc_node(sizeof(struct blk_mq_tag_set), GFP_KERNEL, md->numa_node_id);
 783        if (!md->tag_set)
 784                return -ENOMEM;
 785
 786        md->tag_set->ops = &dm_mq_ops;
 787        md->tag_set->queue_depth = dm_get_blk_mq_queue_depth();
 788        md->tag_set->numa_node = md->numa_node_id;
 789        md->tag_set->flags = BLK_MQ_F_SHOULD_MERGE | BLK_MQ_F_SG_MERGE;
 790        md->tag_set->nr_hw_queues = dm_get_blk_mq_nr_hw_queues();
 791        md->tag_set->driver_data = md;
 792
 793        md->tag_set->cmd_size = sizeof(struct dm_rq_target_io);
 794        immutable_tgt = dm_table_get_immutable_target(t);
 795        if (immutable_tgt && immutable_tgt->per_io_data_size) {
 796                /* any target-specific per-io data is immediately after the tio */
 797                md->tag_set->cmd_size += immutable_tgt->per_io_data_size;
 798                md->init_tio_pdu = true;
 799        }
 800
 801        err = blk_mq_alloc_tag_set(md->tag_set);
 802        if (err)
 803                goto out_kfree_tag_set;
 804
 805        q = blk_mq_init_allocated_queue(md->tag_set, md->queue);
 806        if (IS_ERR(q)) {
 807                err = PTR_ERR(q);
 808                goto out_tag_set;
 809        }
 810        dm_init_md_queue(md);
 811
 812        /* backfill 'mq' sysfs registration normally done in blk_register_queue */
 813        blk_mq_register_dev(disk_to_dev(md->disk), q);
 814
 815        return 0;
 816
 817out_tag_set:
 818        blk_mq_free_tag_set(md->tag_set);
 819out_kfree_tag_set:
 820        kfree(md->tag_set);
 821
 822        return err;
 823}
 824
 825void dm_mq_cleanup_mapped_device(struct mapped_device *md)
 826{
 827        if (md->tag_set) {
 828                blk_mq_free_tag_set(md->tag_set);
 829                kfree(md->tag_set);
 830        }
 831}
 832
 833module_param(reserved_rq_based_ios, uint, S_IRUGO | S_IWUSR);
 834MODULE_PARM_DESC(reserved_rq_based_ios, "Reserved IOs in request-based mempools");
 835
 836module_param(use_blk_mq, bool, S_IRUGO | S_IWUSR);
 837MODULE_PARM_DESC(use_blk_mq, "Use block multiqueue for request-based DM devices");
 838
 839module_param(dm_mq_nr_hw_queues, uint, S_IRUGO | S_IWUSR);
 840MODULE_PARM_DESC(dm_mq_nr_hw_queues, "Number of hardware queues for request-based dm-mq devices");
 841
 842module_param(dm_mq_queue_depth, uint, S_IRUGO | S_IWUSR);
 843MODULE_PARM_DESC(dm_mq_queue_depth, "Queue depth for request-based dm-mq devices");
 844