linux/drivers/block/rnbd/rnbd-clt.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 * RDMA Network Block Driver
   4 *
   5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
   6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
   7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
   8 */
   9
  10#undef pr_fmt
  11#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
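
/*
 * Illustrative: with the pr_fmt() above, every pr_*() message in this file is
 * prefixed with the module name and the source line it was emitted from,
 * e.g. "<KBUILD_MODNAME> L<line>: Unknown session event received ...".
 */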
  12
  13#include <linux/module.h>
  14#include <linux/blkdev.h>
  15#include <linux/hdreg.h>
  16#include <linux/scatterlist.h>
  17#include <linux/idr.h>
  18
  19#include "rnbd-clt.h"
  20
  21MODULE_DESCRIPTION("RDMA Network Block Device Client");
  22MODULE_LICENSE("GPL");
  23
  24static int rnbd_client_major;
  25static DEFINE_IDA(index_ida);
  26static DEFINE_MUTEX(sess_lock);
  27static LIST_HEAD(sess_list);
  28static struct workqueue_struct *rnbd_clt_wq;
  29
  30/*
  31 * Maximum number of partitions an instance can have.
  32 * 6 bits = 64 minors = 63 partitions (one minor is used for the device itself)
  33 */
  34#define RNBD_PART_BITS          6
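
/*
 * Illustrative example: with RNBD_PART_BITS == 6, the device with index 2
 * gets first_minor == 2 << 6 == 128 and owns the 64 minors 128..191, i.e.
 * the whole device plus up to 63 partitions (see rnbd_clt_setup_gen_disk()).
 */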
  35
  36static inline bool rnbd_clt_get_sess(struct rnbd_clt_session *sess)
  37{
  38        return refcount_inc_not_zero(&sess->refcount);
  39}
  40
  41static void free_sess(struct rnbd_clt_session *sess);
  42
  43static void rnbd_clt_put_sess(struct rnbd_clt_session *sess)
  44{
  45        might_sleep();
  46
  47        if (refcount_dec_and_test(&sess->refcount))
  48                free_sess(sess);
  49}
  50
  51static void rnbd_clt_put_dev(struct rnbd_clt_dev *dev)
  52{
  53        might_sleep();
  54
  55        if (!refcount_dec_and_test(&dev->refcount))
  56                return;
  57
  58        ida_free(&index_ida, dev->clt_device_id);
  59        kfree(dev->hw_queues);
  60        kfree(dev->pathname);
  61        rnbd_clt_put_sess(dev->sess);
  62        mutex_destroy(&dev->lock);
  63        kfree(dev);
  64}
  65
  66static inline bool rnbd_clt_get_dev(struct rnbd_clt_dev *dev)
  67{
  68        return refcount_inc_not_zero(&dev->refcount);
  69}
  70
  71static int rnbd_clt_set_dev_attr(struct rnbd_clt_dev *dev,
  72                                 const struct rnbd_msg_open_rsp *rsp)
  73{
  74        struct rnbd_clt_session *sess = dev->sess;
  75
  76        if (!rsp->logical_block_size)
  77                return -EINVAL;
  78
  79        dev->device_id              = le32_to_cpu(rsp->device_id);
  80        dev->nsectors               = le64_to_cpu(rsp->nsectors);
  81        dev->logical_block_size     = le16_to_cpu(rsp->logical_block_size);
  82        dev->physical_block_size    = le16_to_cpu(rsp->physical_block_size);
  83        dev->max_discard_sectors    = le32_to_cpu(rsp->max_discard_sectors);
  84        dev->discard_granularity    = le32_to_cpu(rsp->discard_granularity);
  85        dev->discard_alignment      = le32_to_cpu(rsp->discard_alignment);
  86        dev->secure_discard         = le16_to_cpu(rsp->secure_discard);
  87        dev->wc                     = !!(rsp->cache_policy & RNBD_WRITEBACK);
  88        dev->fua                    = !!(rsp->cache_policy & RNBD_FUA);
  89
  90        dev->max_hw_sectors = sess->max_io_size / SECTOR_SIZE;
  91        dev->max_segments = sess->max_segments;
  92
  93        return 0;
  94}
  95
  96static int rnbd_clt_change_capacity(struct rnbd_clt_dev *dev,
  97                                    size_t new_nsectors)
  98{
  99        rnbd_clt_info(dev, "Device size changed from %zu to %zu sectors\n",
 100                       dev->nsectors, new_nsectors);
 101        dev->nsectors = new_nsectors;
 102        set_capacity_and_notify(dev->gd, dev->nsectors);
 103        return 0;
 104}
 105
 106static int process_msg_open_rsp(struct rnbd_clt_dev *dev,
 107                                struct rnbd_msg_open_rsp *rsp)
 108{
 109        struct kobject *gd_kobj;
 110        int err = 0;
 111
 112        mutex_lock(&dev->lock);
 113        if (dev->dev_state == DEV_STATE_UNMAPPED) {
 114                rnbd_clt_info(dev,
  115                               "Ignoring Open-Response message from server for unmapped device\n");
 116                err = -ENOENT;
 117                goto out;
 118        }
 119        if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED) {
 120                u64 nsectors = le64_to_cpu(rsp->nsectors);
 121
 122                /*
 123                 * If the device was remapped and the size changed in the
 124                 * meantime we need to revalidate it
 125                 */
 126                if (dev->nsectors != nsectors)
 127                        rnbd_clt_change_capacity(dev, nsectors);
 128                gd_kobj = &disk_to_dev(dev->gd)->kobj;
 129                kobject_uevent(gd_kobj, KOBJ_ONLINE);
 130                rnbd_clt_info(dev, "Device online, device remapped successfully\n");
 131        }
 132        err = rnbd_clt_set_dev_attr(dev, rsp);
 133        if (err)
 134                goto out;
 135        dev->dev_state = DEV_STATE_MAPPED;
 136
 137out:
 138        mutex_unlock(&dev->lock);
 139
 140        return err;
 141}
 142
 143int rnbd_clt_resize_disk(struct rnbd_clt_dev *dev, size_t newsize)
 144{
 145        int ret = 0;
 146
 147        mutex_lock(&dev->lock);
 148        if (dev->dev_state != DEV_STATE_MAPPED) {
 149                pr_err("Failed to set new size of the device, device is not opened\n");
 150                ret = -ENOENT;
 151                goto out;
 152        }
 153        ret = rnbd_clt_change_capacity(dev, newsize);
 154
 155out:
 156        mutex_unlock(&dev->lock);
 157
 158        return ret;
 159}
 160
 161static inline void rnbd_clt_dev_requeue(struct rnbd_queue *q)
 162{
 163        if (WARN_ON(!q->hctx))
 164                return;
 165
 166        /* We can come here from interrupt, thus async=true */
 167        blk_mq_run_hw_queue(q->hctx, true);
 168}
 169
 170enum {
 171        RNBD_DELAY_IFBUSY = -1,
 172};
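
/*
 * Illustrative note: RNBD_DELAY_IFBUSY is not a delay in milliseconds but a
 * special value for rnbd_clt_dev_kick_mq_queue(): park the queue on the
 * session requeue list if the session is busy, otherwise rerun the hw queue
 * after a short delay.
 */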
 173
 174/**
 175 * rnbd_get_cpu_qlist() - finds a list with HW queues to be rerun
 176 * @sess:       Session to find a queue for
 177 * @cpu:        Cpu to start the search from
 178 *
 179 * Description:
  180 *     Each CPU has a list of HW queues which need to be rerun.  If a list
  181 *     is not empty, it is marked with a bit.  This function finds the first
  182 *     set bit in the bitmap and returns the corresponding CPU list.
 183 */
 184static struct rnbd_cpu_qlist *
 185rnbd_get_cpu_qlist(struct rnbd_clt_session *sess, int cpu)
 186{
 187        int bit;
 188
 189        /* Search from cpu to nr_cpu_ids */
 190        bit = find_next_bit(sess->cpu_queues_bm, nr_cpu_ids, cpu);
 191        if (bit < nr_cpu_ids) {
 192                return per_cpu_ptr(sess->cpu_queues, bit);
 193        } else if (cpu != 0) {
 194                /* Search from 0 to cpu */
 195                bit = find_first_bit(sess->cpu_queues_bm, cpu);
 196                if (bit < cpu)
 197                        return per_cpu_ptr(sess->cpu_queues, bit);
 198        }
 199
 200        return NULL;
 201}
 202
 203static inline int nxt_cpu(int cpu)
 204{
 205        return (cpu + 1) % nr_cpu_ids;
 206}
 207
 208/**
 209 * rnbd_rerun_if_needed() - rerun next queue marked as stopped
 210 * @sess:       Session to rerun a queue on
 211 *
 212 * Description:
  213 *     Each CPU has its own list of HW queues which should be rerun.
  214 *     The function finds such a list, takes the list lock, picks the
  215 *     first HW queue off the list and requeues it.
 216 *
 217 * Return:
 218 *     True if the queue was requeued, false otherwise.
 219 *
 220 * Context:
 221 *     Does not matter.
 222 */
 223static bool rnbd_rerun_if_needed(struct rnbd_clt_session *sess)
 224{
 225        struct rnbd_queue *q = NULL;
 226        struct rnbd_cpu_qlist *cpu_q;
 227        unsigned long flags;
 228        int *cpup;
 229
 230        /*
  231         * To keep fairness and not let other queues starve, we always
  232         * try to wake up someone else in a round-robin manner.  That of course
  233         * increases latency, but queues always get a chance to be executed.
 234         */
 235        cpup = get_cpu_ptr(sess->cpu_rr);
 236        for (cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(*cpup)); cpu_q;
 237             cpu_q = rnbd_get_cpu_qlist(sess, nxt_cpu(cpu_q->cpu))) {
 238                if (!spin_trylock_irqsave(&cpu_q->requeue_lock, flags))
 239                        continue;
 240                if (!test_bit(cpu_q->cpu, sess->cpu_queues_bm))
 241                        goto unlock;
 242                q = list_first_entry_or_null(&cpu_q->requeue_list,
 243                                             typeof(*q), requeue_list);
 244                if (WARN_ON(!q))
 245                        goto clear_bit;
 246                list_del_init(&q->requeue_list);
 247                clear_bit_unlock(0, &q->in_list);
 248
 249                if (list_empty(&cpu_q->requeue_list)) {
 250                        /* Clear bit if nothing is left */
 251clear_bit:
 252                        clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
 253                }
 254unlock:
 255                spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
 256
 257                if (q)
 258                        break;
 259        }
 260
  261        /*
 262         * Saves the CPU that is going to be requeued on the per-cpu var. Just
 263         * incrementing it doesn't work because rnbd_get_cpu_qlist() will
 264         * always return the first CPU with something on the queue list when the
 265         * value stored on the var is greater than the last CPU with something
 266         * on the list.
 267         */
 268        if (cpu_q)
 269                *cpup = cpu_q->cpu;
 270        put_cpu_ptr(sess->cpu_rr);
 271
 272        if (q)
 273                rnbd_clt_dev_requeue(q);
 274
 275        return q;
 276}
 277
 278/**
 279 * rnbd_rerun_all_if_idle() - rerun all queues left in the list if
 280 *                               session is idling (there are no requests
 281 *                               in-flight).
 282 * @sess:       Session to rerun the queues on
 283 *
 284 * Description:
  285 *     This function tries to rerun all stopped queues once there are no
  286 *     requests in-flight anymore.  It addresses an obvious problem: the
  287 *     number of tags can be smaller than the number of queues (hctxs)
  288 *     which are stopped and put to sleep.  If the last permit that has
  289 *     just been put does not wake up all remaining queues (hctxs), IO
  290 *     requests hang forever.
  291 *
  292 *     That can happen when all N permits have been exhausted from one
  293 *     CPU and there are M block devices per session.  Each block device
  294 *     has its own queue (hctx) per CPU, so up to M x nr_cpu_ids queues
  295 *     (hctxs) can be put to sleep; if N < M x nr_cpu_ids, IO hangs.
  296 *
  297 *     To avoid this hang, the last caller of rnbd_put_permit() (i.e. the
  298 *     one who observes sess->busy == 0) must wake up all remaining queues.
 299 *
 300 * Context:
 301 *     Does not matter.
 302 */
 303static void rnbd_rerun_all_if_idle(struct rnbd_clt_session *sess)
 304{
 305        bool requeued;
 306
 307        do {
 308                requeued = rnbd_rerun_if_needed(sess);
 309        } while (atomic_read(&sess->busy) == 0 && requeued);
 310}
 311
 312static struct rtrs_permit *rnbd_get_permit(struct rnbd_clt_session *sess,
 313                                             enum rtrs_clt_con_type con_type,
 314                                             enum wait_type wait)
 315{
 316        struct rtrs_permit *permit;
 317
 318        permit = rtrs_clt_get_permit(sess->rtrs, con_type, wait);
 319        if (permit)
  320                /* We have a subtle rare case here, when all permits can be
  321                 * consumed before the busy counter is increased.  This is safe,
  322                 * because the loser will get NULL as a permit, observe a busy
  323                 * counter of 0 and immediately restart the queue itself.
 324                 */
 325                atomic_inc(&sess->busy);
 326
 327        return permit;
 328}
 329
 330static void rnbd_put_permit(struct rnbd_clt_session *sess,
 331                             struct rtrs_permit *permit)
 332{
 333        rtrs_clt_put_permit(sess->rtrs, permit);
 334        atomic_dec(&sess->busy);
 335        /* Paired with rnbd_clt_dev_add_to_requeue().  Decrement first
 336         * and then check queue bits.
 337         */
 338        smp_mb__after_atomic();
 339        rnbd_rerun_all_if_idle(sess);
 340}
 341
 342static struct rnbd_iu *rnbd_get_iu(struct rnbd_clt_session *sess,
 343                                     enum rtrs_clt_con_type con_type,
 344                                     enum wait_type wait)
 345{
 346        struct rnbd_iu *iu;
 347        struct rtrs_permit *permit;
 348
 349        iu = kzalloc(sizeof(*iu), GFP_KERNEL);
 350        if (!iu)
 351                return NULL;
 352
 353        permit = rnbd_get_permit(sess, con_type, wait);
 354        if (!permit) {
 355                kfree(iu);
 356                return NULL;
 357        }
 358
 359        iu->permit = permit;
 360        /*
  361         * The 1st reference is dropped after the "user" message has been
  362         * sent, the 2nd reference is dropped after the confirmation with
  363         * the response is returned.
  364         * The 1st and 2nd can happen in any order, so the rnbd_iu should be
  365         * released (the rtrs_permit returned to rtrs) only after both
  366         * are finished.
 367         */
 368        atomic_set(&iu->refcount, 2);
 369        init_waitqueue_head(&iu->comp.wait);
 370        iu->comp.errno = INT_MAX;
 371
 372        if (sg_alloc_table(&iu->sgt, 1, GFP_KERNEL)) {
 373                rnbd_put_permit(sess, permit);
 374                kfree(iu);
 375                return NULL;
 376        }
 377
 378        return iu;
 379}
 380
 381static void rnbd_put_iu(struct rnbd_clt_session *sess, struct rnbd_iu *iu)
 382{
 383        if (atomic_dec_and_test(&iu->refcount)) {
 384                sg_free_table(&iu->sgt);
 385                rnbd_put_permit(sess, iu->permit);
 386                kfree(iu);
 387        }
 388}
 389
 390static void rnbd_softirq_done_fn(struct request *rq)
 391{
 392        struct rnbd_clt_dev *dev        = rq->q->disk->private_data;
 393        struct rnbd_clt_session *sess   = dev->sess;
 394        struct rnbd_iu *iu;
 395
 396        iu = blk_mq_rq_to_pdu(rq);
 397        sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
 398        rnbd_put_permit(sess, iu->permit);
 399        blk_mq_end_request(rq, errno_to_blk_status(iu->errno));
 400}
 401
 402static void msg_io_conf(void *priv, int errno)
 403{
 404        struct rnbd_iu *iu = priv;
 405        struct rnbd_clt_dev *dev = iu->dev;
 406        struct request *rq = iu->rq;
 407        int rw = rq_data_dir(rq);
 408
 409        iu->errno = errno;
 410
 411        blk_mq_complete_request(rq);
 412
 413        if (errno)
 414                rnbd_clt_info_rl(dev, "%s I/O failed with err: %d\n",
 415                                 rw == READ ? "read" : "write", errno);
 416}
 417
 418static void wake_up_iu_comp(struct rnbd_iu *iu, int errno)
 419{
 420        iu->comp.errno = errno;
 421        wake_up(&iu->comp.wait);
 422}
 423
 424static void msg_conf(void *priv, int errno)
 425{
 426        struct rnbd_iu *iu = priv;
 427
 428        iu->errno = errno;
 429        schedule_work(&iu->work);
 430}
 431
 432static int send_usr_msg(struct rtrs_clt_sess *rtrs, int dir,
 433                        struct rnbd_iu *iu, struct kvec *vec,
 434                        size_t len, struct scatterlist *sg, unsigned int sg_len,
 435                        void (*conf)(struct work_struct *work),
 436                        int *errno, int wait)
 437{
 438        int err;
 439        struct rtrs_clt_req_ops req_ops;
 440
 441        INIT_WORK(&iu->work, conf);
 442        req_ops = (struct rtrs_clt_req_ops) {
 443                .priv = iu,
 444                .conf_fn = msg_conf,
 445        };
 446        err = rtrs_clt_request(dir, &req_ops, rtrs, iu->permit,
 447                                vec, 1, len, sg, sg_len);
 448        if (!err && wait) {
 449                wait_event(iu->comp.wait, iu->comp.errno != INT_MAX);
 450                *errno = iu->comp.errno;
 451        } else {
 452                *errno = 0;
 453        }
 454
 455        return err;
 456}
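
/*
 * Illustrative overview of the asynchronous message flow: rtrs_clt_request()
 * completes in the background, msg_conf() records the error and schedules
 * iu->work, and the per-message handler (msg_open_conf(), msg_close_conf()
 * or msg_sess_info_conf()) wakes up a possible waiter via wake_up_iu_comp()
 * and drops its iu reference.
 */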
 457
 458static void msg_close_conf(struct work_struct *work)
 459{
 460        struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
 461        struct rnbd_clt_dev *dev = iu->dev;
 462
 463        wake_up_iu_comp(iu, iu->errno);
 464        rnbd_put_iu(dev->sess, iu);
 465        rnbd_clt_put_dev(dev);
 466}
 467
 468static int send_msg_close(struct rnbd_clt_dev *dev, u32 device_id,
 469                          enum wait_type wait)
 470{
 471        struct rnbd_clt_session *sess = dev->sess;
 472        struct rnbd_msg_close msg;
 473        struct rnbd_iu *iu;
 474        struct kvec vec = {
 475                .iov_base = &msg,
 476                .iov_len  = sizeof(msg)
 477        };
 478        int err, errno;
 479
 480        iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
 481        if (!iu)
 482                return -ENOMEM;
 483
 484        iu->buf = NULL;
 485        iu->dev = dev;
 486
 487        msg.hdr.type    = cpu_to_le16(RNBD_MSG_CLOSE);
 488        msg.device_id   = cpu_to_le32(device_id);
 489
 490        WARN_ON(!rnbd_clt_get_dev(dev));
 491        err = send_usr_msg(sess->rtrs, WRITE, iu, &vec, 0, NULL, 0,
 492                           msg_close_conf, &errno, wait);
 493        if (err) {
 494                rnbd_clt_put_dev(dev);
 495                rnbd_put_iu(sess, iu);
 496        } else {
 497                err = errno;
 498        }
 499
 500        rnbd_put_iu(sess, iu);
 501        return err;
 502}
 503
 504static void msg_open_conf(struct work_struct *work)
 505{
 506        struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
 507        struct rnbd_msg_open_rsp *rsp = iu->buf;
 508        struct rnbd_clt_dev *dev = iu->dev;
 509        int errno = iu->errno;
 510
 511        if (errno) {
 512                rnbd_clt_err(dev,
 513                              "Opening failed, server responded: %d\n",
 514                              errno);
 515        } else {
 516                errno = process_msg_open_rsp(dev, rsp);
 517                if (errno) {
 518                        u32 device_id = le32_to_cpu(rsp->device_id);
 519                        /*
  520                         * If the server thinks it's fine, but we fail to process
  521                         * the response, be nice and send a close to the server.
 522                         */
 523                        send_msg_close(dev, device_id, RTRS_PERMIT_NOWAIT);
 524                }
 525        }
 526        kfree(rsp);
 527        wake_up_iu_comp(iu, errno);
 528        rnbd_put_iu(dev->sess, iu);
 529        rnbd_clt_put_dev(dev);
 530}
 531
 532static void msg_sess_info_conf(struct work_struct *work)
 533{
 534        struct rnbd_iu *iu = container_of(work, struct rnbd_iu, work);
 535        struct rnbd_msg_sess_info_rsp *rsp = iu->buf;
 536        struct rnbd_clt_session *sess = iu->sess;
 537
 538        if (!iu->errno)
 539                sess->ver = min_t(u8, rsp->ver, RNBD_PROTO_VER_MAJOR);
 540
 541        kfree(rsp);
 542        wake_up_iu_comp(iu, iu->errno);
 543        rnbd_put_iu(sess, iu);
 544        rnbd_clt_put_sess(sess);
 545}
 546
 547static int send_msg_open(struct rnbd_clt_dev *dev, enum wait_type wait)
 548{
 549        struct rnbd_clt_session *sess = dev->sess;
 550        struct rnbd_msg_open_rsp *rsp;
 551        struct rnbd_msg_open msg;
 552        struct rnbd_iu *iu;
 553        struct kvec vec = {
 554                .iov_base = &msg,
 555                .iov_len  = sizeof(msg)
 556        };
 557        int err, errno;
 558
 559        rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
 560        if (!rsp)
 561                return -ENOMEM;
 562
 563        iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
 564        if (!iu) {
 565                kfree(rsp);
 566                return -ENOMEM;
 567        }
 568
 569        iu->buf = rsp;
 570        iu->dev = dev;
 571
 572        sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
 573
 574        msg.hdr.type    = cpu_to_le16(RNBD_MSG_OPEN);
 575        msg.access_mode = dev->access_mode;
 576        strscpy(msg.dev_name, dev->pathname, sizeof(msg.dev_name));
 577
 578        WARN_ON(!rnbd_clt_get_dev(dev));
 579        err = send_usr_msg(sess->rtrs, READ, iu,
 580                           &vec, sizeof(*rsp), iu->sgt.sgl, 1,
 581                           msg_open_conf, &errno, wait);
 582        if (err) {
 583                rnbd_clt_put_dev(dev);
 584                rnbd_put_iu(sess, iu);
 585                kfree(rsp);
 586        } else {
 587                err = errno;
 588        }
 589
 590        rnbd_put_iu(sess, iu);
 591        return err;
 592}
 593
 594static int send_msg_sess_info(struct rnbd_clt_session *sess, enum wait_type wait)
 595{
 596        struct rnbd_msg_sess_info_rsp *rsp;
 597        struct rnbd_msg_sess_info msg;
 598        struct rnbd_iu *iu;
 599        struct kvec vec = {
 600                .iov_base = &msg,
 601                .iov_len  = sizeof(msg)
 602        };
 603        int err, errno;
 604
 605        rsp = kzalloc(sizeof(*rsp), GFP_KERNEL);
 606        if (!rsp)
 607                return -ENOMEM;
 608
 609        iu = rnbd_get_iu(sess, RTRS_ADMIN_CON, RTRS_PERMIT_WAIT);
 610        if (!iu) {
 611                kfree(rsp);
 612                return -ENOMEM;
 613        }
 614
 615        iu->buf = rsp;
 616        iu->sess = sess;
 617        sg_init_one(iu->sgt.sgl, rsp, sizeof(*rsp));
 618
 619        msg.hdr.type = cpu_to_le16(RNBD_MSG_SESS_INFO);
 620        msg.ver      = RNBD_PROTO_VER_MAJOR;
 621
 622        if (!rnbd_clt_get_sess(sess)) {
 623                /*
  624                 * That can happen only in one case: RTRS has re-established
  625                 * the connection and link_ev() is called, but the session is
  626                 * almost dead, the last reference on it has been put and the
  627                 * caller is waiting for RTRS to close everything.
 628                 */
 629                err = -ENODEV;
 630                goto put_iu;
 631        }
 632        err = send_usr_msg(sess->rtrs, READ, iu,
 633                           &vec, sizeof(*rsp), iu->sgt.sgl, 1,
 634                           msg_sess_info_conf, &errno, wait);
 635        if (err) {
 636                rnbd_clt_put_sess(sess);
 637put_iu:
 638                rnbd_put_iu(sess, iu);
 639                kfree(rsp);
 640        } else {
 641                err = errno;
 642        }
 643        rnbd_put_iu(sess, iu);
 644        return err;
 645}
 646
 647static void set_dev_states_to_disconnected(struct rnbd_clt_session *sess)
 648{
 649        struct rnbd_clt_dev *dev;
 650        struct kobject *gd_kobj;
 651
 652        mutex_lock(&sess->lock);
 653        list_for_each_entry(dev, &sess->devs_list, list) {
 654                rnbd_clt_err(dev, "Device disconnected.\n");
 655
 656                mutex_lock(&dev->lock);
 657                if (dev->dev_state == DEV_STATE_MAPPED) {
 658                        dev->dev_state = DEV_STATE_MAPPED_DISCONNECTED;
 659                        gd_kobj = &disk_to_dev(dev->gd)->kobj;
 660                        kobject_uevent(gd_kobj, KOBJ_OFFLINE);
 661                }
 662                mutex_unlock(&dev->lock);
 663        }
 664        mutex_unlock(&sess->lock);
 665}
 666
 667static void remap_devs(struct rnbd_clt_session *sess)
 668{
 669        struct rnbd_clt_dev *dev;
 670        struct rtrs_attrs attrs;
 671        int err;
 672
 673        /*
  674         * Careful here: we are called directly from the RTRS link event,
  675         * thus we can't send any RTRS request and wait for the response,
  676         * or RTRS will not be able to complete the request with a failure
  677         * if something goes wrong (failing outstanding requests happens
  678         * exactly in the context where we are blocking now).
  679         *
  680         * So, to avoid deadlocks, each user message sent from here must
  681         * be asynchronous.
 682         */
 683
 684        err = send_msg_sess_info(sess, RTRS_PERMIT_NOWAIT);
 685        if (err) {
 686                pr_err("send_msg_sess_info(\"%s\"): %d\n", sess->sessname, err);
 687                return;
 688        }
 689
 690        err = rtrs_clt_query(sess->rtrs, &attrs);
 691        if (err) {
 692                pr_err("rtrs_clt_query(\"%s\"): %d\n", sess->sessname, err);
 693                return;
 694        }
 695        mutex_lock(&sess->lock);
 696        sess->max_io_size = attrs.max_io_size;
 697
 698        list_for_each_entry(dev, &sess->devs_list, list) {
 699                bool skip;
 700
 701                mutex_lock(&dev->lock);
 702                skip = (dev->dev_state == DEV_STATE_INIT);
 703                mutex_unlock(&dev->lock);
 704                if (skip)
 705                        /*
  706                         * When the device is establishing the connection for the
  707                         * first time, do not remap; it will be closed soon.
 708                         */
 709                        continue;
 710
 711                rnbd_clt_info(dev, "session reconnected, remapping device\n");
 712                err = send_msg_open(dev, RTRS_PERMIT_NOWAIT);
 713                if (err) {
 714                        rnbd_clt_err(dev, "send_msg_open(): %d\n", err);
 715                        break;
 716                }
 717        }
 718        mutex_unlock(&sess->lock);
 719}
 720
 721static void rnbd_clt_link_ev(void *priv, enum rtrs_clt_link_ev ev)
 722{
 723        struct rnbd_clt_session *sess = priv;
 724
 725        switch (ev) {
 726        case RTRS_CLT_LINK_EV_DISCONNECTED:
 727                set_dev_states_to_disconnected(sess);
 728                break;
 729        case RTRS_CLT_LINK_EV_RECONNECTED:
 730                remap_devs(sess);
 731                break;
 732        default:
 733                pr_err("Unknown session event received (%d), session: %s\n",
 734                       ev, sess->sessname);
 735        }
 736}
 737
 738static void rnbd_init_cpu_qlists(struct rnbd_cpu_qlist __percpu *cpu_queues)
 739{
 740        unsigned int cpu;
 741        struct rnbd_cpu_qlist *cpu_q;
 742
 743        for_each_possible_cpu(cpu) {
 744                cpu_q = per_cpu_ptr(cpu_queues, cpu);
 745
 746                cpu_q->cpu = cpu;
 747                INIT_LIST_HEAD(&cpu_q->requeue_list);
 748                spin_lock_init(&cpu_q->requeue_lock);
 749        }
 750}
 751
 752static void destroy_mq_tags(struct rnbd_clt_session *sess)
 753{
 754        if (sess->tag_set.tags)
 755                blk_mq_free_tag_set(&sess->tag_set);
 756}
 757
 758static inline void wake_up_rtrs_waiters(struct rnbd_clt_session *sess)
 759{
 760        sess->rtrs_ready = true;
 761        wake_up_all(&sess->rtrs_waitq);
 762}
 763
 764static void close_rtrs(struct rnbd_clt_session *sess)
 765{
 766        might_sleep();
 767
 768        if (!IS_ERR_OR_NULL(sess->rtrs)) {
 769                rtrs_clt_close(sess->rtrs);
 770                sess->rtrs = NULL;
 771                wake_up_rtrs_waiters(sess);
 772        }
 773}
 774
 775static void free_sess(struct rnbd_clt_session *sess)
 776{
 777        WARN_ON(!list_empty(&sess->devs_list));
 778
 779        might_sleep();
 780
 781        close_rtrs(sess);
 782        destroy_mq_tags(sess);
 783        if (!list_empty(&sess->list)) {
 784                mutex_lock(&sess_lock);
 785                list_del(&sess->list);
 786                mutex_unlock(&sess_lock);
 787        }
 788        free_percpu(sess->cpu_queues);
 789        free_percpu(sess->cpu_rr);
 790        mutex_destroy(&sess->lock);
 791        kfree(sess);
 792}
 793
 794static struct rnbd_clt_session *alloc_sess(const char *sessname)
 795{
 796        struct rnbd_clt_session *sess;
 797        int err, cpu;
 798
 799        sess = kzalloc_node(sizeof(*sess), GFP_KERNEL, NUMA_NO_NODE);
 800        if (!sess)
 801                return ERR_PTR(-ENOMEM);
 802        strscpy(sess->sessname, sessname, sizeof(sess->sessname));
 803        atomic_set(&sess->busy, 0);
 804        mutex_init(&sess->lock);
 805        INIT_LIST_HEAD(&sess->devs_list);
 806        INIT_LIST_HEAD(&sess->list);
 807        bitmap_zero(sess->cpu_queues_bm, num_possible_cpus());
 808        init_waitqueue_head(&sess->rtrs_waitq);
 809        refcount_set(&sess->refcount, 1);
 810
 811        sess->cpu_queues = alloc_percpu(struct rnbd_cpu_qlist);
 812        if (!sess->cpu_queues) {
 813                err = -ENOMEM;
 814                goto err;
 815        }
 816        rnbd_init_cpu_qlists(sess->cpu_queues);
 817
 818        /*
  819         * This is a simple percpu variable which stores the CPU index used
  820         * as the round-robin cursor.  We need it for the sake of fairness,
  821         * to wake up queues in a round-robin manner.
 822         */
 823        sess->cpu_rr = alloc_percpu(int);
 824        if (!sess->cpu_rr) {
 825                err = -ENOMEM;
 826                goto err;
 827        }
 828        for_each_possible_cpu(cpu)
  829                *per_cpu_ptr(sess->cpu_rr, cpu) = cpu;
 830
 831        return sess;
 832
 833err:
 834        free_sess(sess);
 835
 836        return ERR_PTR(err);
 837}
 838
 839static int wait_for_rtrs_connection(struct rnbd_clt_session *sess)
 840{
 841        wait_event(sess->rtrs_waitq, sess->rtrs_ready);
 842        if (IS_ERR_OR_NULL(sess->rtrs))
 843                return -ECONNRESET;
 844
 845        return 0;
 846}
 847
 848static void wait_for_rtrs_disconnection(struct rnbd_clt_session *sess)
 849        __releases(&sess_lock)
 850        __acquires(&sess_lock)
 851{
 852        DEFINE_WAIT(wait);
 853
 854        prepare_to_wait(&sess->rtrs_waitq, &wait, TASK_UNINTERRUPTIBLE);
 855        if (IS_ERR_OR_NULL(sess->rtrs)) {
 856                finish_wait(&sess->rtrs_waitq, &wait);
 857                return;
 858        }
 859        mutex_unlock(&sess_lock);
 860        /* loop in caller, see __find_and_get_sess().
  861         * You can't leave the mutex locked and call schedule(): you would
  862         * deadlock with a caller of free_sess() which has just put the last
  863         * reference and is about to take sess_lock in order to delete
  864         * the session from the list.
 865         */
 866        schedule();
 867        mutex_lock(&sess_lock);
 868}
 869
 870static struct rnbd_clt_session *__find_and_get_sess(const char *sessname)
 871        __releases(&sess_lock)
 872        __acquires(&sess_lock)
 873{
 874        struct rnbd_clt_session *sess, *sn;
 875        int err;
 876
 877again:
 878        list_for_each_entry_safe(sess, sn, &sess_list, list) {
 879                if (strcmp(sessname, sess->sessname))
 880                        continue;
 881
 882                if (sess->rtrs_ready && IS_ERR_OR_NULL(sess->rtrs))
 883                        /*
 884                         * No RTRS connection, session is dying.
 885                         */
 886                        continue;
 887
 888                if (rnbd_clt_get_sess(sess)) {
 889                        /*
  890                         * An alive session was found, wait for the RTRS connection.
 891                         */
 892                        mutex_unlock(&sess_lock);
 893                        err = wait_for_rtrs_connection(sess);
 894                        if (err)
 895                                rnbd_clt_put_sess(sess);
 896                        mutex_lock(&sess_lock);
 897
 898                        if (err)
 899                                /* Session is dying, repeat the loop */
 900                                goto again;
 901
 902                        return sess;
 903                }
 904                /*
 905                 * Ref is 0, session is dying, wait for RTRS disconnect
  906                 * in order to avoid session name clashes.
 907                 */
 908                wait_for_rtrs_disconnection(sess);
 909                /*
  910                 * RTRS is disconnected and the session will be freed soon,
  911                 * so repeat the loop.
 912                 */
 913                goto again;
 914        }
 915
 916        return NULL;
 917}
 918
 919/* caller is responsible for initializing 'first' to false */
 920static struct
 921rnbd_clt_session *find_or_create_sess(const char *sessname, bool *first)
 922{
 923        struct rnbd_clt_session *sess = NULL;
 924
 925        mutex_lock(&sess_lock);
 926        sess = __find_and_get_sess(sessname);
 927        if (!sess) {
 928                sess = alloc_sess(sessname);
 929                if (IS_ERR(sess)) {
 930                        mutex_unlock(&sess_lock);
 931                        return sess;
 932                }
 933                list_add(&sess->list, &sess_list);
 934                *first = true;
 935        }
 936        mutex_unlock(&sess_lock);
 937
 938        return sess;
 939}
 940
 941static int rnbd_client_open(struct block_device *block_device, fmode_t mode)
 942{
 943        struct rnbd_clt_dev *dev = block_device->bd_disk->private_data;
 944
 945        if (dev->read_only && (mode & FMODE_WRITE))
 946                return -EPERM;
 947
 948        if (dev->dev_state == DEV_STATE_UNMAPPED ||
 949            !rnbd_clt_get_dev(dev))
 950                return -EIO;
 951
 952        return 0;
 953}
 954
 955static void rnbd_client_release(struct gendisk *gen, fmode_t mode)
 956{
 957        struct rnbd_clt_dev *dev = gen->private_data;
 958
 959        rnbd_clt_put_dev(dev);
 960}
 961
 962static int rnbd_client_getgeo(struct block_device *block_device,
 963                              struct hd_geometry *geo)
 964{
 965        u64 size;
 966        struct rnbd_clt_dev *dev;
 967
 968        dev = block_device->bd_disk->private_data;
 969        size = dev->size * (dev->logical_block_size / SECTOR_SIZE);
 970        geo->cylinders  = size >> 6;    /* size/64 */
 971        geo->heads      = 4;
 972        geo->sectors    = 16;
 973        geo->start      = 0;
 974
 975        return 0;
 976}
 977
 978static const struct block_device_operations rnbd_client_ops = {
 979        .owner          = THIS_MODULE,
 980        .open           = rnbd_client_open,
 981        .release        = rnbd_client_release,
 982        .getgeo         = rnbd_client_getgeo
 983};
 984
 985/* The amount of data that belongs to an I/O and the amount of data that
 986 * should be read or written to the disk (bi_size) can differ.
 987 *
  988 * E.g. when WRITE_SAME is used, only a small amount of data is
  989 * transferred, which is then written repeatedly over a lot of sectors.
  990 *
  991 * Get the size of the data to be transferred via RTRS by summing up the
  992 * sizes of the scatter-gather list entries.
 993 */
 994static size_t rnbd_clt_get_sg_size(struct scatterlist *sglist, u32 len)
 995{
 996        struct scatterlist *sg;
 997        size_t tsize = 0;
 998        int i;
 999
1000        for_each_sg(sglist, sg, len, i)
1001                tsize += sg->length;
1002        return tsize;
1003}
1004
1005static int rnbd_client_xfer_request(struct rnbd_clt_dev *dev,
1006                                     struct request *rq,
1007                                     struct rnbd_iu *iu)
1008{
1009        struct rtrs_clt_sess *rtrs = dev->sess->rtrs;
1010        struct rtrs_permit *permit = iu->permit;
1011        struct rnbd_msg_io msg;
1012        struct rtrs_clt_req_ops req_ops;
1013        unsigned int sg_cnt = 0;
1014        struct kvec vec;
1015        size_t size;
1016        int err;
1017
1018        iu->rq          = rq;
1019        iu->dev         = dev;
1020        msg.sector      = cpu_to_le64(blk_rq_pos(rq));
1021        msg.bi_size     = cpu_to_le32(blk_rq_bytes(rq));
1022        msg.rw          = cpu_to_le32(rq_to_rnbd_flags(rq));
1023        msg.prio        = cpu_to_le16(req_get_ioprio(rq));
1024
1025        /*
 1026         * We only support discards with a single segment for now.
1027         * See queue limits.
1028         */
1029        if (req_op(rq) != REQ_OP_DISCARD)
1030                sg_cnt = blk_rq_map_sg(dev->queue, rq, iu->sgt.sgl);
1031
1032        if (sg_cnt == 0)
1033                sg_mark_end(&iu->sgt.sgl[0]);
1034
1035        msg.hdr.type    = cpu_to_le16(RNBD_MSG_IO);
1036        msg.device_id   = cpu_to_le32(dev->device_id);
1037
1038        vec = (struct kvec) {
1039                .iov_base = &msg,
1040                .iov_len  = sizeof(msg)
1041        };
1042        size = rnbd_clt_get_sg_size(iu->sgt.sgl, sg_cnt);
1043        req_ops = (struct rtrs_clt_req_ops) {
1044                .priv = iu,
1045                .conf_fn = msg_io_conf,
1046        };
1047        err = rtrs_clt_request(rq_data_dir(rq), &req_ops, rtrs, permit,
1048                               &vec, 1, size, iu->sgt.sgl, sg_cnt);
1049        if (err) {
1050                rnbd_clt_err_rl(dev, "RTRS failed to transfer IO, err: %d\n",
1051                                 err);
1052                return err;
1053        }
1054
1055        return 0;
1056}
1057
1058/**
1059 * rnbd_clt_dev_add_to_requeue() - add device to requeue if session is busy
1060 * @dev:        Device to be checked
1061 * @q:          Queue to be added to the requeue list if required
1062 *
1063 * Description:
 1064 *     If the session is busy, someone will requeue us when resources are
 1065 *     freed.  If the session is not doing anything, the device is not
 1066 *     added to the list and false is returned.
1067 */
1068static bool rnbd_clt_dev_add_to_requeue(struct rnbd_clt_dev *dev,
1069                                                struct rnbd_queue *q)
1070{
1071        struct rnbd_clt_session *sess = dev->sess;
1072        struct rnbd_cpu_qlist *cpu_q;
1073        unsigned long flags;
1074        bool added = true;
1075        bool need_set;
1076
1077        cpu_q = get_cpu_ptr(sess->cpu_queues);
1078        spin_lock_irqsave(&cpu_q->requeue_lock, flags);
1079
1080        if (!test_and_set_bit_lock(0, &q->in_list)) {
1081                if (WARN_ON(!list_empty(&q->requeue_list)))
1082                        goto unlock;
1083
1084                need_set = !test_bit(cpu_q->cpu, sess->cpu_queues_bm);
1085                if (need_set) {
1086                        set_bit(cpu_q->cpu, sess->cpu_queues_bm);
1087                        /* Paired with rnbd_put_permit(). Set a bit first
1088                         * and then observe the busy counter.
1089                         */
1090                        smp_mb__before_atomic();
1091                }
1092                if (atomic_read(&sess->busy)) {
1093                        list_add_tail(&q->requeue_list, &cpu_q->requeue_list);
1094                } else {
 1095                        /* Very unlikely, but possible: the busy counter was
 1096                         * observed as zero.  Drop all bits and return
 1097                         * false to restart the queue ourselves.
1098                         */
1099                        if (need_set)
1100                                clear_bit(cpu_q->cpu, sess->cpu_queues_bm);
1101                        clear_bit_unlock(0, &q->in_list);
1102                        added = false;
1103                }
1104        }
1105unlock:
1106        spin_unlock_irqrestore(&cpu_q->requeue_lock, flags);
1107        put_cpu_ptr(sess->cpu_queues);
1108
1109        return added;
1110}
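
/*
 * Illustrative note on the pairing above: rnbd_clt_dev_add_to_requeue() sets
 * the cpu_queues_bm bit and then reads sess->busy, while rnbd_put_permit()
 * decrements sess->busy and then scans the bitmap.  With the paired memory
 * barriers at least one side observes the other, so a parked queue is either
 * requeued by the permit-release path or restarted by the caller itself.
 */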
1111
1112static void rnbd_clt_dev_kick_mq_queue(struct rnbd_clt_dev *dev,
1113                                        struct blk_mq_hw_ctx *hctx,
1114                                        int delay)
1115{
1116        struct rnbd_queue *q = hctx->driver_data;
1117
1118        if (delay != RNBD_DELAY_IFBUSY)
1119                blk_mq_delay_run_hw_queue(hctx, delay);
1120        else if (!rnbd_clt_dev_add_to_requeue(dev, q))
1121                /*
 1122                 * If the session is not busy, we have to restart
 1123                 * the queue ourselves.
1124                 */
1125                blk_mq_delay_run_hw_queue(hctx, 10/*ms*/);
1126}
1127
1128static blk_status_t rnbd_queue_rq(struct blk_mq_hw_ctx *hctx,
1129                                   const struct blk_mq_queue_data *bd)
1130{
1131        struct request *rq = bd->rq;
1132        struct rnbd_clt_dev *dev = rq->q->disk->private_data;
1133        struct rnbd_iu *iu = blk_mq_rq_to_pdu(rq);
1134        int err;
1135        blk_status_t ret = BLK_STS_IOERR;
1136
1137        if (dev->dev_state != DEV_STATE_MAPPED)
1138                return BLK_STS_IOERR;
1139
1140        iu->permit = rnbd_get_permit(dev->sess, RTRS_IO_CON,
1141                                      RTRS_PERMIT_NOWAIT);
1142        if (!iu->permit) {
1143                rnbd_clt_dev_kick_mq_queue(dev, hctx, RNBD_DELAY_IFBUSY);
1144                return BLK_STS_RESOURCE;
1145        }
1146
1147        iu->sgt.sgl = iu->first_sgl;
1148        err = sg_alloc_table_chained(&iu->sgt,
 1149                                     /* Even if the request has no segment,
 1150                                      * the sglist must have at least one entry.
1151                                      */
1152                                     blk_rq_nr_phys_segments(rq) ? : 1,
1153                                     iu->sgt.sgl,
1154                                     RNBD_INLINE_SG_CNT);
1155        if (err) {
1156                rnbd_clt_err_rl(dev, "sg_alloc_table_chained ret=%d\n", err);
1157                rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1158                rnbd_put_permit(dev->sess, iu->permit);
1159                return BLK_STS_RESOURCE;
1160        }
1161
1162        blk_mq_start_request(rq);
1163        err = rnbd_client_xfer_request(dev, rq, iu);
1164        if (err == 0)
1165                return BLK_STS_OK;
1166        if (err == -EAGAIN || err == -ENOMEM) {
1167                rnbd_clt_dev_kick_mq_queue(dev, hctx, 10/*ms*/);
1168                ret = BLK_STS_RESOURCE;
1169        }
1170        sg_free_table_chained(&iu->sgt, RNBD_INLINE_SG_CNT);
1171        rnbd_put_permit(dev->sess, iu->permit);
1172        return ret;
1173}
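
/*
 * Illustrative note: when no permit is available the request is not failed,
 * BLK_STS_RESOURCE is returned and rnbd_clt_dev_kick_mq_queue() either parks
 * the hw queue on the session requeue list or schedules it to be rerun, so
 * the request gets retried once a permit is released.
 */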
1174
1175static int rnbd_rdma_poll(struct blk_mq_hw_ctx *hctx, struct io_comp_batch *iob)
1176{
1177        struct rnbd_queue *q = hctx->driver_data;
1178        struct rnbd_clt_dev *dev = q->dev;
1179        int cnt;
1180
1181        cnt = rtrs_clt_rdma_cq_direct(dev->sess->rtrs, hctx->queue_num);
1182        return cnt;
1183}
1184
1185static int rnbd_rdma_map_queues(struct blk_mq_tag_set *set)
1186{
1187        struct rnbd_clt_session *sess = set->driver_data;
1188
1189        /* shared read/write queues */
1190        set->map[HCTX_TYPE_DEFAULT].nr_queues = num_online_cpus();
1191        set->map[HCTX_TYPE_DEFAULT].queue_offset = 0;
1192        set->map[HCTX_TYPE_READ].nr_queues = num_online_cpus();
1193        set->map[HCTX_TYPE_READ].queue_offset = 0;
1194        blk_mq_map_queues(&set->map[HCTX_TYPE_DEFAULT]);
1195        blk_mq_map_queues(&set->map[HCTX_TYPE_READ]);
1196
1197        if (sess->nr_poll_queues) {
1198                /* dedicated queue for poll */
1199                set->map[HCTX_TYPE_POLL].nr_queues = sess->nr_poll_queues;
1200                set->map[HCTX_TYPE_POLL].queue_offset = set->map[HCTX_TYPE_READ].queue_offset +
1201                        set->map[HCTX_TYPE_READ].nr_queues;
1202                blk_mq_map_queues(&set->map[HCTX_TYPE_POLL]);
1203                pr_info("[session=%s] mapped %d/%d/%d default/read/poll queues.\n",
1204                        sess->sessname,
1205                        set->map[HCTX_TYPE_DEFAULT].nr_queues,
1206                        set->map[HCTX_TYPE_READ].nr_queues,
1207                        set->map[HCTX_TYPE_POLL].nr_queues);
1208        } else {
1209                pr_info("[session=%s] mapped %d/%d default/read queues.\n",
1210                        sess->sessname,
1211                        set->map[HCTX_TYPE_DEFAULT].nr_queues,
1212                        set->map[HCTX_TYPE_READ].nr_queues);
1213        }
1214
1215        return 0;
1216}
1217
1218static struct blk_mq_ops rnbd_mq_ops = {
1219        .queue_rq       = rnbd_queue_rq,
1220        .complete       = rnbd_softirq_done_fn,
1221        .map_queues     = rnbd_rdma_map_queues,
1222        .poll           = rnbd_rdma_poll,
1223};
1224
1225static int setup_mq_tags(struct rnbd_clt_session *sess)
1226{
1227        struct blk_mq_tag_set *tag_set = &sess->tag_set;
1228
1229        memset(tag_set, 0, sizeof(*tag_set));
1230        tag_set->ops            = &rnbd_mq_ops;
1231        tag_set->queue_depth    = sess->queue_depth;
1232        tag_set->numa_node              = NUMA_NO_NODE;
1233        tag_set->flags          = BLK_MQ_F_SHOULD_MERGE |
1234                                  BLK_MQ_F_TAG_QUEUE_SHARED;
1235        tag_set->cmd_size       = sizeof(struct rnbd_iu) + RNBD_RDMA_SGL_SIZE;
1236
1237        /* for HCTX_TYPE_DEFAULT, HCTX_TYPE_READ, HCTX_TYPE_POLL */
1238        tag_set->nr_maps        = sess->nr_poll_queues ? HCTX_MAX_TYPES : 2;
1239        /*
1240         * HCTX_TYPE_DEFAULT and HCTX_TYPE_READ share one set of queues
1241         * others are for HCTX_TYPE_POLL
1242         */
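        /*
         * Illustrative example: with 8 online CPUs and nr_poll_queues == 2
         * this allocates 10 hw queues: hctxs 0..7 back both HCTX_TYPE_DEFAULT
         * and HCTX_TYPE_READ, and hctxs 8..9 back HCTX_TYPE_POLL (see
         * rnbd_rdma_map_queues()).
         */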
1243        tag_set->nr_hw_queues   = num_online_cpus() + sess->nr_poll_queues;
1244        tag_set->driver_data    = sess;
1245
1246        return blk_mq_alloc_tag_set(tag_set);
1247}
1248
1249static struct rnbd_clt_session *
1250find_and_get_or_create_sess(const char *sessname,
1251                            const struct rtrs_addr *paths,
1252                            size_t path_cnt, u16 port_nr, u32 nr_poll_queues)
1253{
1254        struct rnbd_clt_session *sess;
1255        struct rtrs_attrs attrs;
1256        int err;
1257        bool first = false;
1258        struct rtrs_clt_ops rtrs_ops;
1259
1260        sess = find_or_create_sess(sessname, &first);
1261        if (sess == ERR_PTR(-ENOMEM)) {
1262                return ERR_PTR(-ENOMEM);
1263        } else if ((nr_poll_queues && !first) ||  (!nr_poll_queues && sess->nr_poll_queues)) {
1264                /*
 1265                 * A device MUST have its own session to use polling mode.
 1266                 * Mapping a new device with an existing session must fail.
1267                 */
1268                err = -EINVAL;
1269                goto put_sess;
1270        }
1271
1272        if (!first)
1273                return sess;
1274
1275        if (!path_cnt) {
 1276                pr_err("Session %s not found, and path parameter not given\n", sessname);
1277                err = -ENXIO;
1278                goto put_sess;
1279        }
1280
1281        rtrs_ops = (struct rtrs_clt_ops) {
1282                .priv = sess,
1283                .link_ev = rnbd_clt_link_ev,
1284        };
1285        /*
 1286         * Nothing was found, establish the RTRS connection and proceed further.
1287         */
1288        sess->rtrs = rtrs_clt_open(&rtrs_ops, sessname,
1289                                   paths, path_cnt, port_nr,
1290                                   0, /* Do not use pdu of rtrs */
1291                                   RECONNECT_DELAY,
1292                                   MAX_RECONNECTS, nr_poll_queues);
1293        if (IS_ERR(sess->rtrs)) {
1294                err = PTR_ERR(sess->rtrs);
1295                goto wake_up_and_put;
1296        }
1297
1298        err = rtrs_clt_query(sess->rtrs, &attrs);
1299        if (err)
1300                goto close_rtrs;
1301
1302        sess->max_io_size = attrs.max_io_size;
1303        sess->queue_depth = attrs.queue_depth;
1304        sess->nr_poll_queues = nr_poll_queues;
1305        sess->max_segments = attrs.max_segments;
1306
1307        err = setup_mq_tags(sess);
1308        if (err)
1309                goto close_rtrs;
1310
1311        err = send_msg_sess_info(sess, RTRS_PERMIT_WAIT);
1312        if (err)
1313                goto close_rtrs;
1314
1315        wake_up_rtrs_waiters(sess);
1316
1317        return sess;
1318
1319close_rtrs:
1320        close_rtrs(sess);
1321put_sess:
1322        rnbd_clt_put_sess(sess);
1323
1324        return ERR_PTR(err);
1325
1326wake_up_and_put:
1327        wake_up_rtrs_waiters(sess);
1328        goto put_sess;
1329}
1330
1331static inline void rnbd_init_hw_queue(struct rnbd_clt_dev *dev,
1332                                       struct rnbd_queue *q,
1333                                       struct blk_mq_hw_ctx *hctx)
1334{
1335        INIT_LIST_HEAD(&q->requeue_list);
1336        q->dev  = dev;
1337        q->hctx = hctx;
1338}
1339
1340static void rnbd_init_mq_hw_queues(struct rnbd_clt_dev *dev)
1341{
1342        unsigned long i;
1343        struct blk_mq_hw_ctx *hctx;
1344        struct rnbd_queue *q;
1345
1346        queue_for_each_hw_ctx(dev->queue, hctx, i) {
1347                q = &dev->hw_queues[i];
1348                rnbd_init_hw_queue(dev, q, hctx);
1349                hctx->driver_data = q;
1350        }
1351}
1352
1353static void setup_request_queue(struct rnbd_clt_dev *dev)
1354{
1355        blk_queue_logical_block_size(dev->queue, dev->logical_block_size);
1356        blk_queue_physical_block_size(dev->queue, dev->physical_block_size);
1357        blk_queue_max_hw_sectors(dev->queue, dev->max_hw_sectors);
1358
1359        /*
 1360         * We don't support discards to "discontiguous" segments
 1361         * in one request.
1362         */
1363        blk_queue_max_discard_segments(dev->queue, 1);
1364
1365        blk_queue_max_discard_sectors(dev->queue, dev->max_discard_sectors);
1366        dev->queue->limits.discard_granularity  = dev->discard_granularity;
1367        dev->queue->limits.discard_alignment    = dev->discard_alignment;
1368        if (dev->secure_discard)
1369                blk_queue_max_secure_erase_sectors(dev->queue,
1370                                dev->max_discard_sectors);
1371        blk_queue_flag_set(QUEUE_FLAG_SAME_COMP, dev->queue);
1372        blk_queue_flag_set(QUEUE_FLAG_SAME_FORCE, dev->queue);
1373        blk_queue_max_segments(dev->queue, dev->max_segments);
1374        blk_queue_io_opt(dev->queue, dev->sess->max_io_size);
1375        blk_queue_virt_boundary(dev->queue, SZ_4K - 1);
1376        blk_queue_write_cache(dev->queue, dev->wc, dev->fua);
1377}
1378
1379static int rnbd_clt_setup_gen_disk(struct rnbd_clt_dev *dev, int idx)
1380{
1381        int err;
1382
1383        dev->gd->major          = rnbd_client_major;
1384        dev->gd->first_minor    = idx << RNBD_PART_BITS;
1385        dev->gd->minors         = 1 << RNBD_PART_BITS;
1386        dev->gd->fops           = &rnbd_client_ops;
1387        dev->gd->queue          = dev->queue;
1388        dev->gd->private_data   = dev;
1389        snprintf(dev->gd->disk_name, sizeof(dev->gd->disk_name), "rnbd%d",
1390                 idx);
1391        pr_debug("disk_name=%s, capacity=%zu\n",
1392                 dev->gd->disk_name,
1393                 dev->nsectors * (dev->logical_block_size / SECTOR_SIZE)
1394                 );
1395
1396        set_capacity(dev->gd, dev->nsectors);
1397
1398        if (dev->access_mode == RNBD_ACCESS_RO) {
1399                dev->read_only = true;
1400                set_disk_ro(dev->gd, true);
1401        } else {
1402                dev->read_only = false;
1403        }
1404
1405        /*
 1406         * A network device does not need the rotational flag.
1407         */
1408        blk_queue_flag_set(QUEUE_FLAG_NONROT, dev->queue);
1409        err = add_disk(dev->gd);
1410        if (err)
1411                blk_cleanup_disk(dev->gd);
1412
1413        return err;
1414}
1415
1416static int rnbd_client_setup_device(struct rnbd_clt_dev *dev)
1417{
1418        int idx = dev->clt_device_id;
1419
1420        dev->size = dev->nsectors * dev->logical_block_size;
1421
1422        dev->gd = blk_mq_alloc_disk(&dev->sess->tag_set, dev);
1423        if (IS_ERR(dev->gd))
1424                return PTR_ERR(dev->gd);
1425        dev->queue = dev->gd->queue;
1426        rnbd_init_mq_hw_queues(dev);
1427
1428        setup_request_queue(dev);
1429        return rnbd_clt_setup_gen_disk(dev, idx);
1430}
1431
1432static struct rnbd_clt_dev *init_dev(struct rnbd_clt_session *sess,
1433                                      enum rnbd_access_mode access_mode,
1434                                      const char *pathname,
1435                                      u32 nr_poll_queues)
1436{
1437        struct rnbd_clt_dev *dev;
1438        int ret;
1439
1440        dev = kzalloc_node(sizeof(*dev), GFP_KERNEL, NUMA_NO_NODE);
1441        if (!dev)
1442                return ERR_PTR(-ENOMEM);
1443
1444        /*
1445         * nr_cpu_ids: the number of softirq queues
1446         * nr_poll_queues: the number of polling queues
1447         */
1448        dev->hw_queues = kcalloc(nr_cpu_ids + nr_poll_queues,
1449                                 sizeof(*dev->hw_queues),
1450                                 GFP_KERNEL);
1451        if (!dev->hw_queues) {
1452                ret = -ENOMEM;
1453                goto out_alloc;
1454        }
1455
 1456        ret = ida_alloc_max(&index_ida, (1 << (MINORBITS - RNBD_PART_BITS)) - 1,
1457                            GFP_KERNEL);
1458        if (ret < 0) {
 1459                pr_err("Failed to initialize device '%s' from session %s, allocating ida failed, err: %d\n",
1460                       pathname, sess->sessname, ret);
1461                goto out_queues;
1462        }
1463
 1464        dev->clt_device_id      = ret;
 1465        dev->pathname = kstrdup(pathname, GFP_KERNEL);
 1466        if (!dev->pathname) {
 1467                ret = -ENOMEM;
 1468                goto out_ida;
 1469        }
 1470
1471        dev->sess               = sess;
1472        dev->access_mode        = access_mode;
1473        dev->nr_poll_queues     = nr_poll_queues;
1474        mutex_init(&dev->lock);
1475        refcount_set(&dev->refcount, 1);
1476        dev->dev_state = DEV_STATE_INIT;
1477
1478        /*
 1479         * We are called here from a sysfs entry, thus clt-sysfs is
 1480         * responsible for making sure the session does not disappear.
1481         */
1482        WARN_ON(!rnbd_clt_get_sess(sess));
1483
1484        return dev;
1485
out_ida:
        ida_free(&index_ida, dev->clt_device_id);
 1486out_queues:
1487        kfree(dev->hw_queues);
1488out_alloc:
1489        kfree(dev);
1490        return ERR_PTR(ret);
1491}
1492
1493static bool __exists_dev(const char *pathname, const char *sessname)
1494{
1495        struct rnbd_clt_session *sess;
1496        struct rnbd_clt_dev *dev;
1497        bool found = false;
1498
1499        list_for_each_entry(sess, &sess_list, list) {
1500                if (sessname && strncmp(sess->sessname, sessname,
1501                                        sizeof(sess->sessname)))
1502                        continue;
1503                mutex_lock(&sess->lock);
1504                list_for_each_entry(dev, &sess->devs_list, list) {
1505                        if (strlen(dev->pathname) == strlen(pathname) &&
1506                            !strcmp(dev->pathname, pathname)) {
1507                                found = true;
1508                                break;
1509                        }
1510                }
1511                mutex_unlock(&sess->lock);
1512                if (found)
1513                        break;
1514        }
1515
1516        return found;
1517}
1518
1519static bool exists_devpath(const char *pathname, const char *sessname)
1520{
1521        bool found;
1522
1523        mutex_lock(&sess_lock);
1524        found = __exists_dev(pathname, sessname);
1525        mutex_unlock(&sess_lock);
1526
1527        return found;
1528}
1529
1530static bool insert_dev_if_not_exists_devpath(struct rnbd_clt_dev *dev)
1531{
1532        bool found;
1533        struct rnbd_clt_session *sess = dev->sess;
1534
1535        mutex_lock(&sess_lock);
1536        found = __exists_dev(dev->pathname, sess->sessname);
1537        if (!found) {
1538                mutex_lock(&sess->lock);
1539                list_add_tail(&dev->list, &sess->devs_list);
1540                mutex_unlock(&sess->lock);
1541        }
1542        mutex_unlock(&sess_lock);
1543
1544        return found;
1545}
1546
1547static void delete_dev(struct rnbd_clt_dev *dev)
1548{
1549        struct rnbd_clt_session *sess = dev->sess;
1550
1551        mutex_lock(&sess->lock);
1552        list_del(&dev->list);
1553        mutex_unlock(&sess->lock);
1554}
1555
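    /*
     * Map a remote device: reuse or establish the RTRS session, allocate and
     * initialize the client device, reject duplicate paths, ask the server to
     * open the device and finally set up the local block device. On failure
     * every step already taken is rolled back in reverse order.
     */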
1556struct rnbd_clt_dev *rnbd_clt_map_device(const char *sessname,
1557                                           struct rtrs_addr *paths,
1558                                           size_t path_cnt, u16 port_nr,
1559                                           const char *pathname,
1560                                           enum rnbd_access_mode access_mode,
1561                                           u32 nr_poll_queues)
1562{
1563        struct rnbd_clt_session *sess;
1564        struct rnbd_clt_dev *dev;
1565        int ret;
1566
1567        if (exists_devpath(pathname, sessname))
1568                return ERR_PTR(-EEXIST);
1569
1570        sess = find_and_get_or_create_sess(sessname, paths, path_cnt, port_nr, nr_poll_queues);
1571        if (IS_ERR(sess))
1572                return ERR_CAST(sess);
1573
1574        dev = init_dev(sess, access_mode, pathname, nr_poll_queues);
1575        if (IS_ERR(dev)) {
1576                pr_err("map_device: failed to map device '%s' from session %s, can't initialize device, err: %ld\n",
1577                       pathname, sess->sessname, PTR_ERR(dev));
1578                ret = PTR_ERR(dev);
1579                goto put_sess;
1580        }
1581        if (insert_dev_if_not_exists_devpath(dev)) {
1582                ret = -EEXIST;
1583                goto put_dev;
1584        }
1585        ret = send_msg_open(dev, RTRS_PERMIT_WAIT);
1586        if (ret) {
1587                rnbd_clt_err(dev,
1588                              "map_device: failed, can't open remote device, err: %d\n",
1589                              ret);
1590                goto del_dev;
1591        }
1592        mutex_lock(&dev->lock);
1593        pr_debug("Opened remote device: session=%s, path='%s'\n",
1594                 sess->sessname, pathname);
1595        ret = rnbd_client_setup_device(dev);
1596        if (ret) {
1597                rnbd_clt_err(dev,
1598                              "map_device: Failed to configure device, err: %d\n",
1599                              ret);
1600                mutex_unlock(&dev->lock);
1601                goto send_close;
1602        }
1603
1604        rnbd_clt_info(dev,
1605                       "map_device: Device mapped as %s (nsectors: %zu, logical_block_size: %d, physical_block_size: %d, max_discard_sectors: %d, discard_granularity: %d, discard_alignment: %d, secure_discard: %d, max_segments: %d, max_hw_sectors: %d, wc: %d, fua: %d)\n",
1606                       dev->gd->disk_name, dev->nsectors,
1607                       dev->logical_block_size, dev->physical_block_size,
1608                       dev->max_discard_sectors,
1609                       dev->discard_granularity, dev->discard_alignment,
1610                       dev->secure_discard, dev->max_segments,
1611                       dev->max_hw_sectors, dev->wc, dev->fua);
1612
1613        mutex_unlock(&dev->lock);
1614        rnbd_clt_put_sess(sess);
1615
1616        return dev;
1617
1618send_close:
1619        send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT);
1620del_dev:
1621        delete_dev(dev);
1622put_dev:
1623        rnbd_clt_put_dev(dev);
1624put_sess:
1625        rnbd_clt_put_sess(sess);
1626
1627        return ERR_PTR(ret);
1628}
1629
1630static void destroy_gen_disk(struct rnbd_clt_dev *dev)
1631{
1632        del_gendisk(dev->gd);
1633        blk_cleanup_disk(dev->gd);
1634}
1635
1636static void destroy_sysfs(struct rnbd_clt_dev *dev,
1637                          const struct attribute *sysfs_self)
1638{
1639        rnbd_clt_remove_dev_symlink(dev);
1640        if (dev->kobj.state_initialized) {
1641                if (sysfs_self)
1642                        /* Remove the calling sysfs file itself first to avoid a deadlock */
1643                        sysfs_remove_file_self(&dev->kobj, sysfs_self);
1644                kobject_del(&dev->kobj);
1645                kobject_put(&dev->kobj);
1646        }
1647}
1648
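    /*
     * Unmap a device. Unless @force is set, the call fails with -EBUSY while
     * the device is still in use and with -EALREADY if it has already been
     * unmapped. @sysfs_self, if non-NULL, is the sysfs attribute that triggered
     * the unmap; it is removed first to avoid a deadlock.
     */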
1649int rnbd_clt_unmap_device(struct rnbd_clt_dev *dev, bool force,
1650                           const struct attribute *sysfs_self)
1651{
1652        struct rnbd_clt_session *sess = dev->sess;
1653        int refcount, ret = 0;
1654        bool was_mapped;
1655
1656        mutex_lock(&dev->lock);
1657        if (dev->dev_state == DEV_STATE_UNMAPPED) {
1658                rnbd_clt_info(dev, "Device is already being unmapped\n");
1659                ret = -EALREADY;
1660                goto err;
1661        }
1662        refcount = refcount_read(&dev->refcount);
1663        if (!force && refcount > 1) {
1664                rnbd_clt_err(dev,
1665                              "Closing device failed, device is in use (%d device users)\n",
1666                              refcount - 1);
1667                ret = -EBUSY;
1668                goto err;
1669        }
1670        was_mapped = (dev->dev_state == DEV_STATE_MAPPED);
1671        dev->dev_state = DEV_STATE_UNMAPPED;
1672        mutex_unlock(&dev->lock);
1673
1674        delete_dev(dev);
1675        destroy_sysfs(dev, sysfs_self);
1676        destroy_gen_disk(dev);
1677        if (was_mapped && sess->rtrs)
1678                send_msg_close(dev, dev->device_id, RTRS_PERMIT_WAIT);
1679
1680        rnbd_clt_info(dev, "Device is unmapped\n");
1681
1682        /* This is likely the last reference put */
1683        rnbd_clt_put_dev(dev);
1684
1685        /*
1686         * From this point on both the device and the session may be gone!
1687         */
1688
1689        return 0;
1690err:
1691        mutex_unlock(&dev->lock);
1692
1693        return ret;
1694}
1695
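    /*
     * Re-send the open request for a device in the MAPPED_DISCONNECTED state.
     * Devices that are already mapped return -EALREADY, unmapped ones -ENODEV,
     * and devices in any other state -EBUSY.
     */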
1696int rnbd_clt_remap_device(struct rnbd_clt_dev *dev)
1697{
1698        int err;
1699
1700        mutex_lock(&dev->lock);
1701        if (dev->dev_state == DEV_STATE_MAPPED_DISCONNECTED)
1702                err = 0;
1703        else if (dev->dev_state == DEV_STATE_UNMAPPED)
1704                err = -ENODEV;
1705        else if (dev->dev_state == DEV_STATE_MAPPED)
1706                err = -EALREADY;
1707        else
1708                err = -EBUSY;
1709        mutex_unlock(&dev->lock);
1710        if (!err) {
1711                rnbd_clt_info(dev, "Remapping device.\n");
1712                err = send_msg_open(dev, RTRS_PERMIT_WAIT);
1713                if (err)
1714                        rnbd_clt_err(dev, "remap_device: %d\n", err);
1715        }
1716
1717        return err;
1718}
1719
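    /*
     * Work handler used during module unload so that many devices can be
     * unmapped in parallel instead of one after another.
     */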
1720static void unmap_device_work(struct work_struct *work)
1721{
1722        struct rnbd_clt_dev *dev;
1723
1724        dev = container_of(work, typeof(*dev), unmap_on_rmmod_work);
1725        rnbd_clt_unmap_device(dev, true, NULL);
1726}
1727
1728static void rnbd_destroy_sessions(void)
1729{
1730        struct rnbd_clt_session *sess, *sn;
1731        struct rnbd_clt_dev *dev, *tn;
1732
1733        /* First, forbid any further access through the sysfs interface */
1734        rnbd_clt_destroy_sysfs_files();
1735
1736        /*
1737         * At this point there is no concurrent access to the sessions
1738         * list or the devices list:
1739         *   1. No new session or device can be created - the session sysfs
1740         *      files have already been removed.
1741         *   2. No device or session can be removed - the module reference is
1742         *      taken into account in the unmap device sysfs callback.
1743         *   3. No IO requests are in flight - each open of the block device
1744         *      takes a module reference in get_disk().
1745         *
1746         * However, user requests sent by the asynchronous send_msg_*()
1747         * functions can still be in flight, so the RTRS session must be
1748         * closed explicitly before the devices are unmapped.
1749         */
1750
1751        list_for_each_entry_safe(sess, sn, &sess_list, list) {
1752                if (!rnbd_clt_get_sess(sess))
1753                        continue;
1754                close_rtrs(sess);
1755                list_for_each_entry_safe(dev, tn, &sess->devs_list, list) {
1756                        /*
1757                         * Unmapping is done in parallel for one reason only:
1758                         * blk_cleanup_queue() takes around half a second, so
1759                         * with a huge number of devices a sequential module
1760                         * unload would take minutes.
1761                         */
1762                        INIT_WORK(&dev->unmap_on_rmmod_work, unmap_device_work);
1763                        queue_work(rnbd_clt_wq, &dev->unmap_on_rmmod_work);
1764                }
1765                rnbd_clt_put_sess(sess);
1766        }
1767        /* Wait for all scheduled unmap work to finish */
1768        flush_workqueue(rnbd_clt_wq);
1769        WARN_ON(!list_empty(&sess_list));
1770}
1771
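    /*
     * The BUILD_BUG_ON() checks below pin the on-wire sizes of the protocol
     * messages; a layout change would silently break compatibility with
     * existing servers.
     */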
1772static int __init rnbd_client_init(void)
1773{
1774        int err = 0;
1775
1776        BUILD_BUG_ON(sizeof(struct rnbd_msg_hdr) != 4);
1777        BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info) != 36);
1778        BUILD_BUG_ON(sizeof(struct rnbd_msg_sess_info_rsp) != 36);
1779        BUILD_BUG_ON(sizeof(struct rnbd_msg_open) != 264);
1780        BUILD_BUG_ON(sizeof(struct rnbd_msg_close) != 8);
1781        BUILD_BUG_ON(sizeof(struct rnbd_msg_open_rsp) != 56);
1782        rnbd_client_major = register_blkdev(rnbd_client_major, "rnbd");
1783        if (rnbd_client_major <= 0) {
1784                pr_err("Failed to load module, block device registration failed\n");
1785                return -EBUSY;
1786        }
1787
1788        err = rnbd_clt_create_sysfs_files();
1789        if (err) {
1790                pr_err("Failed to load module, creating sysfs device files failed, err: %d\n",
1791                       err);
1792                unregister_blkdev(rnbd_client_major, "rnbd");
1793                return err;
1794        }
1795        rnbd_clt_wq = alloc_workqueue("rnbd_clt_wq", 0, 0);
1796        if (!rnbd_clt_wq) {
1797                pr_err("Failed to load module, alloc_workqueue failed.\n");
1798                rnbd_clt_destroy_sysfs_files();
1799                unregister_blkdev(rnbd_client_major, "rnbd");
1800                err = -ENOMEM;
1801        }
1802
1803        return err;
1804}
1805
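    /*
     * Unmap all devices and close all sessions before unregistering the block
     * major. The workqueue is destroyed last because rnbd_destroy_sessions()
     * queues its unmap work on it.
     */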
1806static void __exit rnbd_client_exit(void)
1807{
1808        rnbd_destroy_sessions();
1809        unregister_blkdev(rnbd_client_major, "rnbd");
1810        ida_destroy(&index_ida);
1811        destroy_workqueue(rnbd_clt_wq);
1812}
1813
1814module_init(rnbd_client_init);
1815module_exit(rnbd_client_exit);
1816