linux/drivers/infiniband/ulp/rtrs/rtrs-clt.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 * RDMA Transport Layer
   4 *
   5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
   6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
   7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
   8 */
   9
  10#undef pr_fmt
  11#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
  12
  13#include <linux/module.h>
  14#include <linux/rculist.h>
  15#include <linux/random.h>
  16
  17#include "rtrs-clt.h"
  18#include "rtrs-log.h"
  19
  20#define RTRS_CONNECT_TIMEOUT_MS 30000
  21/*
  22 * Wait a bit before trying to reconnect after a failure
   23 * in order to give the server time to finish its cleanup, which
   24 * otherwise leads to "false positive" failed reconnect attempts
  25 */
  26#define RTRS_RECONNECT_BACKOFF 1000
  27/*
  28 * Wait for additional random time between 0 and 8 seconds
  29 * before starting to reconnect to avoid clients reconnecting
  30 * all at once in case of a major network outage
  31 */
  32#define RTRS_RECONNECT_SEED 8
  33
  34#define FIRST_CONN 0x01
  35/* limit to 128 * 4k = 512k max IO */
  36#define RTRS_MAX_SEGMENTS          128
  37
  38MODULE_DESCRIPTION("RDMA Transport Client");
  39MODULE_LICENSE("GPL");
  40
  41static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
  42static struct rtrs_rdma_dev_pd dev_pd = {
  43        .ops = &dev_pd_ops
  44};
  45
  46static struct workqueue_struct *rtrs_wq;
  47static struct class *rtrs_clt_dev_class;
  48
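     /*
      * rtrs_clt_is_connected() - check whether any path of the session is up.
      * Walks clt->paths_list under RCU and returns true if at least one path
      * is in the RTRS_CLT_CONNECTED state.
      */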
  49static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt)
  50{
  51        struct rtrs_clt_sess *sess;
  52        bool connected = false;
  53
  54        rcu_read_lock();
  55        list_for_each_entry_rcu(sess, &clt->paths_list, s.entry)
  56                connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED;
  57        rcu_read_unlock();
  58
  59        return connected;
  60}
  61
  62static struct rtrs_permit *
  63__rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type)
  64{
  65        size_t max_depth = clt->queue_depth;
  66        struct rtrs_permit *permit;
  67        int bit;
  68
  69        /*
  70         * Adapted from null_blk get_tag(). Callers from different cpus may
  71         * grab the same bit, since find_first_zero_bit is not atomic.
  72         * But then the test_and_set_bit_lock will fail for all the
  73         * callers but one, so that they will loop again.
  74         * This way an explicit spinlock is not required.
  75         */
  76        do {
  77                bit = find_first_zero_bit(clt->permits_map, max_depth);
  78                if (unlikely(bit >= max_depth))
  79                        return NULL;
  80        } while (unlikely(test_and_set_bit_lock(bit, clt->permits_map)));
  81
  82        permit = get_permit(clt, bit);
  83        WARN_ON(permit->mem_id != bit);
  84        permit->cpu_id = raw_smp_processor_id();
  85        permit->con_type = con_type;
  86
  87        return permit;
  88}
  89
  90static inline void __rtrs_put_permit(struct rtrs_clt *clt,
  91                                      struct rtrs_permit *permit)
  92{
  93        clear_bit_unlock(permit->mem_id, clt->permits_map);
  94}
  95
  96/**
  97 * rtrs_clt_get_permit() - allocates permit for future RDMA operation
  98 * @clt:        Current session
  99 * @con_type:   Type of connection to use with the permit
 100 * @can_wait:   Wait type
 101 *
 102 * Description:
 103 *    Allocates permit for the following RDMA operation.  Permit is used
 104 *    to preallocate all resources and to propagate memory pressure
 105 *    up earlier.
 106 *
 107 * Context:
  108 *    Can sleep if @can_wait == RTRS_PERMIT_WAIT
 109 */
 110struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt,
 111                                          enum rtrs_clt_con_type con_type,
 112                                          enum wait_type can_wait)
 113{
 114        struct rtrs_permit *permit;
 115        DEFINE_WAIT(wait);
 116
 117        permit = __rtrs_get_permit(clt, con_type);
 118        if (likely(permit) || !can_wait)
 119                return permit;
 120
 121        do {
 122                prepare_to_wait(&clt->permits_wait, &wait,
 123                                TASK_UNINTERRUPTIBLE);
 124                permit = __rtrs_get_permit(clt, con_type);
 125                if (likely(permit))
 126                        break;
 127
 128                io_schedule();
 129        } while (1);
 130
 131        finish_wait(&clt->permits_wait, &wait);
 132
 133        return permit;
 134}
 135EXPORT_SYMBOL(rtrs_clt_get_permit);
 136
 137/**
 138 * rtrs_clt_put_permit() - puts allocated permit
 139 * @clt:        Current session
 140 * @permit:     Permit to be freed
 141 *
 142 * Context:
 143 *    Does not matter
 144 */
 145void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit)
 146{
 147        if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
 148                return;
 149
 150        __rtrs_put_permit(clt, permit);
 151
 152        /*
 153         * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
 154         * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
 155         * it must have added itself to &clt->permits_wait before
 156         * __rtrs_put_permit() finished.
 157         * Hence it is safe to guard wake_up() with a waitqueue_active() test.
 158         */
 159        if (waitqueue_active(&clt->permits_wait))
 160                wake_up(&clt->permits_wait);
 161}
 162EXPORT_SYMBOL(rtrs_clt_put_permit);
 163
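     /*
      * Illustrative permit usage by an upper layer (a hedged sketch; the
      * caller code below is hypothetical and not part of this file):
      *
      *    permit = rtrs_clt_get_permit(clt, RTRS_IO_CON, RTRS_PERMIT_WAIT);
      *    if (!permit)
      *            return -EBUSY;
      *    ...issue the IO that this permit preallocates resources for...
      *    rtrs_clt_put_permit(clt, permit);
      *
      * With RTRS_PERMIT_WAIT the getter sleeps until a permit is free, so
      * the NULL check only matters for a no-wait caller.
      */
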
 164/**
 165 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
 166 * @sess: client session pointer
 167 * @permit: permit for the allocation of the RDMA buffer
 168 * Note:
  169 *     IO connections start from 1.
  170 *     Connection 0 is for user messages.
 171 */
 172static
 173struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess,
 174                                            struct rtrs_permit *permit)
 175{
 176        int id = 0;
 177
 178        if (likely(permit->con_type == RTRS_IO_CON))
 179                id = (permit->cpu_id % (sess->s.irq_con_num - 1)) + 1;
 180
 181        return to_clt_con(sess->s.con[id]);
 182}
 183
 184/**
  185 * rtrs_clt_change_state() - change the session state through the session state
  186 * machine.
  187 *
  188 * @sess: client session to change the state of.
  189 * @new_state: state to change to.
  190 *
  191 * Returns true if the session state is changed to the new state, otherwise returns false.
  192 *
  193 * Locks:
  194 * state_wq lock must be held.
 195 */
 196static bool rtrs_clt_change_state(struct rtrs_clt_sess *sess,
 197                                     enum rtrs_clt_state new_state)
 198{
 199        enum rtrs_clt_state old_state;
 200        bool changed = false;
 201
 202        lockdep_assert_held(&sess->state_wq.lock);
 203
 204        old_state = sess->state;
 205        switch (new_state) {
 206        case RTRS_CLT_CONNECTING:
 207                switch (old_state) {
 208                case RTRS_CLT_RECONNECTING:
 209                        changed = true;
 210                        fallthrough;
 211                default:
 212                        break;
 213                }
 214                break;
 215        case RTRS_CLT_RECONNECTING:
 216                switch (old_state) {
 217                case RTRS_CLT_CONNECTED:
 218                case RTRS_CLT_CONNECTING_ERR:
 219                case RTRS_CLT_CLOSED:
 220                        changed = true;
 221                        fallthrough;
 222                default:
 223                        break;
 224                }
 225                break;
 226        case RTRS_CLT_CONNECTED:
 227                switch (old_state) {
 228                case RTRS_CLT_CONNECTING:
 229                        changed = true;
 230                        fallthrough;
 231                default:
 232                        break;
 233                }
 234                break;
 235        case RTRS_CLT_CONNECTING_ERR:
 236                switch (old_state) {
 237                case RTRS_CLT_CONNECTING:
 238                        changed = true;
 239                        fallthrough;
 240                default:
 241                        break;
 242                }
 243                break;
 244        case RTRS_CLT_CLOSING:
 245                switch (old_state) {
 246                case RTRS_CLT_CONNECTING:
 247                case RTRS_CLT_CONNECTING_ERR:
 248                case RTRS_CLT_RECONNECTING:
 249                case RTRS_CLT_CONNECTED:
 250                        changed = true;
 251                        fallthrough;
 252                default:
 253                        break;
 254                }
 255                break;
 256        case RTRS_CLT_CLOSED:
 257                switch (old_state) {
 258                case RTRS_CLT_CLOSING:
 259                        changed = true;
 260                        fallthrough;
 261                default:
 262                        break;
 263                }
 264                break;
 265        case RTRS_CLT_DEAD:
 266                switch (old_state) {
 267                case RTRS_CLT_CLOSED:
 268                        changed = true;
 269                        fallthrough;
 270                default:
 271                        break;
 272                }
 273                break;
 274        default:
 275                break;
 276        }
 277        if (changed) {
 278                sess->state = new_state;
 279                wake_up_locked(&sess->state_wq);
 280        }
 281
 282        return changed;
 283}
 284
 285static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess,
 286                                           enum rtrs_clt_state old_state,
 287                                           enum rtrs_clt_state new_state)
 288{
 289        bool changed = false;
 290
 291        spin_lock_irq(&sess->state_wq.lock);
 292        if (sess->state == old_state)
 293                changed = rtrs_clt_change_state(sess, new_state);
 294        spin_unlock_irq(&sess->state_wq.lock);
 295
 296        return changed;
 297}
 298
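     /*
      * rtrs_rdma_error_recovery() - react to an RDMA/heartbeat error on a path.
      * If the path was connected, move it to RTRS_CLT_RECONNECTING and queue
      * the delayed reconnect work (reconnect_delay_sec plus a random backoff).
      * Otherwise the error hit while a connection was still being established,
      * so move CONNECTING -> CONNECTING_ERR and let the waiter clean up.
      */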
 299static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
 300{
 301        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
 302
 303        if (rtrs_clt_change_state_from_to(sess,
 304                                           RTRS_CLT_CONNECTED,
 305                                           RTRS_CLT_RECONNECTING)) {
 306                struct rtrs_clt *clt = sess->clt;
 307                unsigned int delay_ms;
 308
 309                /*
 310                 * Normal scenario, reconnect if we were successfully connected
 311                 */
 312                delay_ms = clt->reconnect_delay_sec * 1000;
 313                queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
 314                                   msecs_to_jiffies(delay_ms +
  315                                                    prandom_u32() % RTRS_RECONNECT_SEED * 1000));
 316        } else {
 317                /*
  318                 * An error can happen only while establishing a new connection,
  319                 * so notify the waiter with the error state; the waiter is
  320                 * responsible for cleaning up the rest and reconnecting if needed.
 321                 */
 322                rtrs_clt_change_state_from_to(sess,
 323                                               RTRS_CLT_CONNECTING,
 324                                               RTRS_CLT_CONNECTING_ERR);
 325        }
 326}
 327
 328static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
 329{
 330        struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
 331
 332        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 333                rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n",
 334                          ib_wc_status_msg(wc->status));
 335                rtrs_rdma_error_recovery(con);
 336        }
 337}
 338
 339static struct ib_cqe fast_reg_cqe = {
 340        .done = rtrs_clt_fast_reg_done
 341};
 342
 343static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
 344                              bool notify, bool can_wait);
 345
 346static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
 347{
 348        struct rtrs_clt_io_req *req =
 349                container_of(wc->wr_cqe, typeof(*req), inv_cqe);
 350        struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
 351
 352        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 353                rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n",
 354                          ib_wc_status_msg(wc->status));
 355                rtrs_rdma_error_recovery(con);
 356        }
 357        req->need_inv = false;
 358        if (likely(req->need_inv_comp))
 359                complete(&req->inv_comp);
 360        else
 361                /* Complete request from INV callback */
 362                complete_rdma_req(req, req->inv_errno, true, false);
 363}
 364
 365static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
 366{
 367        struct rtrs_clt_con *con = req->con;
 368        struct ib_send_wr wr = {
 369                .opcode             = IB_WR_LOCAL_INV,
 370                .wr_cqe             = &req->inv_cqe,
 371                .send_flags         = IB_SEND_SIGNALED,
 372                .ex.invalidate_rkey = req->mr->rkey,
 373        };
 374        req->inv_cqe.done = rtrs_clt_inv_rkey_done;
 375
 376        return ib_post_send(con->c.qp, &wr, NULL);
 377}
 378
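     /*
      * complete_rdma_req() - finish an IO request.
      * For reads that still need a local invalidate, post IB_WR_LOCAL_INV first
      * (waiting for its completion if @can_wait, otherwise finishing later from
      * the INV callback).  Then unmap the sg list, drop the request reference
      * and, if @notify is set, call the upper layer's conf callback with @errno.
      */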
 379static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
 380                              bool notify, bool can_wait)
 381{
 382        struct rtrs_clt_con *con = req->con;
 383        struct rtrs_clt_sess *sess;
 384        int err;
 385
 386        if (WARN_ON(!req->in_use))
 387                return;
 388        if (WARN_ON(!req->con))
 389                return;
 390        sess = to_clt_sess(con->c.sess);
 391
 392        if (req->sg_cnt) {
 393                if (unlikely(req->dir == DMA_FROM_DEVICE && req->need_inv)) {
 394                        /*
  395                         * We are here to invalidate read requests
  396                         * ourselves.  In the normal scenario the server
  397                         * sends an INV for every read request, so if we
  398                         * got here, one of two things has happened:
  399                         *
  400                         *    1.  this is a failover, when errno != 0
  401                         *        and can_wait == 1,
  402                         *
  403                         *    2.  something went badly wrong and the
  404                         *        server forgot to send the INV, so we
  405                         *        have to do it ourselves.
 406                         */
 407
 408                        if (likely(can_wait)) {
 409                                req->need_inv_comp = true;
 410                        } else {
 411                                /* This should be IO path, so always notify */
 412                                WARN_ON(!notify);
 413                                /* Save errno for INV callback */
 414                                req->inv_errno = errno;
 415                        }
 416
 417                        refcount_inc(&req->ref);
 418                        err = rtrs_inv_rkey(req);
 419                        if (unlikely(err)) {
 420                                rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n",
 421                                          req->mr->rkey, err);
 422                        } else if (likely(can_wait)) {
 423                                wait_for_completion(&req->inv_comp);
 424                        } else {
 425                                /*
 426                                 * Something went wrong, so request will be
 427                                 * completed from INV callback.
 428                                 */
 429                                WARN_ON_ONCE(1);
 430
 431                                return;
 432                        }
 433                        if (!refcount_dec_and_test(&req->ref))
 434                                return;
 435                }
 436                ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
 437                                req->sg_cnt, req->dir);
 438        }
 439        if (!refcount_dec_and_test(&req->ref))
 440                return;
 441        if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
 442                atomic_dec(&sess->stats->inflight);
 443
 444        req->in_use = false;
 445        req->con = NULL;
 446
 447        if (errno) {
 448                rtrs_err_rl(con->c.sess, "IO request failed: error=%d path=%s [%s:%u] notify=%d\n",
 449                            errno, kobject_name(&sess->kobj), sess->hca_name,
 450                            sess->hca_port, notify);
 451        }
 452
 453        if (notify)
 454                req->conf(req->priv, errno);
 455}
 456
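     /*
      * rtrs_post_send_rdma() - post a single-SGE RDMA WRITE with immediate.
      * Sends the contents of req->iu (user message plus rtrs header) to the
      * server buffer @rbuf at offset @off.  Only every queue_depth-th send is
      * signalled to keep the send queue from filling up.
      */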
 457static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
 458                                struct rtrs_clt_io_req *req,
 459                                struct rtrs_rbuf *rbuf, u32 off,
 460                                u32 imm, struct ib_send_wr *wr)
 461{
 462        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
 463        enum ib_send_flags flags;
 464        struct ib_sge sge;
 465
 466        if (unlikely(!req->sg_size)) {
 467                rtrs_wrn(con->c.sess,
 468                         "Doing RDMA Write failed, no data supplied\n");
 469                return -EINVAL;
 470        }
 471
 472        /* user data and user message in the first list element */
 473        sge.addr   = req->iu->dma_addr;
 474        sge.length = req->sg_size;
 475        sge.lkey   = sess->s.dev->ib_pd->local_dma_lkey;
 476
 477        /*
 478         * From time to time we have to post signalled sends,
 479         * or send queue will fill up and only QP reset can help.
 480         */
 481        flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
 482                        0 : IB_SEND_SIGNALED;
 483
 484        ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
 485                                      req->sg_size, DMA_TO_DEVICE);
 486
 487        return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
 488                                            rbuf->rkey, rbuf->addr + off,
 489                                            imm, flags, wr, NULL);
 490}
 491
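     /*
      * process_io_rsp() - complete the request identified by @msg_id with
      * @errno.  @w_inval tells whether the server already invalidated the rkey
      * (send with invalidate), in which case no local invalidate is needed.
      */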
 492static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id,
 493                           s16 errno, bool w_inval)
 494{
 495        struct rtrs_clt_io_req *req;
 496
 497        if (WARN_ON(msg_id >= sess->queue_depth))
 498                return;
 499
 500        req = &sess->reqs[msg_id];
  501        /* Drop need_inv if the server responded with a send-with-invalidate */
 502        req->need_inv &= !w_inval;
 503        complete_rdma_req(req, errno, true, false);
 504}
 505
 506static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
 507{
 508        struct rtrs_iu *iu;
 509        int err;
 510        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
 511
 512        WARN_ON((sess->flags & RTRS_MSG_NEW_RKEY_F) == 0);
 513        iu = container_of(wc->wr_cqe, struct rtrs_iu,
 514                          cqe);
 515        err = rtrs_iu_post_recv(&con->c, iu);
 516        if (unlikely(err)) {
 517                rtrs_err(con->c.sess, "post iu failed %d\n", err);
 518                rtrs_rdma_error_recovery(con);
 519        }
 520}
 521
 522static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
 523{
 524        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
 525        struct rtrs_msg_rkey_rsp *msg;
 526        u32 imm_type, imm_payload;
 527        bool w_inval = false;
 528        struct rtrs_iu *iu;
 529        u32 buf_id;
 530        int err;
 531
 532        WARN_ON((sess->flags & RTRS_MSG_NEW_RKEY_F) == 0);
 533
 534        iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
 535
 536        if (unlikely(wc->byte_len < sizeof(*msg))) {
 537                rtrs_err(con->c.sess, "rkey response is malformed: size %d\n",
 538                          wc->byte_len);
 539                goto out;
 540        }
 541        ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
 542                                   iu->size, DMA_FROM_DEVICE);
 543        msg = iu->buf;
 544        if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP)) {
 545                rtrs_err(sess->clt, "rkey response is malformed: type %d\n",
 546                          le16_to_cpu(msg->type));
 547                goto out;
 548        }
 549        buf_id = le16_to_cpu(msg->buf_id);
 550        if (WARN_ON(buf_id >= sess->queue_depth))
 551                goto out;
 552
 553        rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
 554        if (likely(imm_type == RTRS_IO_RSP_IMM ||
 555                   imm_type == RTRS_IO_RSP_W_INV_IMM)) {
 556                u32 msg_id;
 557
 558                w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
 559                rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
 560
 561                if (WARN_ON(buf_id != msg_id))
 562                        goto out;
 563                sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
 564                process_io_rsp(sess, msg_id, err, w_inval);
 565        }
 566        ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr,
 567                                      iu->size, DMA_FROM_DEVICE);
 568        return rtrs_clt_recv_done(con, wc);
 569out:
 570        rtrs_rdma_error_recovery(con);
 571}
 572
 573static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
 574
 575static struct ib_cqe io_comp_cqe = {
 576        .done = rtrs_clt_rdma_done
 577};
 578
 579/*
 580 * Post x2 empty WRs: first is for this RDMA with IMM,
 581 * second is for RECV with INV, which happened earlier.
 582 */
 583static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe)
 584{
 585        struct ib_recv_wr wr_arr[2], *wr;
 586        int i;
 587
 588        memset(wr_arr, 0, sizeof(wr_arr));
 589        for (i = 0; i < ARRAY_SIZE(wr_arr); i++) {
 590                wr = &wr_arr[i];
 591                wr->wr_cqe  = cqe;
 592                if (i)
 593                        /* Chain backwards */
 594                        wr->next = &wr_arr[i - 1];
 595        }
 596
 597        return ib_post_recv(con->qp, wr, NULL);
 598}
 599
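     /*
      * rtrs_clt_rdma_done() - CQE handler for the client IO connections.
      * Dispatches RDMA-with-immediate completions (IO responses, heartbeats),
      * plain receives (rkey responses / remote invalidations) and RDMA write
      * send completions, reposting receive buffers as needed.  Non-flush
      * errors trigger rtrs_rdma_error_recovery().
      */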
 600static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
 601{
 602        struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
 603        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
 604        u32 imm_type, imm_payload;
 605        bool w_inval = false;
 606        int err;
 607
 608        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 609                if (wc->status != IB_WC_WR_FLUSH_ERR) {
 610                        rtrs_err(sess->clt, "RDMA failed: %s\n",
 611                                  ib_wc_status_msg(wc->status));
 612                        rtrs_rdma_error_recovery(con);
 613                }
 614                return;
 615        }
 616        rtrs_clt_update_wc_stats(con);
 617
 618        switch (wc->opcode) {
 619        case IB_WC_RECV_RDMA_WITH_IMM:
 620                /*
 621                 * post_recv() RDMA write completions of IO reqs (read/write)
 622                 * and hb
 623                 */
 624                if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done))
 625                        return;
 626                rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
 627                               &imm_type, &imm_payload);
 628                if (likely(imm_type == RTRS_IO_RSP_IMM ||
 629                           imm_type == RTRS_IO_RSP_W_INV_IMM)) {
 630                        u32 msg_id;
 631
 632                        w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
 633                        rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
 634
 635                        process_io_rsp(sess, msg_id, err, w_inval);
 636                } else if (imm_type == RTRS_HB_MSG_IMM) {
 637                        WARN_ON(con->c.cid);
 638                        rtrs_send_hb_ack(&sess->s);
 639                        if (sess->flags & RTRS_MSG_NEW_RKEY_F)
  640                                return rtrs_clt_recv_done(con, wc);
 641                } else if (imm_type == RTRS_HB_ACK_IMM) {
 642                        WARN_ON(con->c.cid);
 643                        sess->s.hb_missed_cnt = 0;
 644                        sess->s.hb_cur_latency =
 645                                ktime_sub(ktime_get(), sess->s.hb_last_sent);
 646                        if (sess->flags & RTRS_MSG_NEW_RKEY_F)
  647                                return rtrs_clt_recv_done(con, wc);
 648                } else {
 649                        rtrs_wrn(con->c.sess, "Unknown IMM type %u\n",
 650                                  imm_type);
 651                }
 652                if (w_inval)
 653                        /*
 654                         * Post x2 empty WRs: first is for this RDMA with IMM,
 655                         * second is for RECV with INV, which happened earlier.
 656                         */
 657                        err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe);
 658                else
 659                        err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
 660                if (unlikely(err)) {
 661                        rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n",
 662                                  err);
 663                        rtrs_rdma_error_recovery(con);
 664                }
 665                break;
 666        case IB_WC_RECV:
 667                /*
 668                 * Key invalidations from server side
 669                 */
 670                WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE ||
 671                          wc->wc_flags & IB_WC_WITH_IMM));
 672                WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done);
 673                if (sess->flags & RTRS_MSG_NEW_RKEY_F) {
 674                        if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
  675                                return rtrs_clt_recv_done(con, wc);
  676
  677                        return rtrs_clt_rkey_rsp_done(con, wc);
 678                }
 679                break;
 680        case IB_WC_RDMA_WRITE:
 681                /*
 682                 * post_send() RDMA write completions of IO reqs (read/write)
 683                 */
 684                break;
 685
 686        default:
 687                rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode);
 688                return;
 689        }
 690}
 691
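     /*
      * post_recv_io() - post @q_size receive WRs on one connection.
      * In RTRS_MSG_NEW_RKEY_F mode each receive is backed by an IU so that
      * rkey responses can be parsed; otherwise empty receives are enough.
      */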
 692static int post_recv_io(struct rtrs_clt_con *con, size_t q_size)
 693{
 694        int err, i;
 695        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
 696
 697        for (i = 0; i < q_size; i++) {
 698                if (sess->flags & RTRS_MSG_NEW_RKEY_F) {
 699                        struct rtrs_iu *iu = &con->rsp_ius[i];
 700
 701                        err = rtrs_iu_post_recv(&con->c, iu);
 702                } else {
 703                        err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
 704                }
 705                if (unlikely(err))
 706                        return err;
 707        }
 708
 709        return 0;
 710}
 711
 712static int post_recv_sess(struct rtrs_clt_sess *sess)
 713{
 714        size_t q_size = 0;
 715        int err, cid;
 716
 717        for (cid = 0; cid < sess->s.con_num; cid++) {
 718                if (cid == 0)
 719                        q_size = SERVICE_CON_QUEUE_DEPTH;
 720                else
 721                        q_size = sess->queue_depth;
 722
 723                /*
 724                 * x2 for RDMA read responses + FR key invalidations,
 725                 * RDMA writes do not require any FR registrations.
 726                 */
 727                q_size *= 2;
 728
 729                err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size);
 730                if (unlikely(err)) {
 731                        rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err);
 732                        return err;
 733                }
 734        }
 735
 736        return 0;
 737}
 738
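     /*
      * struct path_it - iterator over the paths of a session.
      * @next_path is chosen by path_it_init() according to the multipath
      * policy (RR, MIN_INFLIGHT or MIN_LATENCY); @skip_list collects paths
      * already tried for the current IO so they are not returned twice.
      */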
 739struct path_it {
 740        int i;
 741        struct list_head skip_list;
 742        struct rtrs_clt *clt;
 743        struct rtrs_clt_sess *(*next_path)(struct path_it *it);
 744};
 745
 746/**
 747 * list_next_or_null_rr_rcu - get next list element in round-robin fashion.
 748 * @head:       the head for the list.
 749 * @ptr:        the list head to take the next element from.
 750 * @type:       the type of the struct this is embedded in.
 751 * @memb:       the name of the list_head within the struct.
 752 *
  753 * The next element is returned in round-robin fashion, i.e. the head is skipped,
  754 * but if the list is observed as empty, NULL is returned.
 755 *
 756 * This primitive may safely run concurrently with the _rcu list-mutation
 757 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
 758 */
 759#define list_next_or_null_rr_rcu(head, ptr, type, memb) \
 760({ \
 761        list_next_or_null_rcu(head, ptr, type, memb) ?: \
 762                list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \
 763                                      type, memb); \
 764})
 765
 766/**
 767 * get_next_path_rr() - Returns path in round-robin fashion.
 768 * @it: the path pointer
 769 *
 770 * Related to @MP_POLICY_RR
 771 *
 772 * Locks:
  773 *    rcu_read_lock() must be held.
 774 */
 775static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it)
 776{
 777        struct rtrs_clt_sess __rcu **ppcpu_path;
 778        struct rtrs_clt_sess *path;
 779        struct rtrs_clt *clt;
 780
 781        clt = it->clt;
 782
 783        /*
 784         * Here we use two RCU objects: @paths_list and @pcpu_path
 785         * pointer.  See rtrs_clt_remove_path_from_arr() for details
 786         * how that is handled.
 787         */
 788
 789        ppcpu_path = this_cpu_ptr(clt->pcpu_path);
 790        path = rcu_dereference(*ppcpu_path);
 791        if (unlikely(!path))
 792                path = list_first_or_null_rcu(&clt->paths_list,
 793                                              typeof(*path), s.entry);
 794        else
 795                path = list_next_or_null_rr_rcu(&clt->paths_list,
 796                                                &path->s.entry,
 797                                                typeof(*path),
 798                                                s.entry);
 799        rcu_assign_pointer(*ppcpu_path, path);
 800
 801        return path;
 802}
 803
 804/**
 805 * get_next_path_min_inflight() - Returns path with minimal inflight count.
 806 * @it: the path pointer
 807 *
 808 * Related to @MP_POLICY_MIN_INFLIGHT
 809 *
 810 * Locks:
  811 *    rcu_read_lock() must be held.
 812 */
 813static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it)
 814{
 815        struct rtrs_clt_sess *min_path = NULL;
 816        struct rtrs_clt *clt = it->clt;
 817        struct rtrs_clt_sess *sess;
 818        int min_inflight = INT_MAX;
 819        int inflight;
 820
 821        list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
 822                if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
 823                        continue;
 824
 825                if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry))))
 826                        continue;
 827
 828                inflight = atomic_read(&sess->stats->inflight);
 829
 830                if (inflight < min_inflight) {
 831                        min_inflight = inflight;
 832                        min_path = sess;
 833                }
 834        }
 835
 836        /*
 837         * add the path to the skip list, so that next time we can get
 838         * a different one
 839         */
 840        if (min_path)
 841                list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);
 842
 843        return min_path;
 844}
 845
 846/**
 847 * get_next_path_min_latency() - Returns path with minimal latency.
 848 * @it: the path pointer
 849 *
 850 * Return: a path with the lowest latency or NULL if all paths are tried
 851 *
 852 * Locks:
  853 *    rcu_read_lock() must be held.
  854 *
  855 * Related to @MP_POLICY_MIN_LATENCY
  856 *
  857 * This DOES skip an already-tried path.
  858 * There is a skip-list to skip a path if the path has been tried but failed.
  859 * It will try the minimum latency path and then the second minimum latency
  860 * path and so on. Finally it will return NULL if all paths have been tried.
  861 * Therefore the caller MUST check whether the returned
  862 * path is NULL and trigger the IO error if so.
 863 */
 864static struct rtrs_clt_sess *get_next_path_min_latency(struct path_it *it)
 865{
 866        struct rtrs_clt_sess *min_path = NULL;
 867        struct rtrs_clt *clt = it->clt;
 868        struct rtrs_clt_sess *sess;
  869        ktime_t min_latency = KTIME_MAX;
 870        ktime_t latency;
 871
 872        list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
 873                if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
 874                        continue;
 875
 876                if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry))))
 877                        continue;
 878
 879                latency = sess->s.hb_cur_latency;
 880
 881                if (latency < min_latency) {
 882                        min_latency = latency;
 883                        min_path = sess;
 884                }
 885        }
 886
 887        /*
 888         * add the path to the skip list, so that next time we can get
 889         * a different one
 890         */
 891        if (min_path)
 892                list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);
 893
 894        return min_path;
 895}
 896
 897static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt)
 898{
 899        INIT_LIST_HEAD(&it->skip_list);
 900        it->clt = clt;
 901        it->i = 0;
 902
 903        if (clt->mp_policy == MP_POLICY_RR)
 904                it->next_path = get_next_path_rr;
 905        else if (clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
 906                it->next_path = get_next_path_min_inflight;
 907        else
 908                it->next_path = get_next_path_min_latency;
 909}
 910
 911static inline void path_it_deinit(struct path_it *it)
 912{
 913        struct list_head *skip, *tmp;
 914        /*
  915         * The skip_list is used only for the MIN_INFLIGHT and MIN_LATENCY
  916         * policies. We need to remove paths from it, so that the next IO
  917         * can insert paths (->mp_skip_entry) into the skip_list again.
 918         */
 919        list_for_each_safe(skip, tmp, &it->skip_list)
 920                list_del_init(skip);
 921}
 922
 923/**
 924 * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
 925 * about an inflight IO.
 926 * The user buffer holding user control message (not data) is copied into
 927 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
 928 * also hold the control message of rtrs.
 929 * @req: an io request holding information about IO.
 930 * @sess: client session
  931 * @conf: confirmation callback function to notify the upper layer.
 932 * @permit: permit for allocation of RDMA remote buffer
 933 * @priv: private pointer
 934 * @vec: kernel vector containing control message
 935 * @usr_len: length of the user message
  936 * @sg: scatter list for IO data
  937 * @sg_cnt: number of scatter list entries
 938 * @data_len: length of the IO data
 939 * @dir: direction of the IO.
 940 */
 941static void rtrs_clt_init_req(struct rtrs_clt_io_req *req,
 942                              struct rtrs_clt_sess *sess,
 943                              void (*conf)(void *priv, int errno),
 944                              struct rtrs_permit *permit, void *priv,
 945                              const struct kvec *vec, size_t usr_len,
 946                              struct scatterlist *sg, size_t sg_cnt,
 947                              size_t data_len, int dir)
 948{
 949        struct iov_iter iter;
 950        size_t len;
 951
 952        req->permit = permit;
 953        req->in_use = true;
 954        req->usr_len = usr_len;
 955        req->data_len = data_len;
 956        req->sglist = sg;
 957        req->sg_cnt = sg_cnt;
 958        req->priv = priv;
 959        req->dir = dir;
 960        req->con = rtrs_permit_to_clt_con(sess, permit);
 961        req->conf = conf;
 962        req->need_inv = false;
 963        req->need_inv_comp = false;
 964        req->inv_errno = 0;
 965        refcount_set(&req->ref, 1);
 966
 967        iov_iter_kvec(&iter, READ, vec, 1, usr_len);
 968        len = _copy_from_iter(req->iu->buf, usr_len, &iter);
 969        WARN_ON(len != usr_len);
 970
 971        reinit_completion(&req->inv_comp);
 972}
 973
 974static struct rtrs_clt_io_req *
 975rtrs_clt_get_req(struct rtrs_clt_sess *sess,
 976                 void (*conf)(void *priv, int errno),
 977                 struct rtrs_permit *permit, void *priv,
 978                 const struct kvec *vec, size_t usr_len,
 979                 struct scatterlist *sg, size_t sg_cnt,
 980                 size_t data_len, int dir)
 981{
 982        struct rtrs_clt_io_req *req;
 983
 984        req = &sess->reqs[permit->mem_id];
 985        rtrs_clt_init_req(req, sess, conf, permit, priv, vec, usr_len,
 986                           sg, sg_cnt, data_len, dir);
 987        return req;
 988}
 989
 990static struct rtrs_clt_io_req *
 991rtrs_clt_get_copy_req(struct rtrs_clt_sess *alive_sess,
 992                       struct rtrs_clt_io_req *fail_req)
 993{
 994        struct rtrs_clt_io_req *req;
 995        struct kvec vec = {
 996                .iov_base = fail_req->iu->buf,
 997                .iov_len  = fail_req->usr_len
 998        };
 999
1000        req = &alive_sess->reqs[fail_req->permit->mem_id];
1001        rtrs_clt_init_req(req, alive_sess, fail_req->conf, fail_req->permit,
1002                           fail_req->priv, &vec, fail_req->usr_len,
1003                           fail_req->sglist, fail_req->sg_cnt,
1004                           fail_req->data_len, fail_req->dir);
1005        return req;
1006}
1007
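     /*
      * rtrs_post_rdma_write_sg() - post the RDMA WRITE carrying a write request.
      * With fast registration enabled (@fr_en) the payload is described by a
      * single MR-backed SGE followed by the header IU, and the local invalidate
      * WR passed in @tail is chained after the write; otherwise the raw sg list
      * entries are used directly.
      */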
1008static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con,
1009                                   struct rtrs_clt_io_req *req,
1010                                   struct rtrs_rbuf *rbuf, bool fr_en,
1011                                   u32 size, u32 imm, struct ib_send_wr *wr,
1012                                   struct ib_send_wr *tail)
1013{
1014        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1015        struct ib_sge *sge = req->sge;
1016        enum ib_send_flags flags;
1017        struct scatterlist *sg;
1018        size_t num_sge;
1019        int i;
1020        struct ib_send_wr *ptail = NULL;
1021
1022        if (fr_en) {
1023                i = 0;
1024                sge[i].addr   = req->mr->iova;
1025                sge[i].length = req->mr->length;
1026                sge[i].lkey   = req->mr->lkey;
1027                i++;
1028                num_sge = 2;
1029                ptail = tail;
1030        } else {
1031                for_each_sg(req->sglist, sg, req->sg_cnt, i) {
1032                        sge[i].addr   = sg_dma_address(sg);
1033                        sge[i].length = sg_dma_len(sg);
1034                        sge[i].lkey   = sess->s.dev->ib_pd->local_dma_lkey;
1035                }
1036                num_sge = 1 + req->sg_cnt;
1037        }
1038        sge[i].addr   = req->iu->dma_addr;
1039        sge[i].length = size;
1040        sge[i].lkey   = sess->s.dev->ib_pd->local_dma_lkey;
1041
1042        /*
1043         * From time to time we have to post signalled sends,
1044         * or send queue will fill up and only QP reset can help.
1045         */
1046        flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
1047                        0 : IB_SEND_SIGNALED;
1048
1049        ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
1050                                      size, DMA_TO_DEVICE);
1051
1052        return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge,
1053                                            rbuf->rkey, rbuf->addr, imm,
1054                                            flags, wr, ptail);
1055}
1056
1057static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count)
1058{
1059        int nr;
1060
1061        /* Align the MR to a 4K page size to match the block virt boundary */
1062        nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K);
1063        if (nr < 0)
1064                return nr;
1065        if (unlikely(nr < req->sg_cnt))
1066                return -EINVAL;
1067        ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey));
1068
1069        return nr;
1070}
1071
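     /*
      * rtrs_clt_write_req() - map, register and post an RTRS write request.
      * The rtrs_msg_rdma_write header is placed right after the user message in
      * req->iu, the data sg list is DMA-mapped and (if present) fast registered,
      * and everything is RDMA-written to the server buffer selected by the
      * permit.  On error the sg list is unmapped again.
      */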
1072static int rtrs_clt_write_req(struct rtrs_clt_io_req *req)
1073{
1074        struct rtrs_clt_con *con = req->con;
1075        struct rtrs_sess *s = con->c.sess;
1076        struct rtrs_clt_sess *sess = to_clt_sess(s);
1077        struct rtrs_msg_rdma_write *msg;
1078
1079        struct rtrs_rbuf *rbuf;
1080        int ret, count = 0;
1081        u32 imm, buf_id;
1082        struct ib_reg_wr rwr;
1083        struct ib_send_wr inv_wr;
1084        struct ib_send_wr *wr = NULL;
1085        bool fr_en = false;
1086
1087        const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
1088
1089        if (unlikely(tsize > sess->chunk_size)) {
1090                rtrs_wrn(s, "Write request failed, size too big %zu > %d\n",
1091                          tsize, sess->chunk_size);
1092                return -EMSGSIZE;
1093        }
1094        if (req->sg_cnt) {
1095                count = ib_dma_map_sg(sess->s.dev->ib_dev, req->sglist,
1096                                      req->sg_cnt, req->dir);
1097                if (unlikely(!count)) {
1098                        rtrs_wrn(s, "Write request failed, map failed\n");
1099                        return -EINVAL;
1100                }
1101        }
1102        /* put rtrs msg after sg and user message */
1103        msg = req->iu->buf + req->usr_len;
1104        msg->type = cpu_to_le16(RTRS_MSG_WRITE);
1105        msg->usr_len = cpu_to_le16(req->usr_len);
1106
1107        /* rtrs message on server side will be after user data and message */
1108        imm = req->permit->mem_off + req->data_len + req->usr_len;
1109        imm = rtrs_to_io_req_imm(imm);
1110        buf_id = req->permit->mem_id;
1111        req->sg_size = tsize;
1112        rbuf = &sess->rbufs[buf_id];
1113
1114        if (count) {
1115                ret = rtrs_map_sg_fr(req, count);
1116                if (ret < 0) {
1117                        rtrs_err_rl(s,
1118                                    "Write request failed, failed to map fast reg. data, err: %d\n",
1119                                    ret);
1120                        ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
1121                                        req->sg_cnt, req->dir);
1122                        return ret;
1123                }
1124                inv_wr = (struct ib_send_wr) {
1125                        .opcode             = IB_WR_LOCAL_INV,
1126                        .wr_cqe             = &req->inv_cqe,
1127                        .send_flags         = IB_SEND_SIGNALED,
1128                        .ex.invalidate_rkey = req->mr->rkey,
1129                };
1130                req->inv_cqe.done = rtrs_clt_inv_rkey_done;
1131                rwr = (struct ib_reg_wr) {
1132                        .wr.opcode = IB_WR_REG_MR,
1133                        .wr.wr_cqe = &fast_reg_cqe,
1134                        .mr = req->mr,
1135                        .key = req->mr->rkey,
1136                        .access = (IB_ACCESS_LOCAL_WRITE),
1137                };
1138                wr = &rwr.wr;
1139                fr_en = true;
1140                refcount_inc(&req->ref);
1141        }
1142        /*
1143         * Update stats now, after request is successfully sent it is not
1144         * safe anymore to touch it.
1145         */
1146        rtrs_clt_update_all_stats(req, WRITE);
1147
1148        ret = rtrs_post_rdma_write_sg(req->con, req, rbuf, fr_en,
1149                                      req->usr_len + sizeof(*msg),
1150                                      imm, wr, &inv_wr);
1151        if (unlikely(ret)) {
1152                rtrs_err_rl(s,
1153                            "Write request failed: error=%d path=%s [%s:%u]\n",
1154                            ret, kobject_name(&sess->kobj), sess->hca_name,
1155                            sess->hca_port);
1156                if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
1157                        atomic_dec(&sess->stats->inflight);
1158                if (req->sg_cnt)
1159                        ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
1160                                        req->sg_cnt, req->dir);
1161        }
1162
1163        return ret;
1164}
1165
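     /*
      * rtrs_clt_read_req() - post an RTRS read request.
      * The data buffer is fast registered with remote write access and its
      * descriptor is advertised to the server in rtrs_msg_rdma_read; the server
      * RDMA-writes the data back and normally invalidates the rkey itself.
      */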
1166static int rtrs_clt_read_req(struct rtrs_clt_io_req *req)
1167{
1168        struct rtrs_clt_con *con = req->con;
1169        struct rtrs_sess *s = con->c.sess;
1170        struct rtrs_clt_sess *sess = to_clt_sess(s);
1171        struct rtrs_msg_rdma_read *msg;
1172        struct rtrs_ib_dev *dev = sess->s.dev;
1173
1174        struct ib_reg_wr rwr;
1175        struct ib_send_wr *wr = NULL;
1176
1177        int ret, count = 0;
1178        u32 imm, buf_id;
1179
1180        const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
1181
1182        if (unlikely(tsize > sess->chunk_size)) {
1183                rtrs_wrn(s,
1184                          "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
1185                          tsize, sess->chunk_size);
1186                return -EMSGSIZE;
1187        }
1188
1189        if (req->sg_cnt) {
1190                count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1191                                      req->dir);
1192                if (unlikely(!count)) {
1193                        rtrs_wrn(s,
1194                                  "Read request failed, dma map failed\n");
1195                        return -EINVAL;
1196                }
1197        }
 1198        /* put our message into req->buf after user message */
1199        msg = req->iu->buf + req->usr_len;
1200        msg->type = cpu_to_le16(RTRS_MSG_READ);
1201        msg->usr_len = cpu_to_le16(req->usr_len);
1202
1203        if (count) {
1204                ret = rtrs_map_sg_fr(req, count);
1205                if (ret < 0) {
1206                        rtrs_err_rl(s,
 1207                                     "Read request failed, failed to map fast reg. data, err: %d\n",
1208                                     ret);
1209                        ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1210                                        req->dir);
1211                        return ret;
1212                }
1213                rwr = (struct ib_reg_wr) {
1214                        .wr.opcode = IB_WR_REG_MR,
1215                        .wr.wr_cqe = &fast_reg_cqe,
1216                        .mr = req->mr,
1217                        .key = req->mr->rkey,
1218                        .access = (IB_ACCESS_LOCAL_WRITE |
1219                                   IB_ACCESS_REMOTE_WRITE),
1220                };
1221                wr = &rwr.wr;
1222
1223                msg->sg_cnt = cpu_to_le16(1);
1224                msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F);
1225
1226                msg->desc[0].addr = cpu_to_le64(req->mr->iova);
1227                msg->desc[0].key = cpu_to_le32(req->mr->rkey);
1228                msg->desc[0].len = cpu_to_le32(req->mr->length);
1229
1230                /* Further invalidation is required */
1231                req->need_inv = !!RTRS_MSG_NEED_INVAL_F;
1232
1233        } else {
1234                msg->sg_cnt = 0;
1235                msg->flags = 0;
1236        }
1237        /*
1238         * rtrs message will be after the space reserved for disk data and
1239         * user message
1240         */
1241        imm = req->permit->mem_off + req->data_len + req->usr_len;
1242        imm = rtrs_to_io_req_imm(imm);
1243        buf_id = req->permit->mem_id;
1244
1245        req->sg_size  = sizeof(*msg);
1246        req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc);
1247        req->sg_size += req->usr_len;
1248
1249        /*
1250         * Update stats now, after request is successfully sent it is not
1251         * safe anymore to touch it.
1252         */
1253        rtrs_clt_update_all_stats(req, READ);
1254
1255        ret = rtrs_post_send_rdma(req->con, req, &sess->rbufs[buf_id],
1256                                   req->data_len, imm, wr);
1257        if (unlikely(ret)) {
1258                rtrs_err_rl(s,
1259                            "Read request failed: error=%d path=%s [%s:%u]\n",
1260                            ret, kobject_name(&sess->kobj), sess->hca_name,
1261                            sess->hca_port);
1262                if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
1263                        atomic_dec(&sess->stats->inflight);
1264                req->need_inv = false;
1265                if (req->sg_cnt)
1266                        ib_dma_unmap_sg(dev->ib_dev, req->sglist,
1267                                        req->sg_cnt, req->dir);
1268        }
1269
1270        return ret;
1271}
1272
1273/**
1274 * rtrs_clt_failover_req() - Try to find an active path for a failed request
1275 * @clt: clt context
1276 * @fail_req: a failed io request.
1277 */
1278static int rtrs_clt_failover_req(struct rtrs_clt *clt,
1279                                 struct rtrs_clt_io_req *fail_req)
1280{
1281        struct rtrs_clt_sess *alive_sess;
1282        struct rtrs_clt_io_req *req;
1283        int err = -ECONNABORTED;
1284        struct path_it it;
1285
1286        rcu_read_lock();
1287        for (path_it_init(&it, clt);
1288             (alive_sess = it.next_path(&it)) && it.i < it.clt->paths_num;
1289             it.i++) {
1290                if (unlikely(READ_ONCE(alive_sess->state) !=
1291                             RTRS_CLT_CONNECTED))
1292                        continue;
1293                req = rtrs_clt_get_copy_req(alive_sess, fail_req);
1294                if (req->dir == DMA_TO_DEVICE)
1295                        err = rtrs_clt_write_req(req);
1296                else
1297                        err = rtrs_clt_read_req(req);
1298                if (unlikely(err)) {
1299                        req->in_use = false;
1300                        continue;
1301                }
1302                /* Success path */
1303                rtrs_clt_inc_failover_cnt(alive_sess->stats);
1304                break;
1305        }
1306        path_it_deinit(&it);
1307        rcu_read_unlock();
1308
1309        return err;
1310}
1311
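     /*
      * fail_all_outstanding_reqs() - fail over the in-flight IO of a dead path.
      * Every in-use request is completed with -ECONNABORTED without notifying
      * the upper layer first, then retried on another connected path; only if
      * that failover also fails is the conf callback invoked with the error.
      */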
1312static void fail_all_outstanding_reqs(struct rtrs_clt_sess *sess)
1313{
1314        struct rtrs_clt *clt = sess->clt;
1315        struct rtrs_clt_io_req *req;
1316        int i, err;
1317
1318        if (!sess->reqs)
1319                return;
1320        for (i = 0; i < sess->queue_depth; ++i) {
1321                req = &sess->reqs[i];
1322                if (!req->in_use)
1323                        continue;
1324
1325                /*
 1326                 * Safely (without notification) complete the failed request.
 1327                 * After completion this request is still usable and can
 1328                 * be failed over to another path.
1329                 */
1330                complete_rdma_req(req, -ECONNABORTED, false, true);
1331
1332                err = rtrs_clt_failover_req(clt, req);
1333                if (unlikely(err))
1334                        /* Failover failed, notify anyway */
1335                        req->conf(req->priv, err);
1336        }
1337}
1338
1339static void free_sess_reqs(struct rtrs_clt_sess *sess)
1340{
1341        struct rtrs_clt_io_req *req;
1342        int i;
1343
1344        if (!sess->reqs)
1345                return;
1346        for (i = 0; i < sess->queue_depth; ++i) {
1347                req = &sess->reqs[i];
1348                if (req->mr)
1349                        ib_dereg_mr(req->mr);
1350                kfree(req->sge);
1351                rtrs_iu_free(req->iu, sess->s.dev->ib_dev, 1);
1352        }
1353        kfree(sess->reqs);
1354        sess->reqs = NULL;
1355}
1356
1357static int alloc_sess_reqs(struct rtrs_clt_sess *sess)
1358{
1359        struct rtrs_clt_io_req *req;
1360        int i, err = -ENOMEM;
1361
1362        sess->reqs = kcalloc(sess->queue_depth, sizeof(*sess->reqs),
1363                             GFP_KERNEL);
1364        if (!sess->reqs)
1365                return -ENOMEM;
1366
1367        for (i = 0; i < sess->queue_depth; ++i) {
1368                req = &sess->reqs[i];
1369                req->iu = rtrs_iu_alloc(1, sess->max_hdr_size, GFP_KERNEL,
1370                                         sess->s.dev->ib_dev,
1371                                         DMA_TO_DEVICE,
1372                                         rtrs_clt_rdma_done);
1373                if (!req->iu)
1374                        goto out;
1375
1376                req->sge = kcalloc(2, sizeof(*req->sge), GFP_KERNEL);
1377                if (!req->sge)
1378                        goto out;
1379
1380                req->mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
1381                                      sess->max_pages_per_mr);
1382                if (IS_ERR(req->mr)) {
1383                        err = PTR_ERR(req->mr);
1384                        req->mr = NULL;
1385                        pr_err("Failed to alloc sess->max_pages_per_mr %d\n",
1386                               sess->max_pages_per_mr);
1387                        goto out;
1388                }
1389
1390                init_completion(&req->inv_comp);
1391        }
1392
1393        return 0;
1394
1395out:
1396        free_sess_reqs(sess);
1397
1398        return err;
1399}
1400
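     /*
      * alloc_permits() - allocate the permit bitmap and the permit array.
      * Each permit gets a mem_id (its index) and a mem_off that encodes the
      * chunk offset in the upper bits of the immediate payload; for example
      * (assuming a queue depth of 128) chunk_bits is 7.
      */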
1401static int alloc_permits(struct rtrs_clt *clt)
1402{
1403        unsigned int chunk_bits;
1404        int err, i;
1405
1406        clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth),
1407                                   sizeof(long), GFP_KERNEL);
1408        if (!clt->permits_map) {
1409                err = -ENOMEM;
1410                goto out_err;
1411        }
1412        clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL);
1413        if (!clt->permits) {
1414                err = -ENOMEM;
1415                goto err_map;
1416        }
1417        chunk_bits = ilog2(clt->queue_depth - 1) + 1;
1418        for (i = 0; i < clt->queue_depth; i++) {
1419                struct rtrs_permit *permit;
1420
1421                permit = get_permit(clt, i);
1422                permit->mem_id = i;
1423                permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits);
1424        }
1425
1426        return 0;
1427
1428err_map:
1429        kfree(clt->permits_map);
1430        clt->permits_map = NULL;
1431out_err:
1432        return err;
1433}
1434
1435static void free_permits(struct rtrs_clt *clt)
1436{
1437        if (clt->permits_map) {
1438                size_t sz = clt->queue_depth;
1439
1440                wait_event(clt->permits_wait,
1441                           find_first_bit(clt->permits_map, sz) >= sz);
1442        }
1443        kfree(clt->permits_map);
1444        clt->permits_map = NULL;
1445        kfree(clt->permits);
1446        clt->permits = NULL;
1447}
1448
1449static void query_fast_reg_mode(struct rtrs_clt_sess *sess)
1450{
1451        struct ib_device *ib_dev;
1452        u64 max_pages_per_mr;
1453        int mr_page_shift;
1454
1455        ib_dev = sess->s.dev->ib_dev;
1456
1457        /*
1458         * Use the smallest page size supported by the HCA, down to a
1459         * minimum of 4096 bytes. We're unlikely to build large sglists
1460         * out of smaller entries.
1461         */
1462        mr_page_shift      = max(12, ffs(ib_dev->attrs.page_size_cap) - 1);
1463        max_pages_per_mr   = ib_dev->attrs.max_mr_size;
1464        do_div(max_pages_per_mr, (1ull << mr_page_shift));
1465        sess->max_pages_per_mr =
1466                min3(sess->max_pages_per_mr, (u32)max_pages_per_mr,
1467                     ib_dev->attrs.max_fast_reg_page_list_len);
1468        sess->clt->max_segments =
1469                min(sess->max_pages_per_mr, sess->clt->max_segments);
1470}
1471
1472static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess *sess,
1473                                           enum rtrs_clt_state new_state,
1474                                           enum rtrs_clt_state *old_state)
1475{
1476        bool changed;
1477
1478        spin_lock_irq(&sess->state_wq.lock);
1479        if (old_state)
1480                *old_state = sess->state;
1481        changed = rtrs_clt_change_state(sess, new_state);
1482        spin_unlock_irq(&sess->state_wq.lock);
1483
1484        return changed;
1485}
1486
1487static void rtrs_clt_hb_err_handler(struct rtrs_con *c)
1488{
1489        struct rtrs_clt_con *con = container_of(c, typeof(*con), c);
1490
1491        rtrs_rdma_error_recovery(con);
1492}
1493
1494static void rtrs_clt_init_hb(struct rtrs_clt_sess *sess)
1495{
1496        rtrs_init_hb(&sess->s, &io_comp_cqe,
1497                      RTRS_HB_INTERVAL_MS,
1498                      RTRS_HB_MISSED_MAX,
1499                      rtrs_clt_hb_err_handler,
1500                      rtrs_wq);
1501}
1502
1503static void rtrs_clt_reconnect_work(struct work_struct *work);
1504static void rtrs_clt_close_work(struct work_struct *work);
1505
1506static struct rtrs_clt_sess *alloc_sess(struct rtrs_clt *clt,
1507                                        const struct rtrs_addr *path,
1508                                        size_t con_num, u32 nr_poll_queues)
1509{
1510        struct rtrs_clt_sess *sess;
1511        int err = -ENOMEM;
1512        int cpu;
1513        size_t total_con;
1514
1515        sess = kzalloc(sizeof(*sess), GFP_KERNEL);
1516        if (!sess)
1517                goto err;
1518
1519        /*
1520         * con_num connections in IRQ mode plus nr_poll_queues connections
1521         * in polling mode, +1 extra connection for user messages.
1522         */
1523        total_con = con_num + nr_poll_queues + 1;
1524        sess->s.con = kcalloc(total_con, sizeof(*sess->s.con), GFP_KERNEL);
1525        if (!sess->s.con)
1526                goto err_free_sess;
1527
1528        sess->s.con_num = total_con;
1529        sess->s.irq_con_num = con_num + 1;
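            /*
             * Layout sketch with hypothetical numbers: con_num = 4 and
             * nr_poll_queues = 2 give total_con = 7, where cid 0 is the user
             * connection, cids 1..4 are IRQ-mode IO connections and cids 5..6
             * (>= irq_con_num) are polled directly, see create_con_cq_qp().
             */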
1530
1531        sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
1532        if (!sess->stats)
1533                goto err_free_con;
1534
1535        mutex_init(&sess->init_mutex);
1536        uuid_gen(&sess->s.uuid);
1537        memcpy(&sess->s.dst_addr, path->dst,
1538               rdma_addr_size((struct sockaddr *)path->dst));
1539
1540        /*
1541         * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
1542         * checks that the sa_family is non-zero. If the user passed
1543         * src_addr=NULL, sess->src_addr will contain only zeros, which is fine.
1544         */
1545        if (path->src)
1546                memcpy(&sess->s.src_addr, path->src,
1547                       rdma_addr_size((struct sockaddr *)path->src));
1548        strscpy(sess->s.sessname, clt->sessname, sizeof(sess->s.sessname));
1549        sess->clt = clt;
1550        sess->max_pages_per_mr = RTRS_MAX_SEGMENTS;
1551        init_waitqueue_head(&sess->state_wq);
1552        sess->state = RTRS_CLT_CONNECTING;
1553        atomic_set(&sess->connected_cnt, 0);
1554        INIT_WORK(&sess->close_work, rtrs_clt_close_work);
1555        INIT_DELAYED_WORK(&sess->reconnect_dwork, rtrs_clt_reconnect_work);
1556        rtrs_clt_init_hb(sess);
1557
1558        sess->mp_skip_entry = alloc_percpu(typeof(*sess->mp_skip_entry));
1559        if (!sess->mp_skip_entry)
1560                goto err_free_stats;
1561
1562        for_each_possible_cpu(cpu)
1563                INIT_LIST_HEAD(per_cpu_ptr(sess->mp_skip_entry, cpu));
1564
1565        err = rtrs_clt_init_stats(sess->stats);
1566        if (err)
1567                goto err_free_percpu;
1568
1569        return sess;
1570
1571err_free_percpu:
1572        free_percpu(sess->mp_skip_entry);
1573err_free_stats:
1574        kfree(sess->stats);
1575err_free_con:
1576        kfree(sess->s.con);
1577err_free_sess:
1578        kfree(sess);
1579err:
1580        return ERR_PTR(err);
1581}
1582
1583void free_sess(struct rtrs_clt_sess *sess)
1584{
1585        free_percpu(sess->mp_skip_entry);
1586        mutex_destroy(&sess->init_mutex);
1587        kfree(sess->s.con);
1588        kfree(sess->rbufs);
1589        kfree(sess);
1590}
1591
1592static int create_con(struct rtrs_clt_sess *sess, unsigned int cid)
1593{
1594        struct rtrs_clt_con *con;
1595
1596        con = kzalloc(sizeof(*con), GFP_KERNEL);
1597        if (!con)
1598                return -ENOMEM;
1599
1600        /* Map first two connections to the first CPU */
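            /*
             * E.g. cid 0 (the user connection) and cid 1 both land on CPU 0,
             * cid 2 on CPU 1 and so on, wrapping around at nr_cpu_ids.
             */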
1601        con->cpu  = (cid ? cid - 1 : 0) % nr_cpu_ids;
1602        con->c.cid = cid;
1603        con->c.sess = &sess->s;
1604        atomic_set(&con->io_cnt, 0);
1605        mutex_init(&con->con_mutex);
1606
1607        sess->s.con[cid] = &con->c;
1608
1609        return 0;
1610}
1611
1612static void destroy_con(struct rtrs_clt_con *con)
1613{
1614        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1615
1616        sess->s.con[con->c.cid] = NULL;
1617        mutex_destroy(&con->con_mutex);
1618        kfree(con);
1619}
1620
1621static int create_con_cq_qp(struct rtrs_clt_con *con)
1622{
1623        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1624        u32 max_send_wr, max_recv_wr, cq_num, max_send_sge, wr_limit;
1625        int err, cq_vector;
1626        struct rtrs_msg_rkey_rsp *rsp;
1627
1628        lockdep_assert_held(&con->con_mutex);
1629        if (con->c.cid == 0) {
1630                max_send_sge = 1;
1631                /* We must be the first here */
1632                if (WARN_ON(sess->s.dev))
1633                        return -EINVAL;
1634
1635                /*
1636                 * The whole session uses the device of the user connection.
1637                 * Be careful not to close the user connection before the
1638                 * ib dev is gracefully put.
1639                 */
1640                sess->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device,
1641                                                       &dev_pd);
1642                if (!sess->s.dev) {
1643                        rtrs_wrn(sess->clt,
1644                                  "rtrs_ib_dev_find_or_add(): no memory\n");
1645                        return -ENOMEM;
1646                }
1647                sess->s.dev_ref = 1;
1648                query_fast_reg_mode(sess);
1649                wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;
1650                /*
1651                 * Two completions per send (request + registration),
1652                 * two per recv if always_invalidate is set on the server,
1653                 * or one per recv otherwise,
1654                 * + 2 for drain and heartbeat
1655                 * in case the qp gets into an error state.
1656                 */
1657                max_send_wr =
1658                        min_t(int, wr_limit, SERVICE_CON_QUEUE_DEPTH * 2 + 2);
1659                max_recv_wr = max_send_wr;
1660        } else {
1661                /*
1662                 * Here we assume that session members are correctly set.
1663                 * This is always true if user connection (cid == 0) is
1664                 * established first.
1665                 */
1666                if (WARN_ON(!sess->s.dev))
1667                        return -EINVAL;
1668                if (WARN_ON(!sess->queue_depth))
1669                        return -EINVAL;
1670
1671                wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;
1672                /* Shared between connections */
1673                sess->s.dev_ref++;
1674                max_send_wr = min_t(int, wr_limit,
1675                              /* QD * (REQ + RSP + FR REGS or INVS) + drain */
1676                              sess->queue_depth * 3 + 1);
1677                max_recv_wr = min_t(int, wr_limit,
1678                              sess->queue_depth * 3 + 1);
1679                max_send_sge = 2;
1680        }
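            /*
             * Rough budget with hypothetical numbers: queue_depth = 128 and a
             * sufficiently large wr_limit give an IO connection
             * max_send_wr = max_recv_wr = 128 * 3 + 1 = 385 and
             * cq_num = 385 + 385 = 770 CQ entries.
             */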
1681        cq_num = max_send_wr + max_recv_wr;
1682        /* alloc iu to recv new rkey reply when server reports flags set */
1683        if (sess->flags & RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) {
1684                con->rsp_ius = rtrs_iu_alloc(cq_num, sizeof(*rsp),
1685                                              GFP_KERNEL, sess->s.dev->ib_dev,
1686                                              DMA_FROM_DEVICE,
1687                                              rtrs_clt_rdma_done);
1688                if (!con->rsp_ius)
1689                        return -ENOMEM;
1690                con->queue_num = cq_num;
1691        }
1693        cq_vector = con->cpu % sess->s.dev->ib_dev->num_comp_vectors;
1694        if (con->c.cid >= sess->s.irq_con_num)
1695                err = rtrs_cq_qp_create(&sess->s, &con->c, max_send_sge,
1696                                        cq_vector, cq_num, max_send_wr,
1697                                        max_recv_wr, IB_POLL_DIRECT);
1698        else
1699                err = rtrs_cq_qp_create(&sess->s, &con->c, max_send_sge,
1700                                        cq_vector, cq_num, max_send_wr,
1701                                        max_recv_wr, IB_POLL_SOFTIRQ);
1702        /*
1703         * In case of error we do not bother to clean previous allocations,
1704         * since destroy_con_cq_qp() must be called.
1705         */
1706        return err;
1707}
1708
1709static void destroy_con_cq_qp(struct rtrs_clt_con *con)
1710{
1711        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1712
1713        /*
1714         * Be careful here: destroy_con_cq_qp() can be called even if
1715         * create_con_cq_qp() failed, see the comments there.
1716         */
1717        lockdep_assert_held(&con->con_mutex);
1718        rtrs_cq_qp_destroy(&con->c);
1719        if (con->rsp_ius) {
1720                rtrs_iu_free(con->rsp_ius, sess->s.dev->ib_dev, con->queue_num);
1721                con->rsp_ius = NULL;
1722                con->queue_num = 0;
1723        }
1724        if (sess->s.dev_ref && !--sess->s.dev_ref) {
1725                rtrs_ib_dev_put(sess->s.dev);
1726                sess->s.dev = NULL;
1727        }
1728}
1729
1730static void stop_cm(struct rtrs_clt_con *con)
1731{
1732        rdma_disconnect(con->c.cm_id);
1733        if (con->c.qp)
1734                ib_drain_qp(con->c.qp);
1735}
1736
1737static void destroy_cm(struct rtrs_clt_con *con)
1738{
1739        rdma_destroy_id(con->c.cm_id);
1740        con->c.cm_id = NULL;
1741}
1742
1743static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con)
1744{
1745        struct rtrs_sess *s = con->c.sess;
1746        int err;
1747
1748        mutex_lock(&con->con_mutex);
1749        err = create_con_cq_qp(con);
1750        mutex_unlock(&con->con_mutex);
1751        if (err) {
1752                rtrs_err(s, "create_con_cq_qp(), err: %d\n", err);
1753                return err;
1754        }
1755        err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS);
1756        if (err)
1757                rtrs_err(s, "Resolving route failed, err: %d\n", err);
1758
1759        return err;
1760}
1761
1762static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con)
1763{
1764        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1765        struct rtrs_clt *clt = sess->clt;
1766        struct rtrs_msg_conn_req msg;
1767        struct rdma_conn_param param;
1768
1769        int err;
1770
1771        param = (struct rdma_conn_param) {
1772                .retry_count = 7,
1773                .rnr_retry_count = 7,
1774                .private_data = &msg,
1775                .private_data_len = sizeof(msg),
1776        };
1777
1778        msg = (struct rtrs_msg_conn_req) {
1779                .magic = cpu_to_le16(RTRS_MAGIC),
1780                .version = cpu_to_le16(RTRS_PROTO_VER),
1781                .cid = cpu_to_le16(con->c.cid),
1782                .cid_num = cpu_to_le16(sess->s.con_num),
1783                .recon_cnt = cpu_to_le16(sess->s.recon_cnt),
1784        };
1785        msg.first_conn = sess->for_new_clt ? FIRST_CONN : 0;
1786        uuid_copy(&msg.sess_uuid, &sess->s.uuid);
1787        uuid_copy(&msg.paths_uuid, &clt->paths_uuid);
1788
1789        err = rdma_connect_locked(con->c.cm_id, &param);
1790        if (err)
1791                rtrs_err(clt, "rdma_connect_locked(): %d\n", err);
1792
1793        return err;
1794}
1795
1796static int rtrs_rdma_conn_established(struct rtrs_clt_con *con,
1797                                       struct rdma_cm_event *ev)
1798{
1799        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1800        struct rtrs_clt *clt = sess->clt;
1801        const struct rtrs_msg_conn_rsp *msg;
1802        u16 version, queue_depth;
1803        int errno;
1804        u8 len;
1805
1806        msg = ev->param.conn.private_data;
1807        len = ev->param.conn.private_data_len;
1808        if (len < sizeof(*msg)) {
1809                rtrs_err(clt, "Invalid RTRS connection response\n");
1810                return -ECONNRESET;
1811        }
1812        if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1813                rtrs_err(clt, "Invalid RTRS magic\n");
1814                return -ECONNRESET;
1815        }
1816        version = le16_to_cpu(msg->version);
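            /* The high byte of version carries the major protocol version. */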
1817        if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1818                rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n",
1819                          version >> 8, RTRS_PROTO_VER_MAJOR);
1820                return -ECONNRESET;
1821        }
1822        errno = le16_to_cpu(msg->errno);
1823        if (errno) {
1824                rtrs_err(clt, "Invalid RTRS message: errno %d\n",
1825                          errno);
1826                return -ECONNRESET;
1827        }
1828        if (con->c.cid == 0) {
1829                queue_depth = le16_to_cpu(msg->queue_depth);
1830
1831                if (sess->queue_depth > 0 && queue_depth != sess->queue_depth) {
1832                        rtrs_err(clt, "Error: queue depth changed\n");
1833
1834                        /*
1835                         * Stop any more reconnection attempts
1836                         */
1837                        sess->reconnect_attempts = -1;
1838                        rtrs_err(clt,
1839                                "Disabling auto-reconnect. Trigger a manual reconnect after issue is resolved\n");
1840                        return -ECONNRESET;
1841                }
1842
1843                if (!sess->rbufs) {
1845                        sess->rbufs = kcalloc(queue_depth, sizeof(*sess->rbufs),
1846                                              GFP_KERNEL);
1847                        if (!sess->rbufs)
1848                                return -ENOMEM;
1849                }
1850                sess->queue_depth = queue_depth;
1851                sess->max_hdr_size = le32_to_cpu(msg->max_hdr_size);
1852                sess->max_io_size = le32_to_cpu(msg->max_io_size);
1853                sess->flags = le32_to_cpu(msg->flags);
1854                sess->chunk_size = sess->max_io_size + sess->max_hdr_size;
1855
1856                /*
1857                 * Global IO size is always a minimum.  If during a
1858                 * reconnection the server sends us a slightly higher value,
1859                 * the client does not care and keeps the cached minimum.
1860                 *
1861                 * Since several sessions (paths) can be re-establishing
1862                 * connections in parallel, take the lock.
1863                 */
1864                mutex_lock(&clt->paths_mutex);
1865                clt->queue_depth = sess->queue_depth;
1866                clt->max_io_size = min_not_zero(sess->max_io_size,
1867                                                clt->max_io_size);
1868                mutex_unlock(&clt->paths_mutex);
1869
1870                /*
1871                 * Cache the hca_port and hca_name for sysfs
1872                 */
1873                sess->hca_port = con->c.cm_id->port_num;
1874                scnprintf(sess->hca_name, sizeof(sess->hca_name), "%s",
1875                          sess->s.dev->ib_dev->name);
1876                sess->s.src_addr = con->c.cm_id->route.addr.src_addr;
1877                /* set for_new_clt, to allow future reconnect on any path */
1878                sess->for_new_clt = 1;
1879        }
1880
1881        return 0;
1882}
1883
1884static inline void flag_success_on_conn(struct rtrs_clt_con *con)
1885{
1886        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1887
1888        atomic_inc(&sess->connected_cnt);
1889        con->cm_err = 1;
1890}
1891
1892static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con,
1893                                    struct rdma_cm_event *ev)
1894{
1895        struct rtrs_sess *s = con->c.sess;
1896        const struct rtrs_msg_conn_rsp *msg;
1897        const char *rej_msg;
1898        int status, errno;
1899        u8 data_len;
1900
1901        status = ev->status;
1902        rej_msg = rdma_reject_msg(con->c.cm_id, status);
1903        msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len);
1904
1905        if (msg && data_len >= sizeof(*msg)) {
1906                errno = (int16_t)le16_to_cpu(msg->errno);
1907                if (errno == -EBUSY)
1908                        rtrs_err(s,
1909                                  "Previous session still exists on the server, please reconnect later\n");
1910                else
1911                        rtrs_err(s,
1912                                  "Connect rejected: status %d (%s), rtrs errno %d\n",
1913                                  status, rej_msg, errno);
1914        } else {
1915                rtrs_err(s,
1916                          "Connect rejected but with malformed message: status %d (%s)\n",
1917                          status, rej_msg);
1918        }
1919
1920        return -ECONNRESET;
1921}
1922
1923void rtrs_clt_close_conns(struct rtrs_clt_sess *sess, bool wait)
1924{
1925        if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_CLOSING, NULL))
1926                queue_work(rtrs_wq, &sess->close_work);
1927        if (wait)
1928                flush_work(&sess->close_work);
1929}
1930
1931static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err)
1932{
1933        if (con->cm_err == 1) {
1934                struct rtrs_clt_sess *sess;
1935
1936                sess = to_clt_sess(con->c.sess);
1937                if (atomic_dec_and_test(&sess->connected_cnt))
1939                        wake_up(&sess->state_wq);
1940        }
1941        con->cm_err = cm_err;
1942}
1943
1944static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id,
1945                                     struct rdma_cm_event *ev)
1946{
1947        struct rtrs_clt_con *con = cm_id->context;
1948        struct rtrs_sess *s = con->c.sess;
1949        struct rtrs_clt_sess *sess = to_clt_sess(s);
1950        int cm_err = 0;
1951
1952        switch (ev->event) {
1953        case RDMA_CM_EVENT_ADDR_RESOLVED:
1954                cm_err = rtrs_rdma_addr_resolved(con);
1955                break;
1956        case RDMA_CM_EVENT_ROUTE_RESOLVED:
1957                cm_err = rtrs_rdma_route_resolved(con);
1958                break;
1959        case RDMA_CM_EVENT_ESTABLISHED:
1960                cm_err = rtrs_rdma_conn_established(con, ev);
1961                if (likely(!cm_err)) {
1962                        /*
1963                         * Report success and wake up. Here we abuse state_wq,
1964                         * i.e. wake up without state change, but we set cm_err.
1965                         */
1966                        flag_success_on_conn(con);
1967                        wake_up(&sess->state_wq);
1968                        return 0;
1969                }
1970                break;
1971        case RDMA_CM_EVENT_REJECTED:
1972                cm_err = rtrs_rdma_conn_rejected(con, ev);
1973                break;
1974        case RDMA_CM_EVENT_DISCONNECTED:
1975                /* No message for disconnecting */
1976                cm_err = -ECONNRESET;
1977                break;
1978        case RDMA_CM_EVENT_CONNECT_ERROR:
1979        case RDMA_CM_EVENT_UNREACHABLE:
1980        case RDMA_CM_EVENT_ADDR_CHANGE:
1981        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1982                rtrs_wrn(s, "CM error (CM event: %s, err: %d)\n",
1983                         rdma_event_msg(ev->event), ev->status);
1984                cm_err = -ECONNRESET;
1985                break;
1986        case RDMA_CM_EVENT_ADDR_ERROR:
1987        case RDMA_CM_EVENT_ROUTE_ERROR:
1988                rtrs_wrn(s, "CM error (CM event: %s, err: %d)\n",
1989                         rdma_event_msg(ev->event), ev->status);
1990                cm_err = -EHOSTUNREACH;
1991                break;
1992        case RDMA_CM_EVENT_DEVICE_REMOVAL:
1993                /*
1994                 * Device removal is a special case.  Queue close and return 0.
1995                 */
1996                rtrs_clt_close_conns(sess, false);
1997                return 0;
1998        default:
1999                rtrs_err(s, "Unexpected RDMA CM error (CM event: %s, err: %d)\n",
2000                         rdma_event_msg(ev->event), ev->status);
2001                cm_err = -ECONNRESET;
2002                break;
2003        }
2004
2005        if (cm_err) {
2006                /*
2007                 * cm error makes sense only on connection establishing,
2008                 * in other cases we rely on normal procedure of reconnecting.
2009                 */
2010                flag_error_on_conn(con, cm_err);
2011                rtrs_rdma_error_recovery(con);
2012        }
2013
2014        return 0;
2015}
2016
2017static int create_cm(struct rtrs_clt_con *con)
2018{
2019        struct rtrs_sess *s = con->c.sess;
2020        struct rtrs_clt_sess *sess = to_clt_sess(s);
2021        struct rdma_cm_id *cm_id;
2022        int err;
2023
2024        cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con,
2025                               sess->s.dst_addr.ss_family == AF_IB ?
2026                               RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC);
2027        if (IS_ERR(cm_id)) {
2028                err = PTR_ERR(cm_id);
2029                rtrs_err(s, "Failed to create CM ID, err: %d\n", err);
2030
2031                return err;
2032        }
2033        con->c.cm_id = cm_id;
2034        con->cm_err = 0;
2035        /* allow the port to be reused */
2036        err = rdma_set_reuseaddr(cm_id, 1);
2037        if (err != 0) {
2038                rtrs_err(s, "Set address reuse failed, err: %d\n", err);
2039                goto destroy_cm;
2040        }
2041        err = rdma_resolve_addr(cm_id, (struct sockaddr *)&sess->s.src_addr,
2042                                (struct sockaddr *)&sess->s.dst_addr,
2043                                RTRS_CONNECT_TIMEOUT_MS);
2044        if (err) {
2045                rtrs_err(s, "Failed to resolve address, err: %d\n", err);
2046                goto destroy_cm;
2047        }
2048        /*
2049         * Combine connection status and session events.  We need to wait
2050         * for either of two cases: cm_err holds something meaningful, or
2051         * the session state was changed to an error state by device removal.
2052         */
2053        err = wait_event_interruptible_timeout(
2054                        sess->state_wq,
2055                        con->cm_err || sess->state != RTRS_CLT_CONNECTING,
2056                        msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
2057        if (err == 0 || err == -ERESTARTSYS) {
2058                if (err == 0)
2059                        err = -ETIMEDOUT;
2060                /* Timed out or interrupted */
2061                goto errr;
2062        }
2063        if (con->cm_err < 0) {
2064                err = con->cm_err;
2065                goto errr;
2066        }
2067        if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTING) {
2068                /* Device removal */
2069                err = -ECONNABORTED;
2070                goto errr;
2071        }
2072
2073        return 0;
2074
2075errr:
2076        stop_cm(con);
2077        mutex_lock(&con->con_mutex);
2078        destroy_con_cq_qp(con);
2079        mutex_unlock(&con->con_mutex);
2080destroy_cm:
2081        destroy_cm(con);
2082
2083        return err;
2084}
2085
2086static void rtrs_clt_sess_up(struct rtrs_clt_sess *sess)
2087{
2088        struct rtrs_clt *clt = sess->clt;
2089        int up;
2090
2091        /*
2092         * We can fire the RECONNECTED event only when all paths were
2093         * connected on rtrs_clt_open(), then each was disconnected and
2094         * the first one connected again.  That is why we play this nasty
2095         * game with the counter value.
2096         */
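            /*
             * Worked example assuming two paths: alloc_clt() starts paths_up
             * at MAX_PATHS_NUM, the first two ups bring it to
             * MAX_PATHS_NUM + 2 and it is clamped to paths_num = 2 without
             * firing an event; only after all paths went down (paths_up == 0)
             * can a later up == 1 transition fire RTRS_CLT_LINK_EV_RECONNECTED.
             */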
2097
2098        mutex_lock(&clt->paths_ev_mutex);
2099        up = ++clt->paths_up;
2100        /*
2101         * Here it is safe to access paths_num directly since the up
2102         * counter is greater than MAX_PATHS_NUM only while rtrs_clt_open()
2103         * is in progress, thus path removals are impossible.
2104         */
2105        if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num)
2106                clt->paths_up = clt->paths_num;
2107        else if (up == 1)
2108                clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED);
2109        mutex_unlock(&clt->paths_ev_mutex);
2110
2111        /* Mark session as established */
2112        sess->established = true;
2113        sess->reconnect_attempts = 0;
2114        sess->stats->reconnects.successful_cnt++;
2115}
2116
2117static void rtrs_clt_sess_down(struct rtrs_clt_sess *sess)
2118{
2119        struct rtrs_clt *clt = sess->clt;
2120
2121        if (!sess->established)
2122                return;
2123
2124        sess->established = false;
2125        mutex_lock(&clt->paths_ev_mutex);
2126        WARN_ON(!clt->paths_up);
2127        if (--clt->paths_up == 0)
2128                clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED);
2129        mutex_unlock(&clt->paths_ev_mutex);
2130}
2131
2132static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess)
2133{
2134        struct rtrs_clt_con *con;
2135        unsigned int cid;
2136
2137        WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED);
2138
2139        /*
2140         * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
2141         * exactly in between.  Start destroying after it finishes.
2142         */
2143        mutex_lock(&sess->init_mutex);
2144        mutex_unlock(&sess->init_mutex);
2145
2146        /*
2147         * All IO paths must observe !CONNECTED state before we
2148         * free everything.
2149         */
2150        synchronize_rcu();
2151
2152        rtrs_stop_hb(&sess->s);
2153
2154        /*
2155         * The order is utterly crucial: first disconnect and complete all
2156         * rdma requests with an error (thus setting in_use=false for them),
2157         * then fail outstanding requests checking in_use for each, and
2158         * eventually notify the upper layer about session disconnection.
2159         */
2160
2161        for (cid = 0; cid < sess->s.con_num; cid++) {
2162                if (!sess->s.con[cid])
2163                        break;
2164                con = to_clt_con(sess->s.con[cid]);
2165                stop_cm(con);
2166        }
2167        fail_all_outstanding_reqs(sess);
2168        free_sess_reqs(sess);
2169        rtrs_clt_sess_down(sess);
2170
2171        /*
2172         * Wait for a graceful shutdown, namely when the peer side invokes
2173         * rdma_disconnect(). 'connected_cnt' is decremented only on
2174         * CM events, thus if the other side has crashed and hb has detected
2175         * that something is wrong, we will be stuck here for exactly the
2176         * timeout ms, since CM does not fire anything.  That is fine, we
2177         * are not in a hurry.
2178         */
2179        wait_event_timeout(sess->state_wq, !atomic_read(&sess->connected_cnt),
2180                           msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
2181
2182        for (cid = 0; cid < sess->s.con_num; cid++) {
2183                if (!sess->s.con[cid])
2184                        break;
2185                con = to_clt_con(sess->s.con[cid]);
2186                mutex_lock(&con->con_mutex);
2187                destroy_con_cq_qp(con);
2188                mutex_unlock(&con->con_mutex);
2189                destroy_cm(con);
2190                destroy_con(con);
2191        }
2192}
2193
2194static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path,
2195                                 struct rtrs_clt_sess *sess,
2196                                 struct rtrs_clt_sess *next)
2197{
2198        struct rtrs_clt_sess **ppcpu_path;
2199
2200        /* Call cmpxchg() without sparse warnings */
2201        ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path;
2202        return sess == cmpxchg(ppcpu_path, sess, next);
2203}
2204
2205static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess *sess)
2206{
2207        struct rtrs_clt *clt = sess->clt;
2208        struct rtrs_clt_sess *next;
2209        bool wait_for_grace = false;
2210        int cpu;
2211
2212        mutex_lock(&clt->paths_mutex);
2213        list_del_rcu(&sess->s.entry);
2214
2215        /* Make sure everybody observes path removal. */
2216        synchronize_rcu();
2217
2218        /*
2219         * At this point nobody sees @sess in the list, but still we have
2220         * dangling pointer @pcpu_path which _can_ point to @sess.  Since
2221         * nobody can observe @sess in the list, we guarantee that IO path
2222         * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
2223         * to @sess, but can never again become @sess.
2224         */
2225
2226        /*
2227         * Decrement the paths number only after the grace period, because
2228         * the caller of do_each_path() must first observe the list without
2229         * the path and only then the decremented paths number.
2230         *
2231         * Otherwise there can be the following situation:
2232         *    o Two paths exist and IO is coming.
2233         *    o One path is removed:
2234         *      CPU#0                          CPU#1
2235         *      do_each_path():                rtrs_clt_remove_path_from_arr():
2236         *          path = get_next_path()
2237         *          ^^^                            list_del_rcu(path)
2238         *          [!CONNECTED path]              clt->paths_num--
2239         *                                              ^^^^^^^^^
2240         *          load clt->paths_num                 from 2 to 1
2241         *                    ^^^^^^^^^
2242         *                    sees 1
2243         *
2244         *      path is observed as !CONNECTED, but do_each_path() loop
2245         *      ends, because expression i < clt->paths_num is false.
2246         */
2247        clt->paths_num--;
2248
2249        /*
2250         * Get @next connection from current @sess which is going to be
2251         * removed.  If @sess is the last element, then @next is NULL.
2252         */
2253        rcu_read_lock();
2254        next = list_next_or_null_rr_rcu(&clt->paths_list, &sess->s.entry,
2255                                        typeof(*next), s.entry);
2256        rcu_read_unlock();
2257
2258        /*
2259         * @pcpu paths can still point to the path which is going to be
2260         * removed, so change the pointer manually.
2261         */
2262        for_each_possible_cpu(cpu) {
2263                struct rtrs_clt_sess __rcu **ppcpu_path;
2264
2265                ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
2266                if (rcu_dereference_protected(*ppcpu_path,
2267                        lockdep_is_held(&clt->paths_mutex)) != sess)
2268                        /*
2269                         * synchronize_rcu() was called just after deleting
2270                         * entry from the list, thus IO code path cannot
2271                         * change pointer back to the pointer which is going
2272                         * to be removed, we are safe here.
2273                         */
2274                        continue;
2275
2276                /*
2277                 * We race with IO code path, which also changes pointer,
2278                 * thus we have to be careful not to overwrite it.
2279                 */
2280                if (xchg_sessions(ppcpu_path, sess, next))
2281                        /*
2282                         * @ppcpu_path was successfully replaced with @next,
2283                 * which means that someone could also have picked up
2284                 * @sess and be dereferencing it right now, so waiting for
2285                 * a grace period is required.
2286                         */
2287                        wait_for_grace = true;
2288        }
2289        if (wait_for_grace)
2290                synchronize_rcu();
2291
2292        mutex_unlock(&clt->paths_mutex);
2293}
2294
2295static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess)
2296{
2297        struct rtrs_clt *clt = sess->clt;
2298
2299        mutex_lock(&clt->paths_mutex);
2300        clt->paths_num++;
2301
2302        list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
2303        mutex_unlock(&clt->paths_mutex);
2304}
2305
2306static void rtrs_clt_close_work(struct work_struct *work)
2307{
2308        struct rtrs_clt_sess *sess;
2309
2310        sess = container_of(work, struct rtrs_clt_sess, close_work);
2311
2312        cancel_delayed_work_sync(&sess->reconnect_dwork);
2313        rtrs_clt_stop_and_destroy_conns(sess);
2314        rtrs_clt_change_state_get_old(sess, RTRS_CLT_CLOSED, NULL);
2315}
2316
2317static int init_conns(struct rtrs_clt_sess *sess)
2318{
2319        unsigned int cid;
2320        int err;
2321
2322        /*
2323         * On every new set of session connections increase the reconnect
2324         * counter to avoid clashes with previous sessions that are not
2325         * yet closed on the server side.
2326         */
2327        sess->s.recon_cnt++;
2328
2329        /* Establish all RDMA connections  */
2330        for (cid = 0; cid < sess->s.con_num; cid++) {
2331                err = create_con(sess, cid);
2332                if (err)
2333                        goto destroy;
2334
2335                err = create_cm(to_clt_con(sess->s.con[cid]));
2336                if (err) {
2337                        destroy_con(to_clt_con(sess->s.con[cid]));
2338                        goto destroy;
2339                }
2340        }
2341        err = alloc_sess_reqs(sess);
2342        if (err)
2343                goto destroy;
2344
2345        rtrs_start_hb(&sess->s);
2346
2347        return 0;
2348
2349destroy:
2350        while (cid--) {
2351                struct rtrs_clt_con *con = to_clt_con(sess->s.con[cid]);
2352
2353                stop_cm(con);
2354
2355                mutex_lock(&con->con_mutex);
2356                destroy_con_cq_qp(con);
2357                mutex_unlock(&con->con_mutex);
2358                destroy_cm(con);
2359                destroy_con(con);
2360        }
2361        /*
2362         * If we've never taken the async path and got an error, say,
2363         * doing rdma_resolve_addr(), switch to the CONNECTING_ERR state
2364         * manually to keep reconnecting.
2365         */
2366        rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);
2367
2368        return err;
2369}
2370
2371static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
2372{
2373        struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
2374        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
2375        struct rtrs_iu *iu;
2376
2377        iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2378        rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
2379
2380        if (unlikely(wc->status != IB_WC_SUCCESS)) {
2381                rtrs_err(sess->clt, "Sess info request send failed: %s\n",
2382                          ib_wc_status_msg(wc->status));
2383                rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);
2384                return;
2385        }
2386
2387        rtrs_clt_update_wc_stats(con);
2388}
2389
2390static int process_info_rsp(struct rtrs_clt_sess *sess,
2391                            const struct rtrs_msg_info_rsp *msg)
2392{
2393        unsigned int sg_cnt, total_len;
2394        int i, sgi;
2395
2396        sg_cnt = le16_to_cpu(msg->sg_cnt);
2397        if (unlikely(!sg_cnt || (sess->queue_depth % sg_cnt))) {
2398                rtrs_err(sess->clt, "Incorrect sg_cnt %d, queue_depth is not a multiple of it\n",
2399                          sg_cnt);
2400                return -EINVAL;
2401        }
2402
2403        /*
2404         * Check if IB immediate data size is enough to hold the mem_id and
2405         * the offset inside the memory chunk.
2406         */
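            /*
             * E.g. with hypothetical sizes sg_cnt = 128 and chunk_size = 64K,
             * 7 + 16 = 23 bits are needed and must fit into MAX_IMM_PAYL_BITS.
             */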
2407        if (unlikely((ilog2(sg_cnt - 1) + 1) +
2408                     (ilog2(sess->chunk_size - 1) + 1) >
2409                     MAX_IMM_PAYL_BITS)) {
2410                rtrs_err(sess->clt,
2411                          "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
2412                          MAX_IMM_PAYL_BITS, sg_cnt, sess->chunk_size);
2413                return -EINVAL;
2414        }
2415        total_len = 0;
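            /*
             * Each descriptor below spans len / chunk_size chunks, e.g. a
             * hypothetical 1M region with chunk_size = 64K fills 16
             * consecutive rbufs[] entries sharing one rkey, with addresses
             * advancing by chunk_size.
             */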
2416        for (sgi = 0, i = 0; sgi < sg_cnt && i < sess->queue_depth; sgi++) {
2417                const struct rtrs_sg_desc *desc = &msg->desc[sgi];
2418                u32 len, rkey;
2419                u64 addr;
2420
2421                addr = le64_to_cpu(desc->addr);
2422                rkey = le32_to_cpu(desc->key);
2423                len  = le32_to_cpu(desc->len);
2424
2425                total_len += len;
2426
2427                if (unlikely(!len || (len % sess->chunk_size))) {
2428                        rtrs_err(sess->clt, "Incorrect [%d].len %d\n", sgi,
2429                                  len);
2430                        return -EINVAL;
2431                }
2432                for ( ; len && i < sess->queue_depth; i++) {
2433                        sess->rbufs[i].addr = addr;
2434                        sess->rbufs[i].rkey = rkey;
2435
2436                        len  -= sess->chunk_size;
2437                        addr += sess->chunk_size;
2438                }
2439        }
2440        /* Sanity check */
2441        if (unlikely(sgi != sg_cnt || i != sess->queue_depth)) {
2442                rtrs_err(sess->clt, "Incorrect sg vector, not fully mapped\n");
2443                return -EINVAL;
2444        }
2445        if (unlikely(total_len != sess->chunk_size * sess->queue_depth)) {
2446                rtrs_err(sess->clt, "Incorrect total_len %d\n", total_len);
2447                return -EINVAL;
2448        }
2449
2450        return 0;
2451}
2452
2453static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
2454{
2455        struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
2456        struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
2457        struct rtrs_msg_info_rsp *msg;
2458        enum rtrs_clt_state state;
2459        struct rtrs_iu *iu;
2460        size_t rx_sz;
2461        int err;
2462
2463        state = RTRS_CLT_CONNECTING_ERR;
2464
2465        WARN_ON(con->c.cid);
2466        iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2467        if (unlikely(wc->status != IB_WC_SUCCESS)) {
2468                rtrs_err(sess->clt, "Sess info response recv failed: %s\n",
2469                          ib_wc_status_msg(wc->status));
2470                goto out;
2471        }
2472        WARN_ON(wc->opcode != IB_WC_RECV);
2473
2474        if (unlikely(wc->byte_len < sizeof(*msg))) {
2475                rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
2476                          wc->byte_len);
2477                goto out;
2478        }
2479        ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
2480                                   iu->size, DMA_FROM_DEVICE);
2481        msg = iu->buf;
2482        if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP)) {
2483                rtrs_err(sess->clt, "Sess info response is malformed: type %d\n",
2484                          le16_to_cpu(msg->type));
2485                goto out;
2486        }
2487        rx_sz  = sizeof(*msg);
2488        rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
2489        if (unlikely(wc->byte_len < rx_sz)) {
2490                rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
2491                          wc->byte_len);
2492                goto out;
2493        }
2494        err = process_info_rsp(sess, msg);
2495        if (unlikely(err))
2496                goto out;
2497
2498        err = post_recv_sess(sess);
2499        if (unlikely(err))
2500                goto out;
2501
2502        state = RTRS_CLT_CONNECTED;
2503
2504out:
2505        rtrs_clt_update_wc_stats(con);
2506        rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
2507        rtrs_clt_change_state_get_old(sess, state, NULL);
2508}
2509
2510static int rtrs_send_sess_info(struct rtrs_clt_sess *sess)
2511{
2512        struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]);
2513        struct rtrs_msg_info_req *msg;
2514        struct rtrs_iu *tx_iu, *rx_iu;
2515        size_t rx_sz;
2516        int err;
2517
2518        rx_sz  = sizeof(struct rtrs_msg_info_rsp);
2519        rx_sz += sizeof(struct rtrs_sg_desc) * sess->queue_depth;
2520
2521        tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
2522                               sess->s.dev->ib_dev, DMA_TO_DEVICE,
2523                               rtrs_clt_info_req_done);
2524        rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
2525                               DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
2526        if (unlikely(!tx_iu || !rx_iu)) {
2527                err = -ENOMEM;
2528                goto out;
2529        }
2530        /* Prepare for getting info response */
2531        err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
2532        if (unlikely(err)) {
2533                rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err);
2534                goto out;
2535        }
2536        rx_iu = NULL;
2537
2538        msg = tx_iu->buf;
2539        msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
2540        memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname));
2541
2542        ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
2543                                      tx_iu->size, DMA_TO_DEVICE);
2544
2545        /* Send info request */
2546        err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
2547        if (unlikely(err)) {
2548                rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err);
2549                goto out;
2550        }
2551        tx_iu = NULL;
2552
2553        /* Wait for state change */
2554        wait_event_interruptible_timeout(sess->state_wq,
2555                                         sess->state != RTRS_CLT_CONNECTING,
2556                                         msecs_to_jiffies(
2557                                                 RTRS_CONNECT_TIMEOUT_MS));
2558        if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) {
2559                if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR)
2560                        err = -ECONNRESET;
2561                else
2562                        err = -ETIMEDOUT;
2563        }
2564
2565out:
2566        if (tx_iu)
2567                rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
2568        if (rx_iu)
2569                rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
2570        if (unlikely(err))
2571                /* If we've never taken the async path because of malloc problems */
2572                rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);
2573
2574        return err;
2575}
2576
2577/**
2578 * init_sess() - establishes all session connections and does handshake
2579 * @sess: client session.
2580 * In case of error a full close or reconnect procedure should be taken,
2581 * because async reconnect or close works may already have been started.
2582 */
2583static int init_sess(struct rtrs_clt_sess *sess)
2584{
2585        int err;
2586        char str[NAME_MAX];
2587        struct rtrs_addr path = {
2588                .src = &sess->s.src_addr,
2589                .dst = &sess->s.dst_addr,
2590        };
2591
2592        rtrs_addr_to_str(&path, str, sizeof(str));
2593
2594        mutex_lock(&sess->init_mutex);
2595        err = init_conns(sess);
2596        if (err) {
2597                rtrs_err(sess->clt,
2598                         "init_conns() failed: err=%d path=%s [%s:%u]\n", err,
2599                         str, sess->hca_name, sess->hca_port);
2600                goto out;
2601        }
2602        err = rtrs_send_sess_info(sess);
2603        if (err) {
2604                rtrs_err(
2605                        sess->clt,
2606                        "rtrs_send_sess_info() failed: err=%d path=%s [%s:%u]\n",
2607                        err, str, sess->hca_name, sess->hca_port);
2608                goto out;
2609        }
2610        rtrs_clt_sess_up(sess);
2611out:
2612        mutex_unlock(&sess->init_mutex);
2613
2614        return err;
2615}
2616
2617static void rtrs_clt_reconnect_work(struct work_struct *work)
2618{
2619        struct rtrs_clt_sess *sess;
2620        struct rtrs_clt *clt;
2621        unsigned int delay_ms;
2622        int err;
2623
2624        sess = container_of(to_delayed_work(work), struct rtrs_clt_sess,
2625                            reconnect_dwork);
2626        clt = sess->clt;
2627
2628        if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING)
2629                return;
2630
2631        if (sess->reconnect_attempts >= clt->max_reconnect_attempts) {
2632                /* Close a session completely if max attempts is reached */
2633                rtrs_clt_close_conns(sess, false);
2634                return;
2635        }
2636        sess->reconnect_attempts++;
2637
2638        /* Stop everything */
2639        rtrs_clt_stop_and_destroy_conns(sess);
2640        msleep(RTRS_RECONNECT_BACKOFF);
2641        if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING, NULL)) {
2642                err = init_sess(sess);
2643                if (err)
2644                        goto reconnect_again;
2645        }
2646
2647        return;
2648
2649reconnect_again:
2650        if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING, NULL)) {
2651                sess->stats->reconnects.fail_cnt++;
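                    /*
                     * Total delay is the base reconnect_delay_sec (in ms) plus
                     * a random jitter derived from RTRS_RECONNECT_SEED, so
                     * that clients do not all reconnect at the same moment.
                     */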
2652                delay_ms = clt->reconnect_delay_sec * 1000;
2653                queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
2654                                   msecs_to_jiffies(delay_ms +
2655                                                    prandom_u32() %
2656                                                    RTRS_RECONNECT_SEED * 1000));
2657        }
2658}
2659
2660static void rtrs_clt_dev_release(struct device *dev)
2661{
2662        struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev);
2663
2664        kfree(clt);
2665}
2666
2667static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num,
2668                                  u16 port, size_t pdu_sz, void *priv,
2669                                  void  (*link_ev)(void *priv,
2670                                                   enum rtrs_clt_link_ev ev),
2671                                  unsigned int reconnect_delay_sec,
2672                                  unsigned int max_reconnect_attempts)
2673{
2674        struct rtrs_clt *clt;
2675        int err;
2676
2677        if (!paths_num || paths_num > MAX_PATHS_NUM)
2678                return ERR_PTR(-EINVAL);
2679
2680        if (strlen(sessname) >= sizeof(clt->sessname))
2681                return ERR_PTR(-EINVAL);
2682
2683        clt = kzalloc(sizeof(*clt), GFP_KERNEL);
2684        if (!clt)
2685                return ERR_PTR(-ENOMEM);
2686
2687        clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
2688        if (!clt->pcpu_path) {
2689                kfree(clt);
2690                return ERR_PTR(-ENOMEM);
2691        }
2692
2693        uuid_gen(&clt->paths_uuid);
2694        INIT_LIST_HEAD_RCU(&clt->paths_list);
2695        clt->paths_num = paths_num;
2696        clt->paths_up = MAX_PATHS_NUM;
2697        clt->port = port;
2698        clt->pdu_sz = pdu_sz;
2699        clt->max_segments = RTRS_MAX_SEGMENTS;
2700        clt->reconnect_delay_sec = reconnect_delay_sec;
2701        clt->max_reconnect_attempts = max_reconnect_attempts;
2702        clt->priv = priv;
2703        clt->link_ev = link_ev;
2704        clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
2705        strscpy(clt->sessname, sessname, sizeof(clt->sessname));
2706        init_waitqueue_head(&clt->permits_wait);
2707        mutex_init(&clt->paths_ev_mutex);
2708        mutex_init(&clt->paths_mutex);
2709
2710        clt->dev.class = rtrs_clt_dev_class;
2711        clt->dev.release = rtrs_clt_dev_release;
2712        err = dev_set_name(&clt->dev, "%s", sessname);
2713        if (err)
2714                goto err;
2715        /*
2716         * Suppress user space notification until
2717         * sysfs files are created
2718         */
2719        dev_set_uevent_suppress(&clt->dev, true);
2720        err = device_register(&clt->dev);
2721        if (err) {
2722                put_device(&clt->dev);
2723                goto err;
2724        }
2725
2726        clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
2727        if (!clt->kobj_paths) {
2728                err = -ENOMEM;
2729                goto err_dev;
2730        }
2731        err = rtrs_clt_create_sysfs_root_files(clt);
2732        if (err) {
2733                kobject_del(clt->kobj_paths);
2734                kobject_put(clt->kobj_paths);
2735                goto err_dev;
2736        }
2737        dev_set_uevent_suppress(&clt->dev, false);
2738        kobject_uevent(&clt->dev.kobj, KOBJ_ADD);
2739
2740        return clt;
2741err_dev:
2742        device_unregister(&clt->dev);
2743err:
2744        free_percpu(clt->pcpu_path);
2745        kfree(clt);
2746        return ERR_PTR(err);
2747}
2748
2749static void free_clt(struct rtrs_clt *clt)
2750{
2751        free_permits(clt);
2752        free_percpu(clt->pcpu_path);
2753        mutex_destroy(&clt->paths_ev_mutex);
2754        mutex_destroy(&clt->paths_mutex);
2755        /* release callback will free clt in last put */
2756        device_unregister(&clt->dev);
2757}
2758
2759/**
2760 * rtrs_clt_open() - Open a session to an RTRS server
2761 * @ops: holds the link event callback and the private pointer.
2762 * @sessname: name of the session
2763 * @paths: Paths to be established defined by their src and dst addresses
2764 * @paths_num: Number of elements in the @paths array
2765 * @port: port to be used by the RTRS session
2766 * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
2767 * @reconnect_delay_sec: time between reconnect tries
2768 * @max_reconnect_attempts: Number of times to reconnect on error before giving
2769 *                          up, 0 for disabled, -1 for forever
2770 * @nr_poll_queues: number of polling mode connections using the IB_POLL_DIRECT flag
2771 *
2772 * Starts session establishment with the rtrs_server. The function can block
2773 * up to ~2000ms before it returns.
2774 *
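     * A minimal usage sketch (all names and values below are hypothetical,
     * error handling trimmed):
     *
     *    struct rtrs_clt_ops ops = {
     *            .priv    = my_dev,
     *            .link_ev = my_link_event_handler,
     *    };
     *    struct rtrs_addr path = { .src = NULL, .dst = &dst_addr };
     *    struct rtrs_clt *clt;
     *
     *    clt = rtrs_clt_open(&ops, "my_session", &path, 1, port, pdu_sz,
     *                        RECONNECT_DELAY_SEC, MAX_RECONNECT_ATTEMPTS, 0);
     *    if (IS_ERR(clt))
     *            return PTR_ERR(clt);
     *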
2775 * Return: a valid pointer on success, otherwise an ERR_PTR() value.
2776 */
2777struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops,
2778                                 const char *sessname,
2779                                 const struct rtrs_addr *paths,
2780                                 size_t paths_num, u16 port,
2781                                 size_t pdu_sz, u8 reconnect_delay_sec,
2782                                 s16 max_reconnect_attempts, u32 nr_poll_queues)
2783{
2784        struct rtrs_clt_sess *sess, *tmp;
2785        struct rtrs_clt *clt;
2786        int err, i;
2787
2788        clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv,
2789                        ops->link_ev,
2790                        reconnect_delay_sec,
2791                        max_reconnect_attempts);
2792        if (IS_ERR(clt)) {
2793                err = PTR_ERR(clt);
2794                goto out;
2795        }
2796        for (i = 0; i < paths_num; i++) {
2797                struct rtrs_clt_sess *sess;
2798
2799                sess = alloc_sess(clt, &paths[i], nr_cpu_ids,
2800                                  nr_poll_queues);
2801                if (IS_ERR(sess)) {
2802                        err = PTR_ERR(sess);
2803                        goto close_all_sess;
2804                }
2805                if (!i)
2806                        sess->for_new_clt = 1;
2807                list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
2808
2809                err = init_sess(sess);
2810                if (err) {
2811                        list_del_rcu(&sess->s.entry);
2812                        rtrs_clt_close_conns(sess, true);
2813                        free_percpu(sess->stats->pcpu_stats);
2814                        kfree(sess->stats);
2815                        free_sess(sess);
2816                        goto close_all_sess;
2817                }
2818
2819                err = rtrs_clt_create_sess_files(sess);
2820                if (err) {
2821                        list_del_rcu(&sess->s.entry);
2822                        rtrs_clt_close_conns(sess, true);
2823                        free_percpu(sess->stats->pcpu_stats);
2824                        kfree(sess->stats);
2825                        free_sess(sess);
2826                        goto close_all_sess;
2827                }
2828        }
2829        err = alloc_permits(clt);
2830        if (err)
2831                goto close_all_sess;
2832
2833        return clt;
2834
2835close_all_sess:
2836        list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
2837                rtrs_clt_destroy_sess_files(sess, NULL);
2838                rtrs_clt_close_conns(sess, true);
2839                kobject_put(&sess->kobj);
2840        }
2841        rtrs_clt_destroy_sysfs_root(clt);
2842        free_clt(clt);
2843
2844out:
2845        return ERR_PTR(err);
2846}
2847EXPORT_SYMBOL(rtrs_clt_open);
2848
2849/**
2850 * rtrs_clt_close() - Close a session
2851 * @clt: Session handle. Session is freed upon return.
2852 */
2853void rtrs_clt_close(struct rtrs_clt *clt)
2854{
2855        struct rtrs_clt_sess *sess, *tmp;
2856
2857        /* Firstly forbid sysfs access */
2858        rtrs_clt_destroy_sysfs_root(clt);
2859
2860        /* Now it is safe to iterate over all paths without locks */
2861        list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
2862                rtrs_clt_close_conns(sess, true);
2863                rtrs_clt_destroy_sess_files(sess, NULL);
2864                kobject_put(&sess->kobj);
2865        }
2866        free_clt(clt);
2867}
2868EXPORT_SYMBOL(rtrs_clt_close);
2869
2870int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess)
2871{
2872        enum rtrs_clt_state old_state;
2873        int err = -EBUSY;
2874        bool changed;
2875
2876        changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING,
2877                                                 &old_state);
2878        if (changed) {
2879                sess->reconnect_attempts = 0;
2880                queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0);
2881        }
2882        if (changed || old_state == RTRS_CLT_RECONNECTING) {
2883                /*
2884                 * flush_delayed_work() queues pending work for immediate
2885                 * execution, so do the flush if we have queued something
2886                 * right now or work is pending.
2887                 */
2888                flush_delayed_work(&sess->reconnect_dwork);
2889                err = (READ_ONCE(sess->state) ==
2890                       RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
2891        }
2892
2893        return err;
2894}
2895
2896int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess,
2897                                     const struct attribute *sysfs_self)
2898{
2899        enum rtrs_clt_state old_state;
2900        bool changed;
2901
2902        /*
2903         * Keep stopping the path until its state is changed to DEAD or
2904         * is observed as DEAD:
2905         * 1. State was changed to DEAD - we were fast and nobody
2906         *    invoked rtrs_clt_reconnect(), which could have started
2907         *    reconnecting again.
2908         * 2. State was observed as DEAD - someone else is removing the
2909         *    path in parallel.
2910         */
2911        do {
2912                rtrs_clt_close_conns(sess, true);
2913                changed = rtrs_clt_change_state_get_old(sess,
2914                                                        RTRS_CLT_DEAD,
2915                                                        &old_state);
2916        } while (!changed && old_state != RTRS_CLT_DEAD);
2917
2918        if (likely(changed)) {
2919                rtrs_clt_remove_path_from_arr(sess);
2920                rtrs_clt_destroy_sess_files(sess, sysfs_self);
2921                kobject_put(&sess->kobj);
2922        }
2923
2924        return 0;
2925}
2926
2927void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value)
2928{
2929        clt->max_reconnect_attempts = (unsigned int)value;
2930}
2931
2932int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt)
2933{
2934        return (int)clt->max_reconnect_attempts;
2935}
2936
2937/**
2938 * rtrs_clt_request() - Request data transfer to/from server via RDMA.
2939 *
2940 * @dir:        READ/WRITE
2941 * @ops:        Confirmation callback and the private pointer passed back to it.
2942 * @clt:        Session
2943 * @permit:     Preallocated permit
2944 * @vec:        Message that is sent to the server together with the request.
2945 *              The total length of all @vec elements must not exceed IO_MSG_SIZE.
2946 *              Since the message is copied internally, it can be allocated on the stack.
2947 * @nr:         Number of elements in @vec.
2948 * @data_len:   Length of data sent to/from the server
2949 * @sg:         Pages to be sent/received to/from server.
2950 * @sg_cnt:     Number of elements in @sg
2951 *
2952 * Return:
2953 * 0:           Success
2954 * <0:          Error
2955 *
2956 * On dir=READ the rtrs client requests a data transfer from the server to
2957 * the client.  The data the server responds with is stored in @sg when the
2958 * user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event.
2959 * On dir=WRITE the rtrs client RDMA-writes the data in @sg to the server side.
2960 */
2961int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
2962                     struct rtrs_clt *clt, struct rtrs_permit *permit,
2963                      const struct kvec *vec, size_t nr, size_t data_len,
2964                      struct scatterlist *sg, unsigned int sg_cnt)
2965{
2966        struct rtrs_clt_io_req *req;
2967        struct rtrs_clt_sess *sess;
2968
2969        enum dma_data_direction dma_dir;
2970        int err = -ECONNABORTED, i;
2971        size_t usr_len, hdr_len;
2972        struct path_it it;
2973
2974        /* Get kvec length */
2975        for (i = 0, usr_len = 0; i < nr; i++)
2976                usr_len += vec[i].iov_len;
2977
2978        if (dir == READ) {
2979                hdr_len = sizeof(struct rtrs_msg_rdma_read) +
2980                          sg_cnt * sizeof(struct rtrs_sg_desc);
2981                dma_dir = DMA_FROM_DEVICE;
2982        } else {
2983                hdr_len = sizeof(struct rtrs_msg_rdma_write);
2984                dma_dir = DMA_TO_DEVICE;
2985        }
2986
2987        rcu_read_lock();
2988        for (path_it_init(&it, clt);
2989             (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
2990                if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
2991                        continue;
2992
2993                if (unlikely(usr_len + hdr_len > sess->max_hdr_size)) {
2994                        rtrs_wrn_rl(sess->clt,
2995                                     "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
2996                                     dir == READ ? "Read" : "Write",
2997                                     usr_len, hdr_len, sess->max_hdr_size);
2998                        err = -EMSGSIZE;
2999                        break;
3000                }
3001                req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv,
3002                                       vec, usr_len, sg, sg_cnt, data_len,
3003                                       dma_dir);
3004                if (dir == READ)
3005                        err = rtrs_clt_read_req(req);
3006                else
3007                        err = rtrs_clt_write_req(req);
3008                if (unlikely(err)) {
3009                        req->in_use = false;
3010                        continue;
3011                }
3012                /* Success path */
3013                break;
3014        }
3015        path_it_deinit(&it);
3016        rcu_read_unlock();
3017
3018        return err;
3019}
3020EXPORT_SYMBOL(rtrs_clt_request);
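
/*
 * Example (not part of the driver): a hedged sketch of how a ULP could
 * issue a READ through rtrs_clt_request().  The message layout
 * (struct example_msg), the hypothetical opcode and the helper names are
 * illustrative only; the permit is assumed to have been taken with
 * rtrs_clt_get_permit() and to be returned with rtrs_clt_put_permit()
 * once the confirmation callback has run.
 */
static void example_read_done(void *priv, int errno)
{
        /* @priv is ops.priv below; @errno is 0 on success. */
        pr_debug("example read completed: %d\n", errno);
}

static int example_issue_read(struct rtrs_clt *clt, struct rtrs_permit *permit,
                              struct scatterlist *sg, unsigned int sg_cnt,
                              size_t data_len)
{
        struct example_msg {
                __le32 opcode;
        } msg = {
                .opcode = cpu_to_le32(1),       /* hypothetical opcode */
        };
        struct kvec vec = {
                .iov_base = &msg,
                .iov_len  = sizeof(msg),        /* total must stay <= IO_MSG_SIZE */
        };
        struct rtrs_clt_req_ops ops = {
                .priv    = NULL,
                .conf_fn = example_read_done,
        };

        /* On READ the server RDMA-writes the payload into @sg. */
        return rtrs_clt_request(READ, &ops, clt, permit, &vec, 1,
                                data_len, sg, sg_cnt);
}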
3021
3022int rtrs_clt_rdma_cq_direct(struct rtrs_clt *clt, unsigned int index)
3023{
3024        /* If there is no path, return -1 so the block layer does not retry */
3025        int cnt = -1;
3026        struct rtrs_con *con;
3027        struct rtrs_clt_sess *sess;
3028        struct path_it it;
3029
3030        rcu_read_lock();
3031        for (path_it_init(&it, clt);
3032             (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
3033                if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)
3034                        continue;
3035
3036                con = sess->s.con[index + 1];
3037                cnt = ib_process_cq_direct(con->cq, -1);
3038                if (cnt)
3039                        break;
3040        }
3041        path_it_deinit(&it);
3042        rcu_read_unlock();
3043
3044        return cnt;
3045}
3046EXPORT_SYMBOL(rtrs_clt_rdma_cq_direct);
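
/*
 * Example (not part of the driver): a hedged sketch of a polling caller
 * of rtrs_clt_rdma_cq_direct().  example_poll_queue() is hypothetical;
 * the contract shown above is that @index selects an IO connection (the
 * "+ 1" skips the user connection) and the return value is the number of
 * completions processed, or -1 when no connected path exists and the
 * caller should not retry.
 */
static int example_poll_queue(struct rtrs_clt *clt, unsigned int queue_idx)
{
        int done;

        done = rtrs_clt_rdma_cq_direct(clt, queue_idx);

        /* Treat "no path" the same as "nothing completed". */
        return done < 0 ? 0 : done;
}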
3047
3048/**
3049 * rtrs_clt_query() - queries RTRS session attributes
3050 * @clt: session pointer
3051 * @attr: query results for session attributes.
3052 * Returns:
3053 *    0 on success
3054 *    -ECOMM            no connection to the server
3055 */
3056int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr)
3057{
3058        if (!rtrs_clt_is_connected(clt))
3059                return -ECOMM;
3060
3061        attr->queue_depth      = clt->queue_depth;
3062        attr->max_segments     = clt->max_segments;
3063        /* Cap max_io_size to the min of the remote buffer size and the FR pages */
3064        attr->max_io_size = min_t(int, clt->max_io_size,
3065                                  clt->max_segments * SZ_4K);
3066
3067        return 0;
3068}
3069EXPORT_SYMBOL(rtrs_clt_query);
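
/*
 * Example (not part of the driver): a hedged sketch of a ULP validating
 * an IO size against the negotiated session attributes returned by
 * rtrs_clt_query().  example_check_io_size() is hypothetical; the
 * queue_depth, max_segments and max_io_size fields are the ones filled
 * in above.
 */
static int example_check_io_size(struct rtrs_clt *clt, size_t data_len)
{
        struct rtrs_attrs attrs;
        int err;

        err = rtrs_clt_query(clt, &attrs);
        if (err)
                return err;     /* -ECOMM: no connected path */

        /* Reject IO larger than the session can carry in one request. */
        if (data_len > attrs.max_io_size)
                return -EMSGSIZE;

        return 0;
}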
3070
3071int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt,
3072                                     struct rtrs_addr *addr)
3073{
3074        struct rtrs_clt_sess *sess;
3075        int err;
3076
3077        sess = alloc_sess(clt, addr, nr_cpu_ids, 0);
3078        if (IS_ERR(sess))
3079                return PTR_ERR(sess);
3080
3081        /*
3082         * It is safe to add the path in CONNECTING state: incoming
3083         * IO will never grab it.  It is also important to add the
3084         * path before init, since init fires the LINK_CONNECTED event.
3085         */
3086        rtrs_clt_add_path_to_arr(sess);
3087
3088        err = init_sess(sess);
3089        if (err)
3090                goto close_sess;
3091
3092        err = rtrs_clt_create_sess_files(sess);
3093        if (err)
3094                goto close_sess;
3095
3096        return 0;
3097
3098close_sess:
3099        rtrs_clt_remove_path_from_arr(sess);
3100        rtrs_clt_close_conns(sess, true);
3101        free_percpu(sess->stats->pcpu_stats);
3102        kfree(sess->stats);
3103        free_sess(sess);
3104
3105        return err;
3106}
3107
3108static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
3109{
3110        if (!(dev->ib_dev->attrs.device_cap_flags &
3111              IB_DEVICE_MEM_MGT_EXTENSIONS)) {
3112                pr_err("Memory registrations not supported.\n");
3113                return -ENOTSUPP;
3114        }
3115
3116        return 0;
3117}
3118
3119static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
3120        .init = rtrs_clt_ib_dev_init
3121};
3122
3123static int __init rtrs_client_init(void)
3124{
3125        rtrs_rdma_dev_pd_init(0, &dev_pd);
3126
3127        rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client");
3128        if (IS_ERR(rtrs_clt_dev_class)) {
3129                pr_err("Failed to create rtrs-client dev class\n");
3130                return PTR_ERR(rtrs_clt_dev_class);
3131        }
3132        rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
3133        if (!rtrs_wq) {
3134                class_destroy(rtrs_clt_dev_class);
3135                return -ENOMEM;
3136        }
3137
3138        return 0;
3139}
3140
3141static void __exit rtrs_client_exit(void)
3142{
3143        destroy_workqueue(rtrs_wq);
3144        class_destroy(rtrs_clt_dev_class);
3145        rtrs_rdma_dev_pd_deinit(&dev_pd);
3146}
3147
3148module_init(rtrs_client_init);
3149module_exit(rtrs_client_exit);
3150