linux/drivers/block/rnbd/rnbd-clt.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 * RDMA Network Block Driver
   4 *
   5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
   6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
   7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
   8 */
   9
  10#undef pr_fmt
  11#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
  12
  13#include <linux/module.h>
  14#include <linux/blkdev.h>
  15#include <linux/hdreg.h>
  16#include <linux/scatterlist.h>
  17#include <linux/idr.h>
  18
  19#include "rnbd-clt.h"
  20
  21MODULE_DESCRIPTION("RDMA Network Block Device Client");
  22MODULE_LICENSE("GPL");
  23
  24static int rnbd_client_major;
  25static DEFINE_IDA(index_ida);
  26static DEFINE_MUTEX(ida_lock);
  27static DEFINE_MUTEX(sess_lock);
  28static LIST_HEAD(sess_list);
  29
  30/*
  31 * Maximum number of partitions an instance can have.
  32 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
  33 */
  34#define RNBD_PART_BITS          6
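/*
 * Worked example (illustrative; the numbers follow from the definition
 * above): with RNBD_PART_BITS = 6, the device with index idx owns minors
 * idx << 6 .. (idx << 6) + 63, so index 2 gets first_minor 128 for the
 * whole device and minors 129..191 for up to 63 partitions; see
 * rnbd_clt_setup_gen_disk() below.
 */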
  35
  36static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
  37{
  38        return refcount_inc_not_zero(&sess->refcount);
  39}
  40
  41static void free_sess(struct rnbd_clt_session *sess);
  42
  43static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
  44{
  45        might_sleep();
  46
  47        if (refcount_dec_and_test(&sess->refcount))
  48                free_sess(sess);
  49}
  50
  51static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
  52{
  53        might_sleep();
  54
  55        if (!refcount_dec_and_test(&dev->refcount))
  56                return;
  57
  58        mutex_lock(&ida_lock);
  59        ida_simple_remove(&index_ida, dev->clt_device_id);
  60        mutex_unlock(&ida_lock);
  61        kfree(dev->hw_queues);
  62        kfree(dev->pathname);
  63        rnbd_clt_put_sess(dev->sess);
  64        mutex_destroy(&dev->lock);
  65        kfree(dev);
  66}
  67
  68static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
  69{
  70        return refcount_inc_not_zero(&dev->refcount);
  71}
  72
  73static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev,
  74                                 const struct rnbd_msg_open_rsp *rsp)
  75{
  76        struct rnbd_clt_session *sess = dev->sess;
  77
  78        if (!rsp->logical_block_size)
  79                return -EINVAL;
  80
  81        dev->device_id              = le32_to_cpu(rsp->device_id);
  82        dev->nsectors               = le64_to_cpu(rsp->nsectors);
  83        dev->logical_block_size     = le16_to_cpu(rsp->logical_block_size);
  84        dev->physical_block_size    = le16_to_cpu(rsp->physical_block_size);
  85        dev->max_write_same_sectors = le32_to_cpu(rsp->max_write_same_sectors);
  86        dev->max_discard_sectors    = le32_to_cpu(rsp->max_discard_sectors);
  87        dev->discard_granularity    = le32_to_cpu(rsp->discard_granularity);
  88        dev->discard_alignment      = le32_to_cpu(rsp->discard_alignment);
  89        dev->secure_discard         = le16_to_cpu(rsp->secure_discard);
  90        dev->rotational             = rsp->rotational;
  91        dev->wc                     = !!(rsp->cache_policy & RNBD_WRITEBACK);
  92        dev->fua                    = !!(rsp->cache_policy & RNBD_FUA);
  93
  94        dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE;
  95        dev->max_segments = BMAX_SEGMENTS;
  96
  97        return 0;
  98}
  99
 100static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
 101                                    size_t new_nsectors)
 102{
 103        rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n",
 104                       dev->nsectors, new_nsectors);
 105        dev->nsectors = new_nsectors;
 106        set_capacity_and_notify(dev->gd, dev->nsectors);
 107        return 0;
 108}
 109
 110static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
 111                                struct rnbd_msg_open_rsp *rsp)
 112{
 113        int err = 0;
 114
 115        mutex_lock(&dev->lock);
 116        if (dev->dev_state == DEV_STATE_UNMAPPED) {
 117                rnbd_clt_info(dev,
 118                               "Ignoring Open-Response message from server for unmapped device\n");
 119                err = -ENOENT;
 120                goto out;
 121        }
 122        if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
 123                u64 nsectors = le64_to_cpu(rsp->nsectors);
 124
 125                /*
 126                 * If the device was remapped and the size changed in the
 127                 * meantime, we need to revalidate it.
 128                 */
 129                if (dev->nsectors != nsectors)
 130                        rnbd_clt_change_capacity(dev, nsectors);
 131                rnbd_clt_info(dev, "Device online, device remapped successfully\n");
 132        }
 133        err = rnbd_clt_set_dev_attr(dev, rsp);
 134        if (err)
 135                goto out;
 136        dev->dev_state = DEV_STATE_MAPPED;
 137
 138out:
 139        mutex_unlock(&dev->lock);
 140
 141        return err;
 142}
 143
 144int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize)
 145{
 146        int ret = 0;
 147
 148        mutex_lock(&dev->lock);
 149        if (dev->dev_state != DEV_STATE_MAPPED) {
 150                pr_err("Failed to set new size of the device, device is not opened\n");
 151                ret = -ENOENT;
 152                goto out;
 153        }
 154        ret = rnbd_clt_change_capacity(dev, newsize);
 155
 156out:
 157        mutex_unlock(&dev->lock);
 158
 159        return ret;
 160}
 161
 162static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
 163{
 164        if (WARN_ON(!q->hctx))
 165                return;
 166
 167        /* We can come here from interrupt, thus async=true */
 168        blk_mq_run_hw_queue(q->hctx, true);
 169}
 170
 171enum {
 172        RNBD_DELAY_IFBUSY = -1,
 173};
 174
 175/**
 176 * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
 177 * @sess:       Session to find a queue for
 178 * @cpu:        Cpu to start the search from
 179 *
 180 * Description:
 181 *     Each CPU has a list of HW queues which need to be rerun.  If a list
 182 *     is not empty, it is marked with a bit.  This function finds the first
 183 *     set bit in the bitmap and returns the corresponding CPU list.
 184 */
 185static struct rnbd_cpu_qlist *
 186rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
 187{
 188        int bit;
 189
 190        /* Search from cpu to nr_cpu_ids */
 191        bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
 192        if (bit < nr_cpu_ids) {
 193                return per_cpu_ptr(sess->cpu_queues, bit);
 194        } else if (cpu != 0) {
 195                /* Search from 0 to cpu */
 196                bit = find_next_bit(sess->cpu_queues_bm, cpu, 0);
 197                if (bit < cpu)
 198                        return per_cpu_ptr(sess->cpu_queues, bit);
 199        }
 200
 201        return NULL;
 202}
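/*
 * A sketch of the wrap-around search above under a hypothetical state:
 * nr_cpu_ids = 8 and only bits 1 and 5 are set in cpu_queues_bm.  Starting
 * from cpu = 6, the first find_next_bit() scans [6, 8) and finds nothing,
 * so the second one scans [0, 6) and returns bit 1.  The bitmap is thus
 * covered exactly once, and NULL is returned only when it is empty.
 */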
 203
 204static inline int nxt_cpu(int cpu)
 205{
 206        return (cpu + 1) % nr_cpu_ids;
 207}
 208
 209/**
 210 * rnbd_rerun_if_needed() - rerun next queue marked as stopped
 211 * @sess:       Session to rerun a queue on
 212 *
 213 * Description:
 214 *     Each CPU has its own list of HW queues which should be rerun.
 215 *     The function finds such a list, takes the list lock, picks the
 216 *     first HW queue off the list and requeues it.
 217 *
 218 * Return:
 219 *     True if the queue was requeued, false otherwise.
 220 *
 221 * Context:
 222 *     Does not matter.
 223 */
 224static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
 225{
 226        struct rnbd_queue *q = NULL;
 227        struct rnbd_cpu_qlist *cpu_q;
 228        unsigned long flags;
 229        int *cpup;
 230
 231        /*
 232         * To keep fairness and not to let other queues starve, we always
 233         * try to wake up someone else in a round-robin manner.  That of course
 234         * increases latency, but queues always get a chance to be executed.
 235         */
 236        cpup = get_cpu_ptr(sess->cpu_rr);
 237        for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
 238             cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
 239                if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
 240                        continue;
 241                if (unlikely(!test_bit(cpu_q->cpu, sess->cpu_queues_bm)))
 242                        goto unlock;
 243                q = list_first_entry_or_null(&cpu_q->requeue_list,
 244                                             typeof(*q), requeue_list);
 245                if (WARN_ON(!q))
 246                        goto clear_bit;
 247                list_del_init(&q->requeue_list);
 248                clear_bit_unlock(0, &q->in_list);
 249
 250                if (list_empty(&cpu_q->requeue_list)) {
 251                        /* Clear bit if nothing is left */
 252clear_bit:
 253                        clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
 254                }
 255unlock:
 256                spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
 257
 258                if (q)
 259                        break;
 260        }
 261
 262        /*
 263         * Saves the CPU that is going to be requeued on the per-cpu var. Just
 264         * incrementing it doesn't work because rnbd_get_cpu_qlist() will
 265         * always return the first CPU with something on the queue list when the
 266         * value stored on the var is greater than the last CPU with something
 267         * on the list.
 268         */
 269        if (cpu_q)
 270                *cpup = cpu_q->cpu;
 271        put_cpu_var(sess->cpu_rr);
 272
 273        if (q)
 274                rnbd_clt_dev_requeue(q);
 275
 276        return q;
 277}
 278
 279/**
 280 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
 281 *                               session is idling (there are no requests
 282 *                               in-flight).
 283 * @sess:       Session to rerun the queues on
 284 *
 285 * Description:
 286 *     This function tries to rerun all stopped queues if there are no
 287 *     requests in-flight anymore.  It tries to solve an obvious problem:
 288 *     the number of tags can be smaller than the number of queues (hctxs)
 289 *     that get stopped and put to sleep.  If the last permit, just put,
 290 *     does not wake up all remaining queues (hctxs), IO requests hang forever.
 291 *
 292 *     That can happen when all permits, say N, have been exhausted
 293 *     from one CPU, and we have many block devices per session, say M.
 294 *     Each block device has its own queue (hctx) for each CPU, so eventually
 295 *     we can put that number of queues (hctxs) to sleep: M x nr_cpu_ids.
 296 *     If the number of permits N < M x nr_cpu_ids, we finally get an IO hang.
 297 *
 298 *     To avoid this hang, the last caller of rnbd_put_permit() (the one
 299 *     who observes sess->busy == 0) must wake up all remaining queues.
 300 *
 301 * Context:
 302 *     Does not matter.
 303 */
 304static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
 305{
 306        bool requeued;
 307
 308        do {
 309                requeued = rnbd_rerun_if_needed(sess);
 310        } while (atomic_read(&sess->busy) == 0 && requeued);
 311}
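/*
 * Worked example for the hang described above (hypothetical numbers):
 * with N = 512 permits per session, M = 32 mapped devices and
 * nr_cpu_ids = 64, up to M x nr_cpu_ids = 2048 hctxs can be parked while
 * only 512 permits exist.  Since 512 < 2048, the last rnbd_put_permit()
 * caller must keep sweeping the per-CPU lists until either a request is
 * in flight again (sess->busy != 0) or nothing is left to requeue, which
 * is exactly the loop condition above.
 */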
 312
 313static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
 314                                             enum rtrs_clt_con_type con_type,
 315                                             int wait)
 316{
 317        struct rtrs_permit *permit;
 318
 319        permit = rtrs_clt_get_permit(sess->rtrs, con_type,
 320                                      wait ? RTRS_PERMIT_WAIT :
 321                                      RTRS_PERMIT_NOWAIT);
 322        if (likely(permit))
 323                /* We have a subtle rare case here, when all permits can be
 324                 * consumed before the busy counter is increased.  This is safe,
 325                 * because the loser will get NULL as a permit, observe a zero busy
 326                 * counter and immediately restart the queue itself.
 327                 */
 328                atomic_inc(&sess->busy);
 329
 330        return permit;
 331}
 332
 333static void rnbd_put_permit(struct rnbd_clt_session *sess,
 334                             struct rtrs_permit *permit)
 335{
 336        rtrs_clt_put_permit(sess->rtrs, permit);
 337        atomic_dec(&sess->busy);
 338        /* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
 339         * and then check queue bits.
 340         */
 341        smp_mb__after_atomic();
 342        rnbd_rerun_all_if_idle(sess);
 343}
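/*
 * The barrier pairing between the two sides, laid out as a sketch (the
 * steps are the ones performed in this file):
 *
 *   rnbd_put_permit()                rnbd_clt_dev_add_to_requeue()
 *   -----------------                -----------------------------
 *   atomic_dec(&sess->busy);         set_bit(cpu, sess->cpu_queues_bm);
 *   smp_mb__after_atomic();          smp_mb__before_atomic();
 *   scan sess->cpu_queues_bm         atomic_read(&sess->busy);
 *   (via rnbd_rerun_all_if_idle())
 *
 * At least one side is guaranteed to observe the other's store, so a
 * queue parked on a requeue list can never be missed by the last put.
 */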
 344
 345static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
 346                                     enum rtrs_clt_con_type con_type,
 347                                     int wait)
 348{
 349        struct rnbd_iu *iu;
 350        struct rtrs_permit *permit;
 351
 352        iu = kzalloc(sizeof(*iu), GFP_KERNEL);
 353        if (!iu) {
 354                return NULL;
 355        }
 356
 357        permit = rnbd_get_permit(sess, con_type,
 358                                  wait ? RTRS_PERMIT_WAIT :
 359                                  RTRS_PERMIT_NOWAIT);
 360        if (unlikely(!permit)) {
 361                kfree(iu);
 362                return NULL;
 363        }
 364
 365        iu->permit = permit;
 366        /*
 367         * 1st reference is dropped after finishing sending a "user" message,
 368         * 2nd reference is dropped after the confirmation with the response
 369         * has been returned.
 370         * 1st and 2nd can happen in any order, so the rnbd_iu should be
 371         * released (rtrs_permit returned to rtrs) only after both
 372         * are finished.
 373         */
 374        atomic_set(&iu->refcount, 2);
 375        init_waitqueue_head(&iu->comp.wait);
 376        iu->comp.errno = INT_MAX;
 377
 378        if (sg_alloc_table(&iu->sgt, 1, GFP_KERNEL)) {
 379                rnbd_put_permit(sess, permit);
 380                kfree(iu);
 381                return NULL;
 382        }
 383
 384        return iu;
 385}
 386
 387static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
 388{
 389        if (atomic_dec_and_test(&iu->refcount)) {
 390                sg_free_table(&iu->sgt);
 391                rnbd_put_permit(sess, iu->permit);
 392                kfree(iu);
 393        }
 394}
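/*
 * Lifecycle sketch for the two references taken in rnbd_get_iu() (an
 * illustration of the comment there; the two puts may run in any order):
 *
 *   iu = rnbd_get_iu(...);          refcount = 2
 *   send_usr_msg(..., msg_*_conf, ...);
 *     -> msg_*_conf() runs on response:
 *            rnbd_put_iu(sess, iu); refcount = 1
 *   rnbd_put_iu(sess, iu);          refcount = 0 -> sg table freed,
 *                                   permit returned, iu freed
 */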
 395
 396static void rnbd_softirq_done_fn(struct request *rq)
 397{
 398        struct rnbd_clt_dev *dev        = rq->rq_disk->private_data;
 399        struct rnbd_clt_session *sess   = dev->sess;
 400        struct rnbd_iu *iu;
 401
 402        iu = blk_mq_rq_to_pdu(rq);
 403        sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
 404        rnbd_put_permit(sess, iu->permit);
 405        blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
 406}
 407
 408static void msg_io_conf(void *priv, int errno)
 409{
 410        struct rnbd_iu *iu = priv;
 411        struct rnbd_clt_dev *dev = iu->dev;
 412        struct request *rq = iu->rq;
 413        int rw = rq_data_dir(rq);
 414
 415        iu->errno = errno;
 416
 417        blk_mq_complete_request(rq);
 418
 419        if (errno)
 420                rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
 421                                 rw == READ ? "read" : "write", errno);
 422}
 423
 424static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
 425{
 426        iu->comp.errno = errno;
 427        wake_up(&iu->comp.wait);
 428}
 429
 430static void msg_conf(void *priv, int errno)
 431{
 432        struct rnbd_iu *iu = priv;
 433
 434        iu->errno = errno;
 435        schedule_work(&iu->work);
 436}
 437
 438enum wait_type {
 439        NO_WAIT = 0,
 440        WAIT    = 1
 441};
 442
 443static int send_usr_msg(struct rtrs_clt *rtrs, int dir,
 444                        struct rnbd_iu *iu, struct kvec *vec,
 445                        size_t len, struct scatterlist *sg, unsigned int sg_len,
 446                        void (*conf)(struct work_struct *work),
 447                        int *errno, enum wait_type wait)
 448{
 449        int err;
 450        struct rtrs_clt_req_ops req_ops;
 451
 452        INIT_WORK(&iu->work, conf);
 453        req_ops = (struct rtrs_clt_req_ops) {
 454                .priv = iu,
 455                .conf_fn = msg_conf,
 456        };
 457        err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
 458                                vec, 1, len, sg, sg_len);
 459        if (!err && wait) {
 460                wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
 461                *errno = iu->comp.errno;
 462        } else {
 463                *errno = 0;
 464        }
 465
 466        return err;
 467}
 468
 469static void msg_close_conf(struct work_struct *work)
 470{
 471        struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
 472        struct rnbd_clt_dev *dev = iu->dev;
 473
 474        wake_up_iu_comp(iu, iu->errno);
 475        rnbd_put_iu(dev->sess, iu);
 476        rnbd_clt_put_dev(dev);
 477}
 478
 479static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id, bool wait)
 480{
 481        struct rnbd_clt_session *sess = dev->sess;
 482        struct rnbd_msg_close msg;
 483        struct rnbd_iu *iu;
 484        struct kvec vec = {
 485                .iov_base = &msg,
 486                .iov_len  = sizeof(msg)
 487        };
 488        int err, errno;
 489
 490        iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
 491        if (!iu)
 492                return -ENOMEM;
 493
 494        iu->buf = NULL;
 495        iu->dev = dev;
 496
 497        msg.hdr.type    = cpu_to_le16(RNBD_MSG_CLOSE);
 498        msg.device_id   = cpu_to_le32(device_id);
 499
 500        WARN_ON(!rnbd_clt_get_dev(dev));
 501        err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0,
 502                           msg_close_conf, &errno, wait);
 503        if (err) {
 504                rnbd_clt_put_dev(dev);
 505                rnbd_put_iu(sess, iu);
 506        } else {
 507                err = errno;
 508        }
 509
 510        rnbd_put_iu(sess, iu);
 511        return err;
 512}
 513
 514static void msg_open_conf(struct work_struct *work)
 515{
 516        struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
 517        struct rnbd_msg_open_rsp *rsp = iu->buf;
 518        struct rnbd_clt_dev *dev = iu->dev;
 519        int errno = iu->errno;
 520
 521        if (errno) {
 522                rnbd_clt_err(dev,
 523                              "Opening failed, server responded: %d\n",
 524                              errno);
 525        } else {
 526                errno = process_msg_open_rsp(dev, rsp);
 527                if (errno) {
 528                        u32 device_id = le32_to_cpu(rsp->device_id);
 529                        /*
 530                         * If the server thinks it's fine, but we fail to process
 531                         * it, then be nice and send a close to the server.
 532                         */
 533                        (void)send_msg_close(dev, device_id, NO_WAIT);
 534                }
 535        }
 536        kfree(rsp);
 537        wake_up_iu_comp(iu, errno);
 538        rnbd_put_iu(dev->sess, iu);
 539        rnbd_clt_put_dev(dev);
 540}
 541
 542static void msg_sess_info_conf(struct work_struct *work)
 543{
 544        struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
 545        struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
 546        struct rnbd_clt_session *sess = iu->sess;
 547
 548        if (!iu->errno)
 549                sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);
 550
 551        kfree(rsp);
 552        wake_up_iu_comp(iu, iu->errno);
 553        rnbd_put_iu(sess, iu);
 554        rnbd_clt_put_sess(sess);
 555}
 556
 557static int send_msg_open(struct rnbd_clt_dev *dev, bool wait)
 558{
 559        struct rnbd_clt_session *sess = dev->sess;
 560        struct rnbd_msg_open_rsp *rsp;
 561        struct rnbd_msg_open msg;
 562        struct rnbd_iu *iu;
 563        struct kvec vec = {
 564                .iov_base = &msg,
 565                .iov_len  = sizeof(msg)
 566        };
 567        int err, errno;
 568
 569        rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
 570        if (!rsp)
 571                return -ENOMEM;
 572
 573        iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
 574        if (!iu) {
 575                kfree(rsp);
 576                return -ENOMEM;
 577        }
 578
 579        iu->buf = rsp;
 580        iu->dev = dev;
 581
 582        sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
 583
 584        msg.hdr.type    = cpu_to_le16(RNBD_MSG_OPEN);
 585        msg.access_mode = dev->access_mode;
 586        strlcpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));
 587
 588        WARN_ON(!rnbd_clt_get_dev(dev));
 589        err = send_usr_msg(sess->rtrs, READ, iu,
 590                           &vec, sizeof(*rsp), iu->sgt.sgl, 1,
 591                           msg_open_conf, &errno, wait);
 592        if (err) {
 593                rnbd_clt_put_dev(dev);
 594                rnbd_put_iu(sess, iu);
 595                kfree(rsp);
 596        } else {
 597                err = errno;
 598        }
 599
 600        rnbd_put_iu(sess, iu);
 601        return err;
 602}
 603
 604static int send_msg_sess_info(struct rnbd_clt_session *sess, bool wait)
 605{
 606        struct rnbd_msg_sess_info_rsp *rsp;
 607        struct rnbd_msg_sess_info msg;
 608        struct rnbd_iu *iu;
 609        struct kvec vec = {
 610                .iov_base = &msg,
 611                .iov_len  = sizeof(msg)
 612        };
 613        int err, errno;
 614
 615        rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
 616        if (!rsp)
 617                return -ENOMEM;
 618
 619        iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
 620        if (!iu) {
 621                kfree(rsp);
 622                return -ENOMEM;
 623        }
 624
 625        iu->buf = rsp;
 626        iu->sess = sess;
 627        sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
 628
 629        msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
 630        msg.ver      = RNBD_PROTO_VER_MAJOR;
 631
 632        if (!rnbd_clt_get_sess(sess)) {
 633                /*
 634                 * That can happen only in one case, when RTRS has re-established
 635                 * the connection and link_ev() is called, but the session is almost
 636                 * dead: the last reference on the session is put and the caller is
 637                 * waiting for RTRS to close everything.
 638                 */
 639                err = -ENODEV;
 640                goto put_iu;
 641        }
 642        err = send_usr_msg(sess->rtrs, READ, iu,
 643                           &vec, sizeof(*rsp), iu->sgt.sgl, 1,
 644                           msg_sess_info_conf, &errno, wait);
 645        if (err) {
 646                rnbd_clt_put_sess(sess);
 647put_iu:
 648                rnbd_put_iu(sess, iu);
 649                kfree(rsp);
 650        } else {
 651                err = errno;
 652        }
 653        rnbd_put_iu(sess, iu);
 654        return err;
 655}
 656
 657static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
 658{
 659        struct rnbd_clt_dev *dev;
 660
 661        mutex_lock(&sess->lock);
 662        list_for_each_entry(dev, &sess->devs_list, list) {
 663                rnbd_clt_err(dev, "Device disconnected.\n");
 664
 665                mutex_lock(&dev->lock);
 666                if (dev->dev_state == DEV_STATE_MAPPED)
 667                        dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
 668                mutex_unlock(&dev->lock);
 669        }
 670        mutex_unlock(&sess->lock);
 671}
 672
 673static void remap_devs(struct rnbd_clt_session *sess)
 674{
 675        struct rnbd_clt_dev *dev;
 676        struct rtrs_attrs attrs;
 677        int err;
 678
 679        /*
 680         * Careful here: we are called from RTRS link event directly,
 681         * thus we can't send any RTRS request and wait for response
 682         * or RTRS will not be able to complete a request with failure
 683         * if something goes wrong (failing of outstanding requests
 684         * happens exactly from the context where we are blocking now).
 685         *
 686         * So to avoid deadlocks each usr message sent from here must
 687         * be asynchronous.
 688         */
 689
 690        err = send_msg_sess_info(sess, NO_WAIT);
 691        if (err) {
 692                pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
 693                return;
 694        }
 695
 696        rtrs_clt_query(sess->rtrs, &attrs);
 697        mutex_lock(&sess->lock);
 698        sess->max_io_size = attrs.max_io_size;
 699
 700        list_for_each_entry(dev, &sess->devs_list, list) {
 701                bool skip;
 702
 703                mutex_lock(&dev->lock);
 704                skip = (dev->dev_state == DEV_STATE_INIT);
 705                mutex_unlock(&dev->lock);
 706                if (skip)
 707                        /*
 708                         * When a device is establishing a connection for the
 709                         * first time, do not remap; it will be closed soon.
 710                         */
 711                        continue;
 712
 713                rnbd_clt_info(dev, "session reconnected, remapping device\n");
 714                err = send_msg_open(dev, NO_WAIT);
 715                if (err) {
 716                        rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
 717                        break;
 718                }
 719        }
 720        mutex_unlock(&sess->lock);
 721}
 722
 723static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
 724{
 725        struct rnbd_clt_session *sess = priv;
 726
 727        switch (ev) {
 728        case RTRS_CLT_LINK_EV_DISCONNECTED:
 729                set_dev_states_to_disconnected(sess);
 730                break;
 731        case RTRS_CLT_LINK_EV_RECONNECTED:
 732                remap_devs(sess);
 733                break;
 734        default:
 735                pr_err("Unknown session event received (%d), session: %s\n",
 736                       ev, sess->sessname);
 737        }
 738}
 739
 740static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
 741{
 742        unsigned int cpu;
 743        struct rnbd_cpu_qlist *cpu_q;
 744
 745        for_each_possible_cpu(cpu) {
 746                cpu_q = per_cpu_ptr(cpu_queues, cpu);
 747
 748                cpu_q->cpu = cpu;
 749                INIT_LIST_HEAD(&cpu_q->requeue_list);
 750                spin_lock_init(&cpu_q->requeue_lock);
 751        }
 752}
 753
 754static void destroy_mq_tags(struct rnbd_clt_session *sess)
 755{
 756        if (sess->tag_set.tags)
 757                blk_mq_free_tag_set(&sess->tag_set);
 758}
 759
 760static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
 761{
 762        sess->rtrs_ready = true;
 763        wake_up_all(&sess->rtrs_waitq);
 764}
 765
 766static void close_rtrs(struct rnbd_clt_session *sess)
 767{
 768        might_sleep();
 769
 770        if (!IS_ERR_OR_NULL(sess->rtrs)) {
 771                rtrs_clt_close(sess->rtrs);
 772                sess->rtrs = NULL;
 773                wake_up_rtrs_waiters(sess);
 774        }
 775}
 776
 777static void free_sess(struct rnbd_clt_session *sess)
 778{
 779        WARN_ON(!list_empty(&sess->devs_list));
 780
 781        might_sleep();
 782
 783        close_rtrs(sess);
 784        destroy_mq_tags(sess);
 785        if (!list_empty(&sess->list)) {
 786                mutex_lock(&sess_lock);
 787                list_del(&sess->list);
 788                mutex_unlock(&sess_lock);
 789        }
 790        free_percpu(sess->cpu_queues);
 791        free_percpu(sess->cpu_rr);
 792        mutex_destroy(&sess->lock);
 793        kfree(sess);
 794}
 795
 796static struct rnbd_clt_session *alloc_sess(const char *sessname)
 797{
 798        struct rnbd_clt_session *sess;
 799        int err, cpu;
 800
 801        sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
 802        if (!sess)
 803                return ERR_PTR(-ENOMEM);
 804        strlcpy(sess->sessname, sessname, sizeof(sess->sessname));
 805        atomic_set(&sess->busy, 0);
 806        mutex_init(&sess->lock);
 807        INIT_LIST_HEAD(&sess->devs_list);
 808        INIT_LIST_HEAD(&sess->list);
 809        bitmap_zero(sess->cpu_queues_bm, NR_CPUS);
 810        init_waitqueue_head(&sess->rtrs_waitq);
 811        refcount_set(&sess->refcount, 1);
 812
 813        sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
 814        if (!sess->cpu_queues) {
 815                err = -ENOMEM;
 816                goto err;
 817        }
 818        rnbd_init_cpu_qlists(sess->cpu_queues);
 819
 820        /*
 821         * That is a simple per-CPU variable which stores CPU indices, which
 822         * are incremented on each access.  We need that for the sake of
 823         * fairness, to wake up queues in a round-robin manner.
 824         */
 825        sess->cpu_rr = alloc_percpu(int);
 826        if (!sess->cpu_rr) {
 827                err = -ENOMEM;
 828                goto err;
 829        }
 830        for_each_possible_cpu(cpu)
 831                *per_cpu_ptr(sess->cpu_rr, cpu) = cpu;
 832
 833        return sess;
 834
 835err:
 836        free_sess(sess);
 837
 838        return ERR_PTR(err);
 839}
 840
 841static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
 842{
 843        wait_event(sess->rtrs_waitq, sess->rtrs_ready);
 844        if (IS_ERR_OR_NULL(sess->rtrs))
 845                return -ECONNRESET;
 846
 847        return 0;
 848}
 849
 850static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
 851        __releases(&sess_lock)
 852        __acquires(&sess_lock)
 853{
 854        DEFINE_WAIT(wait);
 855
 856        prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
 857        if (IS_ERR_OR_NULL(sess->rtrs)) {
 858                finish_wait(&sess->rtrs_waitq, &wait);
 859                return;
 860        }
 861        mutex_unlock(&sess_lock);
 862        /* Loop in the caller; see __find_and_get_sess().
 863         * You can't leave the mutex locked and call schedule(); you would
 864         * deadlock with a caller of free_sess(), which has just put the last
 865         * reference and is about to take the sess_lock in order to delete
 866         * the session from the list.
 867         */
 868        schedule();
 869        mutex_lock(&sess_lock);
 870}
 871
 872static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
 873        __releases(&sess_lock)
 874        __acquires(&sess_lock)
 875{
 876        struct rnbd_clt_session *sess, *sn;
 877        int err;
 878
 879again:
 880        list_for_each_entry_safe(sess, sn, &sess_list, list) {
 881                if (strcmp(sessname, sess->sessname))
 882                        continue;
 883
 884                if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
 885                        /*
 886                         * No RTRS connection, session is dying.
 887                         */
 888                        continue;
 889
 890                if (rnbd_clt_get_sess(sess)) {
 891                        /*
 892                         * Alive session is found, wait for RTRS connection.
 893                         */
 894                        mutex_unlock(&sess_lock);
 895                        err = wait_for_rtrs_connection(sess);
 896                        if (err)
 897                                rnbd_clt_put_sess(sess);
 898                        mutex_lock(&sess_lock);
 899
 900                        if (err)
 901                                /* Session is dying, repeat the loop */
 902                                goto again;
 903
 904                        return sess;
 905                }
 906                /*
 907                 * Ref is 0, session is dying, wait for RTRS disconnect
 908                 * in order to avoid session names clashes.
 909                 */
 910                wait_for_rtrs_disconnection(sess);
 911                /*
 912                 * RTRS is disconnected and soon session will be freed,
 913                 * so repeat a loop.
 914                 */
 915                goto again;
 916        }
 917
 918        return NULL;
 919}
 920
 921static struct
 922rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first)
 923{
 924        struct rnbd_clt_session *sess = NULL;
 925
 926        mutex_lock(&sess_lock);
 927        sess = __find_and_get_sess(sessname);
 928        if (!sess) {
 929                sess = alloc_sess(sessname);
 930                if (IS_ERR(sess)) {
 931                        mutex_unlock(&sess_lock);
 932                        return sess;
 933                }
 934                list_add(&sess->list, &sess_list);
 935                *first = true;
 936        } else
 937                *first = false;
 938        mutex_unlock(&sess_lock);
 939
 940        return sess;
 941}
 942
 943static int rnbd_client_open(struct block_device *block_device, fmode_t mode)
 944{
 945        struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;
 946
 947        if (dev->read_only && (mode & FMODE_WRITE))
 948                return -EPERM;
 949
 950        if (dev->dev_state == DEV_STATE_UNMAPPED ||
 951            !rnbd_clt_get_dev(dev))
 952                return -EIO;
 953
 954        return 0;
 955}
 956
 957static void rnbd_client_release(struct gendisk *gen, fmode_t mode)
 958{
 959        struct rnbd_clt_dev *dev = gen->private_data;
 960
 961        rnbd_clt_put_dev(dev);
 962}
 963
 964static int rnbd_client_getgeo(struct block_device *block_device,
 965                              struct hd_geometry *geo)
 966{
 967        u64 size;
 968        struct rnbd_clt_dev *dev;
 969
 970        dev = block_device->bd_disk->private_data;
 971        size = dev->size * (dev->logical_block_size / SECTOR_SIZE);
 972        geo->cylinders  = size >> 6;    /* size/64 */
 973        geo->heads      = 4;
 974        geo->sectors    = 16;
 975        geo->start      = 0;
 976
 977        return 0;
 978}
 979
 980static const struct block_device_operations rnbd_client_ops = {
 981        .owner          = THIS_MODULE,
 982        .open           = rnbd_client_open,
 983        .release        = rnbd_client_release,
 984        .getgeo         = rnbd_client_getgeo
 985};
 986
 987/* The amount of data that belongs to an I/O and the amount of data that
 988 * should be read or written to the disk (bi_size) can differ.
 989 *
 990 * E.g. When WRITE_SAME is used, only a small amount of data is
 991 * transferred that is then written repeatedly over a lot of sectors.
 992 *
 993 * Get the size of data to be transferred via RTRS by summing up the size
 994 * of the scatter-gather list entries.
 995 */
 996static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
 997{
 998        struct scatterlist *sg;
 999        size_t tsize = 0;
1000        int i;
1001
1002        for_each_sg(sglist, sg, len, i)
1003                tsize += sg->length;
1004        return tsize;
1005}
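/*
 * Example for the comment above (illustrative numbers): a WRITE_SAME
 * request replicating one 4 KiB block over 1 MiB of the device arrives
 * with blk_rq_bytes(rq) = 1048576, but its scatter-gather list carries
 * only the 4 KiB payload, so this helper returns 4096.  That is the size
 * actually transferred via RTRS, while msg.bi_size still tells the server
 * to write 1048576 bytes (see rnbd_client_xfer_request() below).
 */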
1006
1007static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
1008                                     struct request *rq,
1009                                     struct rnbd_iu *iu)
1010{
1011        struct rtrs_clt *rtrs = dev->sess->rtrs;
1012        struct rtrs_permit *permit = iu->permit;
1013        struct rnbd_msg_io msg;
1014        struct rtrs_clt_req_ops req_ops;
1015        unsigned int sg_cnt = 0;
1016        struct kvec vec;
1017        size_t size;
1018        int err;
1019
1020        iu->rq          = rq;
1021        iu->dev         = dev;
1022        msg.sector      = cpu_to_le64(blk_rq_pos(rq));
1023        msg.bi_size     = cpu_to_le32(blk_rq_bytes(rq));
1024        msg.rw          = cpu_to_le32(rq_to_rnbd_flags(rq));
1025        msg.prio        = cpu_to_le16(req_get_ioprio(rq));
1026
1027        /*
1028         * We only support discards with a single segment for now.
1029         * See queue limits.
1030         */
1031        if (req_op(rq) != REQ_OP_DISCARD)
1032                sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sgt.sgl);
1033
1034        if (sg_cnt == 0)
1035                sg_mark_end(&iu->sgt.sgl[0]);
1036
1037        msg.hdr.type    = cpu_to_le16(RNBD_MSG_IO);
1038        msg.device_id   = cpu_to_le32(dev->device_id);
1039
1040        vec = (struct kvec) {
1041                .iov_base = &msg,
1042                .iov_len  = sizeof(msg)
1043        };
1044        size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt);
1045        req_ops = (struct rtrs_clt_req_ops) {
1046                .priv = iu,
1047                .conf_fn = msg_io_conf,
1048        };
1049        err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
1050                               &vec, 1, size, iu->sgt.sgl, sg_cnt);
1051        if (unlikely(err)) {
1052                rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
1053                                 err);
1054                return err;
1055        }
1056
1057        return 0;
1058}
1059
1060/**
1061 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
1062 * @dev:        Device to be checked
1063 * @q:          Queue to be added to the requeue list if required
1064 *
1065 * Description:
1066 *     If session is busy, that means someone will requeue us when resources
1067 *     are freed.  If session is not doing anything - device is not added to
1068 *     the list and @false is returned.
1069 */
1070static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
1071                                                struct rnbd_queue *q)
1072{
1073        struct rnbd_clt_session *sess = dev->sess;
1074        struct rnbd_cpu_qlist *cpu_q;
1075        unsigned long flags;
1076        bool added = true;
1077        bool need_set;
1078
1079        cpu_q = get_cpu_ptr(sess->cpu_queues);
1080        spin_lock_irqsave(&cpu_q->requeue_lock, flags);
1081
1082        if (likely(!test_and_set_bit_lock(0, &q->in_list))) {
1083                if (WARN_ON(!list_empty(&q->requeue_list)))
1084                        goto unlock;
1085
1086                need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
1087                if (need_set) {
1088                        set_bit(cpu_q->cpu, sess->cpu_queues_bm);
1089                        /* Paired with rnbd_put_permit(). Set a bit first
1090                         * and then observe the busy counter.
1091                         */
1092                        smp_mb__before_atomic();
1093                }
1094                if (likely(atomic_read(&sess->busy))) {
1095                        list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
1096                } else {
1097                        /* Very unlikely, but possible: busy counter was
1098                         * observed as zero.  Drop all bits and return
1099                         * false to restart the queue by ourselves.
1100                         */
1101                        if (need_set)
1102                                clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
1103                        clear_bit_unlock(0, &q->in_list);
1104                        added = false;
1105                }
1106        }
1107unlock:
1108        spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
1109        put_cpu_ptr(sess->cpu_queues);
1110
1111        return added;
1112}
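/*
 * Race illustration for the "busy counter was observed as zero" branch
 * above (a hypothetical interleaving): the holder of the last permit has
 * already done atomic_dec(&sess->busy) but has not yet swept the per-CPU
 * lists.  Thanks to the barrier pairing with rnbd_put_permit(), either
 * that CPU sees our freshly set bit and reruns the queue, or we read
 * busy == 0 here, undo the bit and return false so that the caller
 * restarts the queue itself.
 */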
1113
1114static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
1115                                        struct blk_mq_hw_ctx *hctx,
1116                                        int delay)
1117{
1118        struct rnbd_queue *q = hctx->driver_data;
1119
1120        if (delay != RNBD_DELAY_IFBUSY)
1121                blk_mq_delay_run_hw_queue(hctx, delay);
1122        else if (unlikely(!rnbd_clt_dev_add_to_requeue(dev, q)))
1123                /*
1124                 * If session is not busy we have to restart
1125                 * the queue ourselves.
1126                 */
1127                blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
1128}
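/*
 * Summary of the dispatch above (nothing new, just the three outcomes):
 *
 *   delay >= 0                       -> blk_mq_delay_run_hw_queue(hctx, delay)
 *   RNBD_DELAY_IFBUSY, session busy  -> queue parked on the requeue list,
 *                                       rerun later from rnbd_put_permit()
 *   RNBD_DELAY_IFBUSY, session idle  -> rerun ourselves after 10 ms
 */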
1129
1130static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
1131                                   const struct blk_mq_queue_data *bd)
1132{
1133        struct request *rq = bd->rq;
1134        struct rnbd_clt_dev *dev = rq->rq_disk->private_data;
1135        struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
1136        int err;
1137        blk_status_t ret = BLK_STS_IOERR;
1138
1139        if (unlikely(dev->dev_state != DEV_STATE_MAPPED))
1140                return BLK_STS_IOERR;
1141
1142        iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
1143                                      RTRS_PERMIT_NOWAIT);
1144        if (unlikely(!iu->permit)) {
1145                rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
1146                return BLK_STS_RESOURCE;
1147        }
1148
1149        iu->sgt.sgl = iu->first_sgl;
1150        err = sg_alloc_table_chained(&iu->sgt,
1151                                     /* Even if the request has no segments,
1152                                      * the sglist must have at least one entry */
1153                                     blk_rq_nr_phys_segments(rq) ? : 1,
1154                                     iu->sgt.sgl,
1155                                     RNBD_INLINE_SG_CNT);
1156        if (err) {
1157                rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err);
1158                rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1159                rnbd_put_permit(dev->sess, iu->permit);
1160                return BLK_STS_RESOURCE;
1161        }
1162
1163        blk_mq_start_request(rq);
1164        err = rnbd_client_xfer_request(dev, rq, iu);
1165        if (likely(err == 0))
1166                return BLK_STS_OK;
1167        if (unlikely(err == -EAGAIN || err == -ENOMEM)) {
1168                rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1169                ret = BLK_STS_RESOURCE;
1170        }
1171        sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
1172        rnbd_put_permit(dev->sess, iu->permit);
1173        return ret;
1174}
1175
1176static struct blk_mq_ops rnbd_mq_ops = {
1177        .queue_rq       = rnbd_queue_rq,
1178        .complete       = rnbd_softirq_done_fn,
1179};
1180
1181static int setup_mq_tags(struct rnbd_clt_session *sess)
1182{
1183        struct blk_mq_tag_set *tag_set = &sess->tag_set;
1184
1185        memset(tag_set, 0, sizeof(*tag_set));
1186        tag_set->ops            = &rnbd_mq_ops;
1187        tag_set->queue_depth    = sess->queue_depth;
1188        tag_set->numa_node      = NUMA_NO_NODE;
1189        tag_set->flags          = BLK_MQ_F_SHOULD_MERGE |
1190                                  BLK_MQ_F_TAG_QUEUE_SHARED;
1191        tag_set->cmd_size       = sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE;
1192        tag_set->nr_hw_queues   = num_online_cpus();
1193
1194        return blk_mq_alloc_tag_set(tag_set);
1195}
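/*
 * Design note: the tag set lives in the session rather than in the device,
 * and BLK_MQ_F_TAG_QUEUE_SHARED makes every device mapped over this
 * session share the same sess->queue_depth tags, mirroring the RTRS queue
 * depth reported by rtrs_clt_query().  That sharing is what makes the
 * requeue machinery above necessary.
 */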
1196
1197static struct rnbd_clt_session *
1198find_and_get_or_create_sess(const char *sessname,
1199                            const struct rtrs_addr *paths,
1200                            size_t path_cnt, u16 port_nr)
1201{
1202        struct rnbd_clt_session *sess;
1203        struct rtrs_attrs attrs;
1204        int err;
1205        bool first;
1206        struct rtrs_clt_ops rtrs_ops;
1207
1208        sess = find_or_create_sess(sessname, &first);
1209        if (sess == ERR_PTR(-ENOMEM))
1210                return ERR_PTR(-ENOMEM);
1211        else if (!first)
1212                return sess;
1213
1214        if (!path_cnt) {
1215                pr_err("Session %s not found, and path parameter not given\n", sessname);
1216                err = -ENXIO;
1217                goto put_sess;
1218        }
1219
1220        rtrs_ops = (struct rtrs_clt_ops) {
1221                .priv = sess,
1222                .link_ev = rnbd_clt_link_ev,
1223        };
1224        /*
1225         * Nothing was found, establish rtrs connection and proceed further.
1226         */
1227        sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
1228                                   paths, path_cnt, port_nr,
1229                                   0, /* Do not use pdu of rtrs */
1230                                   RECONNECT_DELAY, BMAX_SEGMENTS,
1231                                   BLK_MAX_SEGMENT_SIZE,
1232                                   MAX_RECONNECTS);
1233        if (IS_ERR(sess->rtrs)) {
1234                err = PTR_ERR(sess->rtrs);
1235                goto wake_up_and_put;
1236        }
1237        rtrs_clt_query(sess->rtrs, &attrs);
1238        sess->max_io_size = attrs.max_io_size;
1239        sess->queue_depth = attrs.queue_depth;
1240
1241        err = setup_mq_tags(sess);
1242        if (err)
1243                goto close_rtrs;
1244
1245        err = send_msg_sess_info(sess, WAIT);
1246        if (err)
1247                goto close_rtrs;
1248
1249        wake_up_rtrs_waiters(sess);
1250
1251        return sess;
1252
1253close_rtrs:
1254        close_rtrs(sess);
1255put_sess:
1256        rnbd_clt_put_sess(sess);
1257
1258        return ERR_PTR(err);
1259
1260wake_up_and_put:
1261        wake_up_rtrs_waiters(sess);
1262        goto put_sess;
1263}
1264
1265static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
1266                                       struct rnbd_queue *q,
1267                                       struct blk_mq_hw_ctx *hctx)
1268{
1269        INIT_LIST_HEAD(&q->requeue_list);
1270        q->dev  = dev;
1271        q->hctx = hctx;
1272}
1273
1274static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
1275{
1276        int i;
1277        struct blk_mq_hw_ctx *hctx;
1278        struct rnbd_queue *q;
1279
1280        queue_for_each_hw_ctx(dev->queue, hctx, i) {
1281                q = &dev->hw_queues[i];
1282                rnbd_init_hw_queue(dev, q, hctx);
1283                hctx->driver_data = q;
1284        }
1285}
1286
1287static int setup_mq_dev(struct rnbd_clt_dev *dev)
1288{
1289        dev->queue = blk_mq_init_queue(&dev->sess->tag_set);
1290        if (IS_ERR(dev->queue)) {
1291                rnbd_clt_err(dev, "Initializing multiqueue queue failed, err: %ld\n",
1292                              PTR_ERR(dev->queue));
1293                return PTR_ERR(dev->queue);
1294        }
1295        rnbd_init_mq_hw_queues(dev);
1296        return 0;
1297}
1298
1299static void setup_request_queue(struct rnbd_clt_dev *dev)
1300{
1301        blk_queue_logical_block_size(dev->queue, dev->logical_block_size);
1302        blk_queue_physical_block_size(dev->queue, dev->physical_block_size);
1303        blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors);
1304        blk_queue_max_write_same_sectors(dev->queue,
1305                                         dev->max_write_same_sectors);
1306
1307        /*
1308         * We don't support discards to "discontiguous" segments
1309         * in one request.
1310         */
1311        blk_queue_max_discard_segments(dev->queue, 1);
1312
1313        blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors);
1314        dev->queue->limits.discard_granularity  = dev->discard_granularity;
1315        dev->queue->limits.discard_alignment    = dev->discard_alignment;
1316        if (dev->max_discard_sectors)
1317                blk_queue_flag_set(QUEUE_FLAG_DISCARD, dev->queue);
1318        if (dev->secure_discard)
1319                blk_queue_flag_set(QUEUE_FLAG_SECERASE, dev->queue);
1320
1321        blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
1322        blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
1323        blk_queue_max_segments(dev->queue, dev->max_segments);
1324        blk_queue_io_opt(dev->queue, dev->sess->max_io_size);
1325        blk_queue_virt_boundary(dev->queue, SZ_4K - 1);
1326        blk_queue_write_cache(dev->queue, dev->wc, dev->fua);
1327        dev->queue->queuedata = dev;
1328}
1329
1330static void rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx)
1331{
1332        dev->gd->major          = rnbd_client_major;
1333        dev->gd->first_minor    = idx << RNBD_PART_BITS;
1334        dev->gd->fops           = &rnbd_client_ops;
1335        dev->gd->queue          = dev->queue;
1336        dev->gd->private_data   = dev;
1337        snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
1338                 idx);
1339        pr_debug("disk_name=%s, capacity=%zu\n",
1340                 dev->gd->disk_name,
1341                 dev->nsectors * (dev->logical_block_size / SECTOR_SIZE)
1342                 );
1343
1344        set_capacity(dev->gd, dev->nsectors);
1345
1346        if (dev->access_mode == RNBD_ACCESS_RO) {
1347                dev->read_only = true;
1348                set_disk_ro(dev->gd, true);
1349        } else {
1350                dev->read_only = false;
1351        }
1352
1353        if (!dev->rotational)
1354                blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue);
1355}
1356
1357static int rnbd_client_setup_device(struct rnbd_clt_session *sess,
1358                                     struct rnbd_clt_dev *dev, int idx)
1359{
1360        int err;
1361
1362        dev->size = dev->nsectors * dev->logical_block_size;
1363
1364        err = setup_mq_dev(dev);
1365        if (err)
1366                return err;
1367
1368        setup_request_queue(dev);
1369
1370        dev->gd = alloc_disk_node(1 << RNBD_PART_BITS, NUMA_NO_NODE);
1371        if (!dev->gd) {
1372                blk_cleanup_queue(dev->queue);
1373                return -ENOMEM;
1374        }
1375
1376        rnbd_clt_setup_gen_disk(dev, idx);
1377
1378        return 0;
1379}
1380
1381static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
1382                                      enum rnbd_access_mode access_mode,
1383                                      const char *pathname)
1384{
1385        struct rnbd_clt_dev *dev;
1386        int ret;
1387
1388        dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
1389        if (!dev)
1390                return ERR_PTR(-ENOMEM);
1391
1392        dev->hw_queues = kcalloc(nr_cpu_ids, sizeof(*dev->hw_queues),
1393                                 GFP_KERNEL);
1394        if (!dev->hw_queues) {
1395                ret = -ENOMEM;
1396                goto out_alloc;
1397        }
1398
1399        mutex_lock(&ida_lock);
1400        ret = ida_simple_get(&index_ida, 0, 1 << (MINORBITS - RNBD_PART_BITS),
1401                             GFP_KERNEL);
1402        mutex_unlock(&ida_lock);
1403        if (ret < 0) {
1404                pr_err("Failed to initialize device '%s' from session %s, allocating idr failed, err: %d\n",
1405                       pathname, sess->sessname, ret);
1406                goto out_queues;
1407        }
1408
1409        dev->pathname = kstrdup(pathname, GFP_KERNEL);
1410        if (!dev->pathname) {
1411                ret = -ENOMEM;
1412                goto out_queues;
1413        }
1414
1415        dev->clt_device_id      = ret;
1416        dev->sess               = sess;
1417        dev->access_mode        = access_mode;
1418        mutex_init(&dev->lock);
1419        refcount_set(&dev->refcount, 1);
1420        dev->dev_state = DEV_STATE_INIT;
1421
1422        /*
1423         * We are called here from a sysfs entry, thus clt-sysfs is
1424         * responsible for making sure the session does not disappear.
1425         */
1426        WARN_ON(!rnbd_clt_get_sess(sess));
1427
1428        return dev;
1429
1430out_queues:
1431        kfree(dev->hw_queues);
1432out_alloc:
1433        kfree(dev);
1434        return ERR_PTR(ret);
1435}
1436
1437static bool __exists_dev(const char *pathname, const char *sessname)
1438{
1439        struct rnbd_clt_session *sess;
1440        struct rnbd_clt_dev *dev;
1441        bool found = false;
1442
1443        list_for_each_entry(sess, &sess_list, list) {
1444                if (sessname && strncmp(sess->sessname, sessname,
1445                                        sizeof(sess->sessname)))
1446                        continue;
1447                mutex_lock(&sess->lock);
1448                list_for_each_entry(dev, &sess->devs_list, list) {
1449                        if (strlen(dev->pathname) == strlen(pathname) &&
1450                            !strcmp(dev->pathname, pathname)) {
1451                                found = true;
1452                                break;
1453                        }
1454                }
1455                mutex_unlock(&sess->lock);
1456                if (found)
1457                        break;
1458        }
1459
1460        return found;
1461}
1462
1463static bool exists_devpath(const char *pathname, const char *sessname)
1464{
1465        bool found;
1466
1467        mutex_lock(&sess_lock);
1468        found = __exists_dev(pathname, sessname);
1469        mutex_unlock(&sess_lock);
1470
1471        return found;
1472}
1473
1474static bool insert_dev_if_not_exists_devpath(const char *pathname,
1475                                             struct rnbd_clt_session *sess,
1476                                             struct rnbd_clt_dev *dev)
1477{
1478        bool found;
1479
1480        mutex_lock(&sess_lock);
1481        found = __exists_dev(pathname, sess->sessname);
1482        if (!found) {
1483                mutex_lock(&sess->lock);
1484                list_add_tail(&dev->list, &sess->devs_list);
1485                mutex_unlock(&sess->lock);
1486        }
1487        mutex_unlock(&sess_lock);
1488
1489        return found;
1490}
1491
1492static void delete_dev(struct rnbd_clt_dev *dev)
1493{
1494        struct rnbd_clt_session *sess = dev->sess;
1495
1496        mutex_lock(&sess->lock);
1497        list_del(&dev->list);
1498        mutex_unlock(&sess->lock);
1499}

struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
                                         struct rtrs_addr *paths,
                                         size_t path_cnt, u16 port_nr,
                                         const char *pathname,
                                         enum rnbd_access_mode access_mode)
{
        struct rnbd_clt_session *sess;
        struct rnbd_clt_dev *dev;
        int ret;

        if (unlikely(exists_devpath(pathname, sessname)))
                return ERR_PTR(-EEXIST);

        sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr);
        if (IS_ERR(sess))
                return ERR_CAST(sess);

        dev = init_dev(sess, access_mode, pathname);
        if (IS_ERR(dev)) {
                pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n",
                       pathname, sess->sessname, PTR_ERR(dev));
                ret = PTR_ERR(dev);
                goto put_sess;
        }
        if (insert_dev_if_not_exists_devpath(pathname, sess, dev)) {
                ret = -EEXIST;
                goto put_dev;
        }
        ret = send_msg_open(dev, WAIT);
        if (ret) {
                rnbd_clt_err(dev,
                             "map_device: failed, can't open remote device, err: %d\n",
                             ret);
                goto del_dev;
        }
        mutex_lock(&dev->lock);
        pr_debug("Opened remote device: session=%s, path='%s'\n",
                 sess->sessname, pathname);
        ret = rnbd_client_setup_device(sess, dev, dev->clt_device_id);
        if (ret) {
                rnbd_clt_err(dev,
                             "map_device: Failed to configure device, err: %d\n",
                             ret);
                mutex_unlock(&dev->lock);
                goto send_close;
        }

        rnbd_clt_info(dev,
                      "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_write_same_sectors: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, rotational: %d, wc: %d, fua: %d)\n",
                      dev->gd->disk_name, dev->nsectors,
                      dev->logical_block_size, dev->physical_block_size,
                      dev->max_write_same_sectors, dev->max_discard_sectors,
                      dev->discard_granularity, dev->discard_alignment,
                      dev->secure_discard, dev->max_segments,
                      dev->max_hw_sectors, dev->rotational, dev->wc, dev->fua);

        mutex_unlock(&dev->lock);

        add_disk(dev->gd);
        rnbd_clt_put_sess(sess);

        return dev;

send_close:
        send_msg_close(dev, dev->device_id, WAIT);
del_dev:
        delete_dev(dev);
put_dev:
        rnbd_clt_put_dev(dev);
put_sess:
        rnbd_clt_put_sess(sess);

        return ERR_PTR(ret);
}
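
/*
 * Illustrative sketch, not part of the driver: how a caller (e.g. the
 * sysfs "map" handler) might drive rnbd_clt_map_device().  The helper
 * name example_map_one_path() and the single-path, default-port setup
 * are hypothetical.
 */
#if 0
static int example_map_one_path(const char *sessname, const char *pathname,
                                struct rtrs_addr *path)
{
        struct rnbd_clt_dev *dev;

        /* One path, port 0 (assumed default), read-write access. */
        dev = rnbd_clt_map_device(sessname, path, 1, 0, pathname,
                                  RNBD_ACCESS_RW);
        if (IS_ERR(dev))
                return PTR_ERR(dev);    /* e.g. -EEXIST if already mapped */

        /* Success: the disk dev->gd is now visible as /dev/<disk_name>. */
        return 0;
}
#endif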

static void destroy_gen_disk(struct rnbd_clt_dev *dev)
{
        del_gendisk(dev->gd);
        blk_cleanup_queue(dev->queue);
        put_disk(dev->gd);
}
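
/*
 * The order above matters: del_gendisk() unhashes the disk so no new
 * opens can start, blk_cleanup_queue() drains and shuts down the request
 * queue, and put_disk() then drops the final gendisk reference.
 */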

static void destroy_sysfs(struct rnbd_clt_dev *dev,
                          const struct attribute *sysfs_self)
{
        rnbd_clt_remove_dev_symlink(dev);
        if (dev->kobj.state_initialized) {
                if (sysfs_self)
                        /* To avoid a deadlock, first remove the calling attribute itself */
                        sysfs_remove_file_self(&dev->kobj, sysfs_self);
                kobject_del(&dev->kobj);
                kobject_put(&dev->kobj);
        }
}
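
/*
 * Why sysfs_remove_file_self(): when unmap is requested by writing to
 * one of this kobject's own attributes, a plain kobject_del() would wait
 * for the active ->store() callback to finish - that is, for ourselves -
 * and deadlock.  Removing the calling attribute first breaks that cycle.
 */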

int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
                          const struct attribute *sysfs_self)
{
        struct rnbd_clt_session *sess = dev->sess;
        int refcount, ret = 0;
        bool was_mapped;

        mutex_lock(&dev->lock);
        if (dev->dev_state == DEV_STATE_UNMAPPED) {
                rnbd_clt_info(dev, "Device is already being unmapped\n");
                ret = -EALREADY;
                goto err;
        }
        refcount = refcount_read(&dev->refcount);
        if (!force && refcount > 1) {
                rnbd_clt_err(dev,
                             "Closing device failed, device is in use (%d device users)\n",
                             refcount - 1);
                ret = -EBUSY;
                goto err;
        }
        was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
        dev->dev_state = DEV_STATE_UNMAPPED;
        mutex_unlock(&dev->lock);

        delete_dev(dev);
        destroy_sysfs(dev, sysfs_self);
        destroy_gen_disk(dev);
        if (was_mapped && sess->rtrs)
                send_msg_close(dev, dev->device_id, WAIT);

        rnbd_clt_info(dev, "Device is unmapped\n");

        /* Likely the last reference put */
        rnbd_clt_put_dev(dev);

        /*
         * From this point on the device and the session may already have
         * been freed, so neither must be touched.
         */

        return 0;
err:
        mutex_unlock(&dev->lock);

        return ret;
}
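
/*
 * Illustrative sketch, not part of the driver: the difference between a
 * normal and a forced unmap.  The helper name example_unmap() is
 * hypothetical.
 */
#if 0
static int example_unmap(struct rnbd_clt_dev *dev)
{
        int ret;

        /* Polite attempt: fails with -EBUSY while the device is open. */
        ret = rnbd_clt_unmap_device(dev, false, NULL);
        if (ret == -EBUSY)
                /* Forced unmap succeeds even with open users. */
                ret = rnbd_clt_unmap_device(dev, true, NULL);
        return ret;
}
#endif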

int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
{
        int err;

        mutex_lock(&dev->lock);
        if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
                err = 0;
        else if (dev->dev_state == DEV_STATE_UNMAPPED)
                err = -ENODEV;
        else if (dev->dev_state == DEV_STATE_MAPPED)
                err = -EALREADY;
        else
                err = -EBUSY;
        mutex_unlock(&dev->lock);
        if (!err) {
                rnbd_clt_info(dev, "Remapping device.\n");
                err = send_msg_open(dev, WAIT);
                if (err)
                        rnbd_clt_err(dev, "remap_device: %d\n", err);
        }

        return err;
}
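
/*
 * Remap outcome by device state:
 *   MAPPED_DISCONNECTED -> 0, re-send the open message via send_msg_open()
 *   UNMAPPED            -> -ENODEV   (device is gone)
 *   MAPPED              -> -EALREADY (nothing to remap)
 *   any other state     -> -EBUSY    (transition still in progress)
 */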

static void unmap_device_work(struct work_struct *work)
{
        struct rnbd_clt_dev *dev;

        dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
        rnbd_clt_unmap_device(dev, true, NULL);
}

static void rnbd_destroy_sessions(void)
{
        struct rnbd_clt_session *sess, *sn;
        struct rnbd_clt_dev *dev, *tn;

        /* First forbid access through the sysfs interface */
        rnbd_clt_destroy_default_group();
        rnbd_clt_destroy_sysfs_files();

        /*
         * At this point there is no concurrent access to the session and
         * device lists:
         *   1. No new session or device can be created - the session sysfs
         *      files have just been removed.
         *   2. No device or session can be removed - the unmap-device sysfs
         *      callback takes a module reference.
         *   3. No IO requests are in flight - each open of the block device
         *      takes a module reference in get_disk().
         *
         * However, user requests sent by the asynchronous send_msg_*()
         * functions can still be in flight, so the RTRS session must be
         * explicitly closed before the devices are unmapped.
         */

        list_for_each_entry_safe(sess, sn, &sess_list, list) {
                if (!rnbd_clt_get_sess(sess))
                        continue;
                close_rtrs(sess);
                list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
                        /*
                         * Unmapping happens in parallel for one reason only:
                         * blk_cleanup_queue() takes around half a second, so
                         * with a huge number of devices a serial module
                         * unload would take minutes.
                         */
                        INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
                        queue_work(system_long_wq, &dev->unmap_on_rmmod_work);
                }
                rnbd_clt_put_sess(sess);
        }
        /* Wait for all scheduled unmap work items */
        flush_workqueue(system_long_wq);
        WARN_ON(!list_empty(&sess_list));
}
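
/*
 * Note that flush_workqueue(system_long_wq) waits for *all* work items on
 * the shared system_long_wq, not only the unmap works queued above.  That
 * is acceptable here, since this runs exactly once, on module unload.
 */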

static int __init rnbd_client_init(void)
{
        int err = 0;

        BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
        BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
        BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
        BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
        BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
        BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
        rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
        if (rnbd_client_major <= 0) {
                pr_err("Failed to load module, block device registration failed\n");
                return -EBUSY;
        }

        err = rnbd_clt_create_sysfs_files();
        if (err) {
                pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
                       err);
                unregister_blkdev(rnbd_client_major, "rnbd");
        }

        return err;
}
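
/*
 * rnbd_client_major starts out as 0, so register_blkdev() above requests
 * a dynamically allocated major number and stores it for the matching
 * unregister_blkdev() on exit.  The BUILD_BUG_ON()s pin the on-the-wire
 * sizes of the protocol messages: any change in their layout would break
 * compatibility with existing servers, so it fails the build instead.
 */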

static void __exit rnbd_client_exit(void)
{
        rnbd_destroy_sessions();
        unregister_blkdev(rnbd_client_major, "rnbd");
        ida_destroy(&index_ida);
}

module_init(rnbd_client_init);
module_exit(rnbd_client_exit);