linux/drivers/infiniband/ulp/rtrs/rtrs-srv.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 * RDMA Transport Layer
   4 *
   5 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
   6 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
   7 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
   8 */
   9
  10#undef pr_fmt
  11#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
  12
  13#include <linux/module.h>
  14#include <linux/mempool.h>
  15
  16#include "rtrs-srv.h"
  17#include "rtrs-log.h"
  18#include <rdma/ib_cm.h>
  19#include <rdma/ib_verbs.h>
  20
  21MODULE_DESCRIPTION("RDMA Transport Server");
  22MODULE_LICENSE("GPL");
  23
  24/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
  25#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
  26#define DEFAULT_SESS_QUEUE_DEPTH 512
  27#define MAX_HDR_SIZE PAGE_SIZE
  28
  29/* We guarantee to serve at least 10 paths */
  30#define CHUNK_POOL_SZ 10
  31
  32static struct rtrs_rdma_dev_pd dev_pd;
  33static mempool_t *chunk_pool;
  34struct class *rtrs_dev_class;
  35static struct rtrs_srv_ib_ctx ib_ctx;
  36
  37static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
  38static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;
  39
  40static bool always_invalidate = true;
  41module_param(always_invalidate, bool, 0444);
  42MODULE_PARM_DESC(always_invalidate,
  43                 "Invalidate memory registration for contiguous memory regions before accessing.");
  44
  45module_param_named(max_chunk_size, max_chunk_size, int, 0444);
  46MODULE_PARM_DESC(max_chunk_size,
  47                 "Max size for each IO request, in bytes (default: "
  48                 __stringify(DEFAULT_MAX_CHUNK_SIZE) " bytes)");
  49
  50module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
  51MODULE_PARM_DESC(sess_queue_depth,
  52                 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
  53                 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
  54                 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");
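    /*
     * Example module load (hypothetical values; assuming the module is
     * built as rtrs-server.ko):
     *
     *   modprobe rtrs-server max_chunk_size=131072 sess_queue_depth=256
     *
     * All parameters use permission 0444, so they are read-only in sysfs
     * and can only be set at load time.
     */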
  55
  56static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };
  57
  58static struct workqueue_struct *rtrs_wq;
  59
  60static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
  61{
  62        return container_of(c, struct rtrs_srv_con, c);
  63}
  64
  65static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s)
  66{
  67        return container_of(s, struct rtrs_srv_sess, s);
  68}
  69
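    /*
     * Session state machine.  Allowed transitions (anything else is
     * rejected and the state is left unchanged):
     *
     *   RTRS_SRV_CONNECTING -> RTRS_SRV_CONNECTED
     *   RTRS_SRV_CONNECTING -> RTRS_SRV_CLOSING
     *   RTRS_SRV_CONNECTED  -> RTRS_SRV_CLOSING
     *   RTRS_SRV_CLOSING    -> RTRS_SRV_CLOSED
     *
     * Must be called with sess->state_lock held.
     */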
  70static bool __rtrs_srv_change_state(struct rtrs_srv_sess *sess,
  71                                     enum rtrs_srv_state new_state)
  72{
  73        enum rtrs_srv_state old_state;
  74        bool changed = false;
  75
  76        lockdep_assert_held(&sess->state_lock);
  77        old_state = sess->state;
  78        switch (new_state) {
  79        case RTRS_SRV_CONNECTED:
  80                switch (old_state) {
  81                case RTRS_SRV_CONNECTING:
  82                        changed = true;
  83                        fallthrough;
  84                default:
  85                        break;
  86                }
  87                break;
  88        case RTRS_SRV_CLOSING:
  89                switch (old_state) {
  90                case RTRS_SRV_CONNECTING:
  91                case RTRS_SRV_CONNECTED:
  92                        changed = true;
  93                        fallthrough;
  94                default:
  95                        break;
  96                }
  97                break;
  98        case RTRS_SRV_CLOSED:
  99                switch (old_state) {
 100                case RTRS_SRV_CLOSING:
 101                        changed = true;
 102                        fallthrough;
 103                default:
 104                        break;
 105                }
 106                break;
 107        default:
 108                break;
 109        }
 110        if (changed)
 111                sess->state = new_state;
 112
 113        return changed;
 114}
 115
 116static bool rtrs_srv_change_state_get_old(struct rtrs_srv_sess *sess,
 117                                           enum rtrs_srv_state new_state,
 118                                           enum rtrs_srv_state *old_state)
 119{
 120        bool changed;
 121
 122        spin_lock_irq(&sess->state_lock);
 123        *old_state = sess->state;
 124        changed = __rtrs_srv_change_state(sess, new_state);
 125        spin_unlock_irq(&sess->state_lock);
 126
 127        return changed;
 128}
 129
 130static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess,
 131                                   enum rtrs_srv_state new_state)
 132{
 133        enum rtrs_srv_state old_state;
 134
 135        return rtrs_srv_change_state_get_old(sess, new_state, &old_state);
 136}
 137
 138static void free_id(struct rtrs_srv_op *id)
 139{
 140        if (!id)
 141                return;
 142        kfree(id);
 143}
 144
 145static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess)
 146{
 147        struct rtrs_srv *srv = sess->srv;
 148        int i;
 149
 150        WARN_ON(atomic_read(&sess->ids_inflight));
 151        if (sess->ops_ids) {
 152                for (i = 0; i < srv->queue_depth; i++)
 153                        free_id(sess->ops_ids[i]);
 154                kfree(sess->ops_ids);
 155                sess->ops_ids = NULL;
 156        }
 157}
 158
 159static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
 160
 161static struct ib_cqe io_comp_cqe = {
 162        .done = rtrs_srv_rdma_done
 163};
 164
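    /*
     * Allocate one rtrs_srv_op per queue slot.  Requests handed to the
     * upper layer are accounted in ids_inflight and waited for on
     * ids_waitq when the session is closed.
     */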
 165static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess)
 166{
 167        struct rtrs_srv *srv = sess->srv;
 168        struct rtrs_srv_op *id;
 169        int i;
 170
 171        sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids),
 172                                GFP_KERNEL);
 173        if (!sess->ops_ids)
 174                goto err;
 175
 176        for (i = 0; i < srv->queue_depth; ++i) {
 177                id = kzalloc(sizeof(*id), GFP_KERNEL);
 178                if (!id)
 179                        goto err;
 180
 181                sess->ops_ids[i] = id;
 182        }
 183        init_waitqueue_head(&sess->ids_waitq);
 184        atomic_set(&sess->ids_inflight, 0);
 185
 186        return 0;
 187
 188err:
 189        rtrs_srv_free_ops_ids(sess);
 190        return -ENOMEM;
 191}
 192
 193static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess)
 194{
 195        atomic_inc(&sess->ids_inflight);
 196}
 197
 198static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess)
 199{
 200        if (atomic_dec_and_test(&sess->ids_inflight))
 201                wake_up(&sess->ids_waitq);
 202}
 203
 204static void rtrs_srv_wait_ops_ids(struct rtrs_srv_sess *sess)
 205{
 206        wait_event(sess->ids_waitq, !atomic_read(&sess->ids_inflight));
 207}
 208
 209
 210static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
 211{
 212        struct rtrs_srv_con *con = cq->cq_context;
 213        struct rtrs_sess *s = con->c.sess;
 214        struct rtrs_srv_sess *sess = to_srv_sess(s);
 215
 216        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 217                rtrs_err(s, "REG MR failed: %s\n",
 218                          ib_wc_status_msg(wc->status));
 219                close_sess(sess);
 220                return;
 221        }
 222}
 223
 224static struct ib_cqe local_reg_cqe = {
 225        .done = rtrs_srv_reg_mr_done
 226};
 227
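    /*
     * Serve a client READ: RDMA-write the result from our chunk to the
     * single remote buffer described in the read message, then notify the
     * client with an immediate message carrying the buffer id.  Depending
     * on need_inval/always_invalidate, a REG_MR and/or SEND_WITH_INV work
     * request is chained in front of the final send.  Only one remote sg
     * entry is supported; anything else is rejected with -EINVAL.
     */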
 228static int rdma_write_sg(struct rtrs_srv_op *id)
 229{
 230        struct rtrs_sess *s = id->con->c.sess;
 231        struct rtrs_srv_sess *sess = to_srv_sess(s);
 232        dma_addr_t dma_addr = sess->dma_addr[id->msg_id];
 233        struct rtrs_srv_mr *srv_mr;
 234        struct rtrs_srv *srv = sess->srv;
 235        struct ib_send_wr inv_wr, imm_wr;
 236        struct ib_rdma_wr *wr = NULL;
 237        enum ib_send_flags flags;
 238        size_t sg_cnt;
 239        int err, offset;
 240        bool need_inval;
 241        u32 rkey = 0;
 242        struct ib_reg_wr rwr;
 243        struct ib_sge *plist;
 244        struct ib_sge list;
 245
 246        sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
 247        need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
 248        if (unlikely(sg_cnt != 1))
 249                return -EINVAL;
 250
 251        offset = 0;
 252
 253        wr              = &id->tx_wr;
 254        plist           = &id->tx_sg;
 255        plist->addr     = dma_addr + offset;
 256        plist->length   = le32_to_cpu(id->rd_msg->desc[0].len);
 257
 258        /* WR will fail with length error
 259         * if this is 0
 260         */
 261        if (unlikely(plist->length == 0)) {
 262                rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
 263                return -EINVAL;
 264        }
 265
 266        plist->lkey = sess->s.dev->ib_pd->local_dma_lkey;
 267        offset += plist->length;
 268
 269        wr->wr.sg_list  = plist;
 270        wr->wr.num_sge  = 1;
 271        wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr);
 272        wr->rkey        = le32_to_cpu(id->rd_msg->desc[0].key);
 273        if (rkey == 0)
 274                rkey = wr->rkey;
 275        else
 276                /* Only one key is actually used */
 277                WARN_ON_ONCE(rkey != wr->rkey);
 278
 279        wr->wr.opcode = IB_WR_RDMA_WRITE;
 280        wr->wr.ex.imm_data = 0;
 281        wr->wr.send_flags  = 0;
 282
 283        if (need_inval && always_invalidate) {
 284                wr->wr.next = &rwr.wr;
 285                rwr.wr.next = &inv_wr;
 286                inv_wr.next = &imm_wr;
 287        } else if (always_invalidate) {
 288                wr->wr.next = &rwr.wr;
 289                rwr.wr.next = &imm_wr;
 290        } else if (need_inval) {
 291                wr->wr.next = &inv_wr;
 292                inv_wr.next = &imm_wr;
 293        } else {
 294                wr->wr.next = &imm_wr;
 295        }
 296        /*
 297         * From time to time we have to post signaled sends,
 298         * or the send queue will fill up and only a QP reset can help.
 299         */
 300        flags = (atomic_inc_return(&id->con->wr_cnt) % srv->queue_depth) ?
 301                0 : IB_SEND_SIGNALED;
 302
 303        if (need_inval) {
 304                inv_wr.sg_list = NULL;
 305                inv_wr.num_sge = 0;
 306                inv_wr.opcode = IB_WR_SEND_WITH_INV;
 307                inv_wr.send_flags = 0;
 308                inv_wr.ex.invalidate_rkey = rkey;
 309        }
 310
 311        imm_wr.next = NULL;
 312        if (always_invalidate) {
 313                struct rtrs_msg_rkey_rsp *msg;
 314
 315                srv_mr = &sess->mrs[id->msg_id];
 316                rwr.wr.opcode = IB_WR_REG_MR;
 317                rwr.wr.num_sge = 0;
 318                rwr.mr = srv_mr->mr;
 319                rwr.wr.send_flags = 0;
 320                rwr.key = srv_mr->mr->rkey;
 321                rwr.access = (IB_ACCESS_LOCAL_WRITE |
 322                              IB_ACCESS_REMOTE_WRITE);
 323                msg = srv_mr->iu->buf;
 324                msg->buf_id = cpu_to_le16(id->msg_id);
 325                msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
 326                msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
 327
 328                list.addr   = srv_mr->iu->dma_addr;
 329                list.length = sizeof(*msg);
 330                list.lkey   = sess->s.dev->ib_pd->local_dma_lkey;
 331                imm_wr.sg_list = &list;
 332                imm_wr.num_sge = 1;
 333                imm_wr.opcode = IB_WR_SEND_WITH_IMM;
 334                ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
 335                                              srv_mr->iu->dma_addr,
 336                                              srv_mr->iu->size, DMA_TO_DEVICE);
 337        } else {
 338                imm_wr.sg_list = NULL;
 339                imm_wr.num_sge = 0;
 340                imm_wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
 341        }
 342        imm_wr.send_flags = flags;
 343        imm_wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
 344                                                             0, need_inval));
 345
 346        imm_wr.wr_cqe   = &io_comp_cqe;
 347        ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr,
 348                                      offset, DMA_BIDIRECTIONAL);
 349
 350        err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
 351        if (unlikely(err))
 352                rtrs_err(s,
 353                          "Posting RDMA-Write-Request to QP failed, err: %d\n",
 354                          err);
 355
 356        return err;
 357}
 358
 359/**
 360 * send_io_resp_imm() - respond to the client with an empty IMM on a failed
 361 *                      READ/WRITE request or on a successful WRITE request.
 362 * @con:        the connection to send the result back on
 363 * @id:         the id associated with the IO
 364 * @errno:      the error number of the IO.
 365 *
 366 * Return: 0 on success, errno otherwise.
 367 */
 368static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
 369                            int errno)
 370{
 371        struct rtrs_sess *s = con->c.sess;
 372        struct rtrs_srv_sess *sess = to_srv_sess(s);
 373        struct ib_send_wr inv_wr, imm_wr, *wr = NULL;
 374        struct ib_reg_wr rwr;
 375        struct rtrs_srv *srv = sess->srv;
 376        struct rtrs_srv_mr *srv_mr;
 377        bool need_inval = false;
 378        enum ib_send_flags flags;
 379        u32 imm;
 380        int err;
 381
 382        if (id->dir == READ) {
 383                struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
 384                size_t sg_cnt;
 385
 386                need_inval = le16_to_cpu(rd_msg->flags) &
 387                                RTRS_MSG_NEED_INVAL_F;
 388                sg_cnt = le16_to_cpu(rd_msg->sg_cnt);
 389
 390                if (need_inval) {
 391                        if (likely(sg_cnt)) {
 392                                inv_wr.sg_list = NULL;
 393                                inv_wr.num_sge = 0;
 394                                inv_wr.opcode = IB_WR_SEND_WITH_INV;
 395                                inv_wr.send_flags = 0;
 396                                /* Only one key is actually used */
 397                                inv_wr.ex.invalidate_rkey =
 398                                        le32_to_cpu(rd_msg->desc[0].key);
 399                        } else {
 400                                WARN_ON_ONCE(1);
 401                                need_inval = false;
 402                        }
 403                }
 404        }
 405
 406        if (need_inval && always_invalidate) {
 407                wr = &inv_wr;
 408                inv_wr.next = &rwr.wr;
 409                rwr.wr.next = &imm_wr;
 410        } else if (always_invalidate) {
 411                wr = &rwr.wr;
 412                rwr.wr.next = &imm_wr;
 413        } else if (need_inval) {
 414                wr = &inv_wr;
 415                inv_wr.next = &imm_wr;
 416        } else {
 417                wr = &imm_wr;
 418        }
 419        /*
 420         * From time to time we have to post signaled sends,
 421         * or the send queue will fill up and only a QP reset can help.
 422         */
 423        flags = (atomic_inc_return(&con->wr_cnt) % srv->queue_depth) ?
 424                0 : IB_SEND_SIGNALED;
 425        imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
 426        imm_wr.next = NULL;
 427        if (always_invalidate) {
 428                struct ib_sge list;
 429                struct rtrs_msg_rkey_rsp *msg;
 430
 431                srv_mr = &sess->mrs[id->msg_id];
 432                rwr.wr.next = &imm_wr;
 433                rwr.wr.opcode = IB_WR_REG_MR;
 434                rwr.wr.num_sge = 0;
 435                rwr.wr.send_flags = 0;
 436                rwr.mr = srv_mr->mr;
 437                rwr.key = srv_mr->mr->rkey;
 438                rwr.access = (IB_ACCESS_LOCAL_WRITE |
 439                              IB_ACCESS_REMOTE_WRITE);
 440                msg = srv_mr->iu->buf;
 441                msg->buf_id = cpu_to_le16(id->msg_id);
 442                msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
 443                msg->rkey = cpu_to_le32(srv_mr->mr->rkey);
 444
 445                list.addr   = srv_mr->iu->dma_addr;
 446                list.length = sizeof(*msg);
 447                list.lkey   = sess->s.dev->ib_pd->local_dma_lkey;
 448                imm_wr.sg_list = &list;
 449                imm_wr.num_sge = 1;
 450                imm_wr.opcode = IB_WR_SEND_WITH_IMM;
 451                ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
 452                                              srv_mr->iu->dma_addr,
 453                                              srv_mr->iu->size, DMA_TO_DEVICE);
 454        } else {
 455                imm_wr.sg_list = NULL;
 456                imm_wr.num_sge = 0;
 457                imm_wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
 458        }
 459        imm_wr.send_flags = flags;
 460        imm_wr.wr_cqe   = &io_comp_cqe;
 461
 462        imm_wr.ex.imm_data = cpu_to_be32(imm);
 463
 464        err = ib_post_send(id->con->c.qp, wr, NULL);
 465        if (unlikely(err))
 466                rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
 467                             err);
 468
 469        return err;
 470}
 471
 472void close_sess(struct rtrs_srv_sess *sess)
 473{
 474        enum rtrs_srv_state old_state;
 475
 476        if (rtrs_srv_change_state_get_old(sess, RTRS_SRV_CLOSING,
 477                                           &old_state))
 478                queue_work(rtrs_wq, &sess->close_work);
 479        WARN_ON(sess->state != RTRS_SRV_CLOSING);
 480}
 481
 482static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
 483{
 484        switch (state) {
 485        case RTRS_SRV_CONNECTING:
 486                return "RTRS_SRV_CONNECTING";
 487        case RTRS_SRV_CONNECTED:
 488                return "RTRS_SRV_CONNECTED";
 489        case RTRS_SRV_CLOSING:
 490                return "RTRS_SRV_CLOSING";
 491        case RTRS_SRV_CLOSED:
 492                return "RTRS_SRV_CLOSED";
 493        default:
 494                return "UNKNOWN";
 495        }
 496}
 497
 498/**
 499 * rtrs_srv_resp_rdma() - Finish an RDMA request
 500 *
 501 * @id:         Internal RTRS operation identifier
 502 * @status:     Response code sent to the other side for this operation:
 503 *              0 = success, < 0 = error
 504 * Context: any
 505 *
 506 * Finish an RDMA operation. A message is sent to the client and the
 507 * corresponding memory areas will be released.
     *
     * Return: true if the response was handled (or dropped because the session
     *         is no longer connected); false if the send queue is full and the
     *         response has been queued for a later retry.
 508 */
 509bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
 510{
 511        struct rtrs_srv_sess *sess;
 512        struct rtrs_srv_con *con;
 513        struct rtrs_sess *s;
 514        int err;
 515
 516        if (WARN_ON(!id))
 517                return true;
 518
 519        con = id->con;
 520        s = con->c.sess;
 521        sess = to_srv_sess(s);
 522
 523        id->status = status;
 524
 525        if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
 526                rtrs_err_rl(s,
 527                             "Sending I/O response failed,  session is disconnected, sess state %s\n",
 528                             rtrs_srv_state_str(sess->state));
 529                goto out;
 530        }
 531        if (always_invalidate) {
 532                struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id];
 533
 534                ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
 535        }
 536        if (unlikely(atomic_sub_return(1,
 537                                       &con->sq_wr_avail) < 0)) {
 538                pr_err("IB send queue full\n");
 539                atomic_add(1, &con->sq_wr_avail);
 540                spin_lock(&con->rsp_wr_wait_lock);
 541                list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
 542                spin_unlock(&con->rsp_wr_wait_lock);
 543                return false;
 544        }
 545
 546        if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
 547                err = send_io_resp_imm(con, id, status);
 548        else
 549                err = rdma_write_sg(id);
 550
 551        if (unlikely(err)) {
 552                rtrs_err_rl(s, "IO response failed: %d\n", err);
 553                close_sess(sess);
 554        }
 555out:
 556        rtrs_srv_put_ops_ids(sess);
 557        return true;
 558}
 559EXPORT_SYMBOL(rtrs_srv_resp_rdma);
 560
 561/**
 562 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 563 * @srv:        Session pointer
 564 * @priv:       The private pointer that is associated with the session.
 565 */
 566void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
 567{
 568        srv->priv = priv;
 569}
 570EXPORT_SYMBOL(rtrs_srv_set_sess_priv);
 571
 572static void unmap_cont_bufs(struct rtrs_srv_sess *sess)
 573{
 574        int i;
 575
 576        for (i = 0; i < sess->mrs_num; i++) {
 577                struct rtrs_srv_mr *srv_mr;
 578
 579                srv_mr = &sess->mrs[i];
 580                rtrs_iu_free(srv_mr->iu, DMA_TO_DEVICE,
 581                              sess->s.dev->ib_dev, 1);
 582                ib_dereg_mr(srv_mr->mr);
 583                ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl,
 584                                srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
 585                sg_free_table(&srv_mr->sgt);
 586        }
 587        kfree(sess->mrs);
 588}
 589
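    /*
     * Map the srv->queue_depth data chunks into fast registration MRs.
     * With always_invalidate every chunk gets its own MR (plus an IU for
     * the rkey response); otherwise up to max_fast_reg_page_list_len
     * chunks are packed into a single MR.  The DMA address of each chunk
     * is cached and mem_bits is derived for the immediate data encoding.
     */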
 590static int map_cont_bufs(struct rtrs_srv_sess *sess)
 591{
 592        struct rtrs_srv *srv = sess->srv;
 593        struct rtrs_sess *ss = &sess->s;
 594        int i, mri, err, mrs_num;
 595        unsigned int chunk_bits;
 596        int chunks_per_mr = 1;
 597
 598        /*
 599         * Here we map queue_depth chunks to MRs.  First we have to
 600         * figure out how many chunks we can map per MR.
 601         */
 602        if (always_invalidate) {
 603                /*
 604                 * In order to invalidate each chunk of memory separately,
 605                 * we need one memory region per chunk.
 606                 */
 607                mrs_num = srv->queue_depth;
 608        } else {
 609                chunks_per_mr =
 610                        sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
 611                mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
 612                chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
 613        }
 614
 615        sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL);
 616        if (!sess->mrs)
 617                return -ENOMEM;
 618
 619        sess->mrs_num = mrs_num;
 620
 621        for (mri = 0; mri < mrs_num; mri++) {
 622                struct rtrs_srv_mr *srv_mr = &sess->mrs[mri];
 623                struct sg_table *sgt = &srv_mr->sgt;
 624                struct scatterlist *s;
 625                struct ib_mr *mr;
 626                int nr, chunks;
 627
 628                chunks = chunks_per_mr * mri;
 629                if (!always_invalidate)
 630                        chunks_per_mr = min_t(int, chunks_per_mr,
 631                                              srv->queue_depth - chunks);
 632
 633                err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
 634                if (err)
 635                        goto err;
 636
 637                for_each_sg(sgt->sgl, s, chunks_per_mr, i)
 638                        sg_set_page(s, srv->chunks[chunks + i],
 639                                    max_chunk_size, 0);
 640
 641                nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl,
 642                                   sgt->nents, DMA_BIDIRECTIONAL);
 643                if (nr < sgt->nents) {
 644                        err = nr < 0 ? nr : -EINVAL;
 645                        goto free_sg;
 646                }
 647                mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
 648                                 sgt->nents);
 649                if (IS_ERR(mr)) {
 650                        err = PTR_ERR(mr);
 651                        goto unmap_sg;
 652                }
 653                nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents,
 654                                  NULL, max_chunk_size);
 655                if (nr < 0 || nr < sgt->nents) {
 656                        err = nr < 0 ? nr : -EINVAL;
 657                        goto dereg_mr;
 658                }
 659
 660                if (always_invalidate) {
 661                        srv_mr->iu = rtrs_iu_alloc(1,
 662                                        sizeof(struct rtrs_msg_rkey_rsp),
 663                                        GFP_KERNEL, sess->s.dev->ib_dev,
 664                                        DMA_TO_DEVICE, rtrs_srv_rdma_done);
 665                        if (!srv_mr->iu) {
 666                                err = -ENOMEM;
 667                                rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
 668                                goto free_iu;
 669                        }
 670                }
 671                /* Eventually dma addr for each chunk can be cached */
 672                for_each_sg(sgt->sgl, s, sgt->orig_nents, i)
 673                        sess->dma_addr[chunks + i] = sg_dma_address(s);
 674
 675                ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
 676                srv_mr->mr = mr;
 677
 678                continue;
 679err:
 680                while (mri--) {
 681                        srv_mr = &sess->mrs[mri];
 682                        sgt = &srv_mr->sgt;
 683                        mr = srv_mr->mr;
 684free_iu:
 685                        rtrs_iu_free(srv_mr->iu, DMA_TO_DEVICE,
 686                                      sess->s.dev->ib_dev, 1);
 687dereg_mr:
 688                        ib_dereg_mr(mr);
 689unmap_sg:
 690                        ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl,
 691                                        sgt->nents, DMA_BIDIRECTIONAL);
 692free_sg:
 693                        sg_free_table(sgt);
 694                }
 695                kfree(sess->mrs);
 696
 697                return err;
 698        }
 699
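            /*
             * The immediate payload carries both the buffer id and the byte
             * offset within the chunk: the upper chunk_bits bits hold the
             * msg_id, the lower mem_bits bits hold the offset (decoded in
             * rtrs_srv_rdma_done() as imm_payload >> mem_bits and
             * imm_payload & ((1 << mem_bits) - 1)).
             */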
 700        chunk_bits = ilog2(srv->queue_depth - 1) + 1;
 701        sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);
 702
 703        return 0;
 704}
 705
 706static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
 707{
 708        close_sess(to_srv_sess(c->sess));
 709}
 710
 711static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess)
 712{
 713        rtrs_init_hb(&sess->s, &io_comp_cqe,
 714                      RTRS_HB_INTERVAL_MS,
 715                      RTRS_HB_MISSED_MAX,
 716                      rtrs_srv_hb_err_handler,
 717                      rtrs_wq);
 718}
 719
 720static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess)
 721{
 722        rtrs_start_hb(&sess->s);
 723}
 724
 725static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess)
 726{
 727        rtrs_stop_hb(&sess->s);
 728}
 729
 730static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
 731{
 732        struct rtrs_srv_con *con = cq->cq_context;
 733        struct rtrs_sess *s = con->c.sess;
 734        struct rtrs_srv_sess *sess = to_srv_sess(s);
 735        struct rtrs_iu *iu;
 736
 737        iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
 738        rtrs_iu_free(iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1);
 739
 740        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 741                rtrs_err(s, "Sess info response send failed: %s\n",
 742                          ib_wc_status_msg(wc->status));
 743                close_sess(sess);
 744                return;
 745        }
 746        WARN_ON(wc->opcode != IB_WC_SEND);
 747}
 748
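    /*
     * Count established paths: the upper layer gets a single
     * RTRS_SRV_LINK_EV_CONNECTED event for the first path that comes up
     * and a single DISCONNECTED event when the last one goes down (see
     * rtrs_srv_sess_down()).
     */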
 749static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess)
 750{
 751        struct rtrs_srv *srv = sess->srv;
 752        struct rtrs_srv_ctx *ctx = srv->ctx;
 753        int up;
 754
 755        mutex_lock(&srv->paths_ev_mutex);
 756        up = ++srv->paths_up;
 757        if (up == 1)
 758                ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
 759        mutex_unlock(&srv->paths_ev_mutex);
 760
 761        /* Mark session as established */
 762        sess->established = true;
 763}
 764
 765static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess)
 766{
 767        struct rtrs_srv *srv = sess->srv;
 768        struct rtrs_srv_ctx *ctx = srv->ctx;
 769
 770        if (!sess->established)
 771                return;
 772
 773        sess->established = false;
 774        mutex_lock(&srv->paths_ev_mutex);
 775        WARN_ON(!srv->paths_up);
 776        if (--srv->paths_up == 0)
 777                ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
 778        mutex_unlock(&srv->paths_ev_mutex);
 779}
 780
 781static int post_recv_sess(struct rtrs_srv_sess *sess);
 782
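    /*
     * Handle the client's info request: post receive buffers on all
     * connections, remember the session name, register every MR with a
     * chain of REG_MR WRs and reply with an address/rkey/length descriptor
     * per MR.  This also moves the session to RTRS_SRV_CONNECTED, starts
     * heartbeats and notifies the upper layer if this is the first path.
     */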
 783static int process_info_req(struct rtrs_srv_con *con,
 784                            struct rtrs_msg_info_req *msg)
 785{
 786        struct rtrs_sess *s = con->c.sess;
 787        struct rtrs_srv_sess *sess = to_srv_sess(s);
 788        struct ib_send_wr *reg_wr = NULL;
 789        struct rtrs_msg_info_rsp *rsp;
 790        struct rtrs_iu *tx_iu;
 791        struct ib_reg_wr *rwr;
 792        int mri, err;
 793        size_t tx_sz;
 794
 795        err = post_recv_sess(sess);
 796        if (unlikely(err)) {
 797                rtrs_err(s, "post_recv_sess(), err: %d\n", err);
 798                return err;
 799        }
 800        rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL);
 801        if (unlikely(!rwr))
 802                return -ENOMEM;
 803        strlcpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname));
 804
 805        tx_sz  = sizeof(*rsp);
 806        tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num;
 807        tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
 808                               DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
 809        if (unlikely(!tx_iu)) {
 810                err = -ENOMEM;
 811                goto rwr_free;
 812        }
 813
 814        rsp = tx_iu->buf;
 815        rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
 816        rsp->sg_cnt = cpu_to_le16(sess->mrs_num);
 817
 818        for (mri = 0; mri < sess->mrs_num; mri++) {
 819                struct ib_mr *mr = sess->mrs[mri].mr;
 820
 821                rsp->desc[mri].addr = cpu_to_le64(mr->iova);
 822                rsp->desc[mri].key  = cpu_to_le32(mr->rkey);
 823                rsp->desc[mri].len  = cpu_to_le32(mr->length);
 824
 825                /*
 826                 * Fill in reg MR request and chain them *backwards*
 827                 */
 828                rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
 829                rwr[mri].wr.opcode = IB_WR_REG_MR;
 830                rwr[mri].wr.wr_cqe = &local_reg_cqe;
 831                rwr[mri].wr.num_sge = 0;
 832                rwr[mri].wr.send_flags = mri ? 0 : IB_SEND_SIGNALED;
 833                rwr[mri].mr = mr;
 834                rwr[mri].key = mr->rkey;
 835                rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
 836                                   IB_ACCESS_REMOTE_WRITE);
 837                reg_wr = &rwr[mri].wr;
 838        }
 839
 840        err = rtrs_srv_create_sess_files(sess);
 841        if (unlikely(err))
 842                goto iu_free;
 843        kobject_get(&sess->kobj);
 844        get_device(&sess->srv->dev);
 845        rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED);
 846        rtrs_srv_start_hb(sess);
 847
 848        /*
 849         * We do not account for the number of established connections at the
 850         * moment; we rely on the client, which should send an info request only
 851         * when all connections are successfully established.  Thus, simply
 852         * notify the listener with a proper event if we are the first path.
 853         */
 854        rtrs_srv_sess_up(sess);
 855
 856        ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
 857                                      tx_iu->size, DMA_TO_DEVICE);
 858
 859        /* Send info response */
 860        err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
 861        if (unlikely(err)) {
 862                rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
 863iu_free:
 864                rtrs_iu_free(tx_iu, DMA_TO_DEVICE, sess->s.dev->ib_dev, 1);
 865        }
 866rwr_free:
 867        kfree(rwr);
 868
 869        return err;
 870}
 871
 872static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
 873{
 874        struct rtrs_srv_con *con = cq->cq_context;
 875        struct rtrs_sess *s = con->c.sess;
 876        struct rtrs_srv_sess *sess = to_srv_sess(s);
 877        struct rtrs_msg_info_req *msg;
 878        struct rtrs_iu *iu;
 879        int err;
 880
 881        WARN_ON(con->c.cid);
 882
 883        iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
 884        if (unlikely(wc->status != IB_WC_SUCCESS)) {
 885                rtrs_err(s, "Sess info request receive failed: %s\n",
 886                          ib_wc_status_msg(wc->status));
 887                goto close;
 888        }
 889        WARN_ON(wc->opcode != IB_WC_RECV);
 890
 891        if (unlikely(wc->byte_len < sizeof(*msg))) {
 892                rtrs_err(s, "Sess info request is malformed: size %d\n",
 893                          wc->byte_len);
 894                goto close;
 895        }
 896        ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
 897                                   iu->size, DMA_FROM_DEVICE);
 898        msg = iu->buf;
 899        if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ)) {
 900                rtrs_err(s, "Sess info request is malformed: type %d\n",
 901                          le16_to_cpu(msg->type));
 902                goto close;
 903        }
 904        err = process_info_req(con, msg);
 905        if (unlikely(err))
 906                goto close;
 907
 908out:
 909        rtrs_iu_free(iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1);
 910        return;
 911close:
 912        close_sess(sess);
 913        goto out;
 914}
 915
 916static int post_recv_info_req(struct rtrs_srv_con *con)
 917{
 918        struct rtrs_sess *s = con->c.sess;
 919        struct rtrs_srv_sess *sess = to_srv_sess(s);
 920        struct rtrs_iu *rx_iu;
 921        int err;
 922
 923        rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
 924                               GFP_KERNEL, sess->s.dev->ib_dev,
 925                               DMA_FROM_DEVICE, rtrs_srv_info_req_done);
 926        if (unlikely(!rx_iu))
 927                return -ENOMEM;
 928        /* Prepare for receiving the info request */
 929        err = rtrs_iu_post_recv(&con->c, rx_iu);
 930        if (unlikely(err)) {
 931                rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
 932                rtrs_iu_free(rx_iu, DMA_FROM_DEVICE, sess->s.dev->ib_dev, 1);
 933                return err;
 934        }
 935
 936        return 0;
 937}
 938
 939static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
 940{
 941        int i, err;
 942
 943        for (i = 0; i < q_size; i++) {
 944                err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
 945                if (unlikely(err))
 946                        return err;
 947        }
 948
 949        return 0;
 950}
 951
 952static int post_recv_sess(struct rtrs_srv_sess *sess)
 953{
 954        struct rtrs_srv *srv = sess->srv;
 955        struct rtrs_sess *s = &sess->s;
 956        size_t q_size;
 957        int err, cid;
 958
 959        for (cid = 0; cid < sess->s.con_num; cid++) {
 960                if (cid == 0)
 961                        q_size = SERVICE_CON_QUEUE_DEPTH;
 962                else
 963                        q_size = srv->queue_depth;
 964
 965                err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size);
 966                if (unlikely(err)) {
 967                        rtrs_err(s, "post_recv_io(), err: %d\n", err);
 968                        return err;
 969                }
 970        }
 971
 972        return 0;
 973}
 974
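    /*
     * Hand a client READ request to the upper layer.  Only the request
     * header (and an optional user message) was written into chunk buf_id;
     * the result is RDMA-written back once the user module calls
     * rtrs_srv_resp_rdma().
     */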
 975static void process_read(struct rtrs_srv_con *con,
 976                         struct rtrs_msg_rdma_read *msg,
 977                         u32 buf_id, u32 off)
 978{
 979        struct rtrs_sess *s = con->c.sess;
 980        struct rtrs_srv_sess *sess = to_srv_sess(s);
 981        struct rtrs_srv *srv = sess->srv;
 982        struct rtrs_srv_ctx *ctx = srv->ctx;
 983        struct rtrs_srv_op *id;
 984
 985        size_t usr_len, data_len;
 986        void *data;
 987        int ret;
 988
 989        if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
 990                rtrs_err_rl(s,
 991                             "Processing read request failed,  session is disconnected, sess state %s\n",
 992                             rtrs_srv_state_str(sess->state));
 993                return;
 994        }
 995        if (unlikely(msg->sg_cnt != 1 && msg->sg_cnt != 0)) {
 996                rtrs_err_rl(s,
 997                            "Processing read request failed, invalid message\n");
 998                return;
 999        }
1000        rtrs_srv_get_ops_ids(sess);
1001        rtrs_srv_update_rdma_stats(sess->stats, off, READ);
1002        id = sess->ops_ids[buf_id];
1003        id->con         = con;
1004        id->dir         = READ;
1005        id->msg_id      = buf_id;
1006        id->rd_msg      = msg;
1007        usr_len = le16_to_cpu(msg->usr_len);
1008        data_len = off - usr_len;
1009        data = page_address(srv->chunks[buf_id]);
1010        ret = ctx->ops.rdma_ev(srv, srv->priv, id, READ, data, data_len,
1011                           data + data_len, usr_len);
1012
1013        if (unlikely(ret)) {
1014                rtrs_err_rl(s,
1015                             "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
1016                             buf_id, ret);
1017                goto send_err_msg;
1018        }
1019
1020        return;
1021
1022send_err_msg:
1023        ret = send_io_resp_imm(con, id, ret);
1024        if (ret < 0) {
1025                rtrs_err_rl(s,
1026                             "Sending err msg for failed RDMA-Write-Req failed, msg_id %d, err: %d\n",
1027                             buf_id, ret);
1028                close_sess(sess);
1029        }
1030        rtrs_srv_put_ops_ids(sess);
1031}
1032
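    /*
     * Hand a client WRITE request to the upper layer.  The payload and the
     * user message were already RDMA-written into chunk buf_id by the
     * client, so completion only needs an immediate acknowledgement (see
     * send_io_resp_imm()).
     */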
1033static void process_write(struct rtrs_srv_con *con,
1034                          struct rtrs_msg_rdma_write *req,
1035                          u32 buf_id, u32 off)
1036{
1037        struct rtrs_sess *s = con->c.sess;
1038        struct rtrs_srv_sess *sess = to_srv_sess(s);
1039        struct rtrs_srv *srv = sess->srv;
1040        struct rtrs_srv_ctx *ctx = srv->ctx;
1041        struct rtrs_srv_op *id;
1042
1043        size_t data_len, usr_len;
1044        void *data;
1045        int ret;
1046
1047        if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
1048                rtrs_err_rl(s,
1049                             "Processing write request failed,  session is disconnected, sess state %s\n",
1050                             rtrs_srv_state_str(sess->state));
1051                return;
1052        }
1053        rtrs_srv_get_ops_ids(sess);
1054        rtrs_srv_update_rdma_stats(sess->stats, off, WRITE);
1055        id = sess->ops_ids[buf_id];
1056        id->con    = con;
1057        id->dir    = WRITE;
1058        id->msg_id = buf_id;
1059
1060        usr_len = le16_to_cpu(req->usr_len);
1061        data_len = off - usr_len;
1062        data = page_address(srv->chunks[buf_id]);
1063        ret = ctx->ops.rdma_ev(srv, srv->priv, id, WRITE, data, data_len,
1064                           data + data_len, usr_len);
1065        if (unlikely(ret)) {
1066                rtrs_err_rl(s,
1067                             "Processing write request failed, user module callback reports err: %d\n",
1068                             ret);
1069                goto send_err_msg;
1070        }
1071
1072        return;
1073
1074send_err_msg:
1075        ret = send_io_resp_imm(con, id, ret);
1076        if (ret < 0) {
1077                rtrs_err_rl(s,
1078                             "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
1079                             buf_id, ret);
1080                close_sess(sess);
1081        }
1082        rtrs_srv_put_ops_ids(sess);
1083}
1084
1085static void process_io_req(struct rtrs_srv_con *con, void *msg,
1086                           u32 id, u32 off)
1087{
1088        struct rtrs_sess *s = con->c.sess;
1089        struct rtrs_srv_sess *sess = to_srv_sess(s);
1090        struct rtrs_msg_rdma_hdr *hdr;
1091        unsigned int type;
1092
1093        ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id],
1094                                   max_chunk_size, DMA_BIDIRECTIONAL);
1095        hdr = msg;
1096        type = le16_to_cpu(hdr->type);
1097
1098        switch (type) {
1099        case RTRS_MSG_WRITE:
1100                process_write(con, msg, id, off);
1101                break;
1102        case RTRS_MSG_READ:
1103                process_read(con, msg, id, off);
1104                break;
1105        default:
1106                rtrs_err(s,
1107                          "Processing I/O request failed, unknown message type received: 0x%02x\n",
1108                          type);
1109                goto err;
1110        }
1111
1112        return;
1113
1114err:
1115        close_sess(sess);
1116}
1117
1118static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
1119{
1120        struct rtrs_srv_mr *mr =
1121                container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
1122        struct rtrs_srv_con *con = cq->cq_context;
1123        struct rtrs_sess *s = con->c.sess;
1124        struct rtrs_srv_sess *sess = to_srv_sess(s);
1125        struct rtrs_srv *srv = sess->srv;
1126        u32 msg_id, off;
1127        void *data;
1128
1129        if (unlikely(wc->status != IB_WC_SUCCESS)) {
1130                rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
1131                          ib_wc_status_msg(wc->status));
1132                close_sess(sess);
1133        }
1134        msg_id = mr->msg_id;
1135        off = mr->msg_off;
1136        data = page_address(srv->chunks[msg_id]) + off;
1137        process_io_req(con, data, msg_id, off);
1138}
1139
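    /*
     * Post an IB_WR_LOCAL_INV to invalidate the chunk's rkey before the
     * request is handed to the upper layer (always_invalidate mode);
     * processing continues in rtrs_srv_inv_rkey_done().
     */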
1140static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
1141                              struct rtrs_srv_mr *mr)
1142{
1143        struct ib_send_wr wr = {
1144                .opcode             = IB_WR_LOCAL_INV,
1145                .wr_cqe             = &mr->inv_cqe,
1146                .send_flags         = IB_SEND_SIGNALED,
1147                .ex.invalidate_rkey = mr->mr->rkey,
1148        };
1149        mr->inv_cqe.done = rtrs_srv_inv_rkey_done;
1150
1151        return ib_post_send(con->c.qp, &wr, NULL);
1152}
1153
1154static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
1155{
1156        spin_lock(&con->rsp_wr_wait_lock);
1157        while (!list_empty(&con->rsp_wr_wait_list)) {
1158                struct rtrs_srv_op *id;
1159                int ret;
1160
1161                id = list_entry(con->rsp_wr_wait_list.next,
1162                                struct rtrs_srv_op, wait_list);
1163                list_del(&id->wait_list);
1164
1165                spin_unlock(&con->rsp_wr_wait_lock);
1166                ret = rtrs_srv_resp_rdma(id, id->status);
1167                spin_lock(&con->rsp_wr_wait_lock);
1168
1169                if (!ret) {
1170                        list_add(&id->wait_list, &con->rsp_wr_wait_list);
1171                        break;
1172                }
1173        }
1174        spin_unlock(&con->rsp_wr_wait_lock);
1175}
1176
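    /*
     * Completion handler for the IO connections.  RECV_RDMA_WITH_IMM
     * completions carry either a new IO request or a heartbeat,
     * demultiplexed by the immediate data type.  RDMA_WRITE/SEND
     * completions replenish sq_wr_avail (sends are signaled only once per
     * queue_depth WRs) and kick responses waiting on the wr wait list.
     */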
1177static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
1178{
1179        struct rtrs_srv_con *con = cq->cq_context;
1180        struct rtrs_sess *s = con->c.sess;
1181        struct rtrs_srv_sess *sess = to_srv_sess(s);
1182        struct rtrs_srv *srv = sess->srv;
1183        u32 imm_type, imm_payload;
1184        int err;
1185
1186        if (unlikely(wc->status != IB_WC_SUCCESS)) {
1187                if (wc->status != IB_WC_WR_FLUSH_ERR) {
1188                        rtrs_err(s,
1189                                  "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
1190                                  ib_wc_status_msg(wc->status), wc->wr_cqe,
1191                                  wc->opcode, wc->vendor_err, wc->byte_len);
1192                        close_sess(sess);
1193                }
1194                return;
1195        }
1196
1197        switch (wc->opcode) {
1198        case IB_WC_RECV_RDMA_WITH_IMM:
1199                /*
1200                 * post_recv() RDMA write completions of IO reqs (read/write)
1201                 * and hb
1202                 */
1203                if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
1204                        return;
1205                err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
1206                if (unlikely(err)) {
1207                        rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
1208                        close_sess(sess);
1209                        break;
1210                }
1211                rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
1212                               &imm_type, &imm_payload);
1213                if (likely(imm_type == RTRS_IO_REQ_IMM)) {
1214                        u32 msg_id, off;
1215                        void *data;
1216
1217                        msg_id = imm_payload >> sess->mem_bits;
1218                        off = imm_payload & ((1 << sess->mem_bits) - 1);
1219                        if (unlikely(msg_id >= srv->queue_depth ||
1220                                     off >= max_chunk_size)) {
1221                                rtrs_err(s, "Wrong msg_id %u, off %u\n",
1222                                          msg_id, off);
1223                                close_sess(sess);
1224                                return;
1225                        }
1226                        if (always_invalidate) {
1227                                struct rtrs_srv_mr *mr = &sess->mrs[msg_id];
1228
1229                                mr->msg_off = off;
1230                                mr->msg_id = msg_id;
1231                                err = rtrs_srv_inv_rkey(con, mr);
1232                                if (unlikely(err)) {
1233                                        rtrs_err(s, "rtrs_srv_inv_rkey(), err: %d\n",
1234                                                  err);
1235                                        close_sess(sess);
1236                                        break;
1237                                }
1238                        } else {
1239                                data = page_address(srv->chunks[msg_id]) + off;
1240                                process_io_req(con, data, msg_id, off);
1241                        }
1242                } else if (imm_type == RTRS_HB_MSG_IMM) {
1243                        WARN_ON(con->c.cid);
1244                        rtrs_send_hb_ack(&sess->s);
1245                } else if (imm_type == RTRS_HB_ACK_IMM) {
1246                        WARN_ON(con->c.cid);
1247                        sess->s.hb_missed_cnt = 0;
1248                } else {
1249                        rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
1250                }
1251                break;
1252        case IB_WC_RDMA_WRITE:
1253        case IB_WC_SEND:
1254                /*
1255                 * post_send() RDMA write completions of IO reqs (read/write)
1256                 * and hb
1257                 */
1258                atomic_add(srv->queue_depth, &con->sq_wr_avail);
1259
1260                if (unlikely(!list_empty_careful(&con->rsp_wr_wait_list)))
1261                        rtrs_rdma_process_wr_wait_list(con);
1262
1263                break;
1264        default:
1265                rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
1266                return;
1267        }
1268}
1269
1270/**
1271 * rtrs_srv_get_sess_name() - Get rtrs_srv peer hostname.
1272 * @srv:        Session
1273 * @sessname:   Sessname buffer
1274 * @len:        Length of sessname buffer
1275 */
1276int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len)
1277{
1278        struct rtrs_srv_sess *sess;
1279        int err = -ENOTCONN;
1280
1281        mutex_lock(&srv->paths_mutex);
1282        list_for_each_entry(sess, &srv->paths_list, s.entry) {
1283                if (sess->state != RTRS_SRV_CONNECTED)
1284                        continue;
1285                strlcpy(sessname, sess->s.sessname,
1286                       min_t(size_t, sizeof(sess->s.sessname), len));
1287                err = 0;
1288                break;
1289        }
1290        mutex_unlock(&srv->paths_mutex);
1291
1292        return err;
1293}
1294EXPORT_SYMBOL(rtrs_srv_get_sess_name);
1295
1296/**
1297 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth.
1298 * @srv:        Session
1299 */
1300int rtrs_srv_get_queue_depth(struct rtrs_srv *srv)
1301{
1302        return srv->queue_depth;
1303}
1304EXPORT_SYMBOL(rtrs_srv_get_queue_depth);
1305
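    /*
     * Pick the next completion vector round-robin over the CPUs in
     * cq_affinity_mask, bounded by the device's number of completion
     * vectors.
     */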
1306static int find_next_bit_ring(struct rtrs_srv_sess *sess)
1307{
1308        struct ib_device *ib_dev = sess->s.dev->ib_dev;
1309        int v;
1310
1311        v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask);
1312        if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
1313                v = cpumask_first(&cq_affinity_mask);
1314        return v;
1315}
1316
1317static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess)
1318{
1319        sess->cur_cq_vector = find_next_bit_ring(sess);
1320
1321        return sess->cur_cq_vector;
1322}
1323
1324static void rtrs_srv_dev_release(struct device *dev)
1325{
1326        struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev);
1327
1328        kfree(srv);
1329}
1330
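    /*
     * Allocate a new rtrs_srv for the given paths_uuid: a refcounted
     * container shared by all paths of one client session.  IO chunks are
     * taken from the global chunk_pool mempool.  Called with
     * ctx->srv_mutex held (the new srv is added to ctx->srv_list here).
     */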
1331static struct rtrs_srv *__alloc_srv(struct rtrs_srv_ctx *ctx,
1332                                     const uuid_t *paths_uuid)
1333{
1334        struct rtrs_srv *srv;
1335        int i;
1336
1337        srv = kzalloc(sizeof(*srv), GFP_KERNEL);
1338        if  (!srv)
1339                return NULL;
1340
1341        refcount_set(&srv->refcount, 1);
1342        INIT_LIST_HEAD(&srv->paths_list);
1343        mutex_init(&srv->paths_mutex);
1344        mutex_init(&srv->paths_ev_mutex);
1345        uuid_copy(&srv->paths_uuid, paths_uuid);
1346        srv->queue_depth = sess_queue_depth;
1347        srv->ctx = ctx;
1348        device_initialize(&srv->dev);
1349        srv->dev.release = rtrs_srv_dev_release;
1350
1351        srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
1352                              GFP_KERNEL);
1353        if (!srv->chunks)
1354                goto err_free_srv;
1355
1356        for (i = 0; i < srv->queue_depth; i++) {
1357                srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL);
1358                if (!srv->chunks[i])
1359                        goto err_free_chunks;
1360        }
1361        list_add(&srv->ctx_list, &ctx->srv_list);
1362
1363        return srv;
1364
1365err_free_chunks:
1366        while (i--)
1367                mempool_free(srv->chunks[i], chunk_pool);
1368        kfree(srv->chunks);
1369
1370err_free_srv:
1371        kfree(srv);
1372
1373        return NULL;
1374}
1375
1376static void free_srv(struct rtrs_srv *srv)
1377{
1378        int i;
1379
1380        WARN_ON(refcount_read(&srv->refcount));
1381        for (i = 0; i < srv->queue_depth; i++)
1382                mempool_free(srv->chunks[i], chunk_pool);
1383        kfree(srv->chunks);
1384        mutex_destroy(&srv->paths_mutex);
1385        mutex_destroy(&srv->paths_ev_mutex);
1386        /* last put to release the srv structure */
1387        put_device(&srv->dev);
1388}
1389
1390static inline struct rtrs_srv *__find_srv_and_get(struct rtrs_srv_ctx *ctx,
1391                                                   const uuid_t *paths_uuid)
1392{
1393        struct rtrs_srv *srv;
1394
1395        list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
1396                if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
1397                    refcount_inc_not_zero(&srv->refcount))
1398                        return srv;
1399        }
1400
1401        return NULL;
1402}
1403
1404static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx,
1405                                           const uuid_t *paths_uuid)
1406{
1407        struct rtrs_srv *srv;
1408
1409        mutex_lock(&ctx->srv_mutex);
1410        srv = __find_srv_and_get(ctx, paths_uuid);
1411        if (!srv)
1412                srv = __alloc_srv(ctx, paths_uuid);
1413        mutex_unlock(&ctx->srv_mutex);
1414
1415        return srv;
1416}
1417
1418static void put_srv(struct rtrs_srv *srv)
1419{
1420        if (refcount_dec_and_test(&srv->refcount)) {
1421                struct rtrs_srv_ctx *ctx = srv->ctx;
1422
1423                WARN_ON(srv->dev.kobj.state_in_sysfs);
1424
1425                mutex_lock(&ctx->srv_mutex);
1426                list_del(&srv->ctx_list);
1427                mutex_unlock(&ctx->srv_mutex);
1428                free_srv(srv);
1429        }
1430}
1431
1432static void __add_path_to_srv(struct rtrs_srv *srv,
1433                              struct rtrs_srv_sess *sess)
1434{
1435        list_add_tail(&sess->s.entry, &srv->paths_list);
1436        srv->paths_num++;
1437        WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
1438}
1439
1440static void del_path_from_srv(struct rtrs_srv_sess *sess)
1441{
1442        struct rtrs_srv *srv = sess->srv;
1443
1444        if (WARN_ON(!srv))
1445                return;
1446
1447        mutex_lock(&srv->paths_mutex);
1448        list_del(&sess->s.entry);
1449        WARN_ON(!srv->paths_num);
1450        srv->paths_num--;
1451        mutex_unlock(&srv->paths_mutex);
1452}
1453
1454/* returns 0 if the addresses match, non-zero or -ENOENT otherwise */
1455static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
1456{
1457        switch (a->sa_family) {
1458        case AF_IB:
1459                return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
1460                              &((struct sockaddr_ib *)b)->sib_addr,
1461                              sizeof(struct ib_addr)) &&
1462                        (b->sa_family == AF_IB);
1463        case AF_INET:
1464                return memcmp(&((struct sockaddr_in *)a)->sin_addr,
1465                              &((struct sockaddr_in *)b)->sin_addr,
1466                              sizeof(struct in_addr)) &&
1467                        (b->sa_family == AF_INET);
1468        case AF_INET6:
1469                return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
1470                              &((struct sockaddr_in6 *)b)->sin6_addr,
1471                              sizeof(struct in6_addr)) &&
1472                        (b->sa_family == AF_INET6);
1473        default:
1474                return -ENOENT;
1475        }
1476}
1477
1478static bool __is_path_w_addr_exists(struct rtrs_srv *srv,
1479                                    struct rdma_addr *addr)
1480{
1481        struct rtrs_srv_sess *sess;
1482
1483        list_for_each_entry(sess, &srv->paths_list, s.entry)
1484                if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr,
1485                                  (struct sockaddr *)&addr->dst_addr) &&
1486                    !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr,
1487                                  (struct sockaddr *)&addr->src_addr))
1488                        return true;
1489
1490        return false;
1491}
1492
1493static void free_sess(struct rtrs_srv_sess *sess)
1494{
1495        if (sess->kobj.state_in_sysfs)
1496                kobject_put(&sess->kobj);
1497        else
1498                kfree(sess);
1499}
1500
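    /*
     * Path teardown, run from rtrs_wq: destroy sysfs entries, stop
     * heartbeats, disconnect and drain every QP, wait for inflight
     * requests, notify the upper layer if this was the last path, then
     * free the registered buffers, destroy the QPs/CM ids and drop the
     * srv reference.
     */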
1501static void rtrs_srv_close_work(struct work_struct *work)
1502{
1503        struct rtrs_srv_sess *sess;
1504        struct rtrs_srv_con *con;
1505        int i;
1506
1507        sess = container_of(work, typeof(*sess), close_work);
1508
1509        rtrs_srv_destroy_sess_files(sess);
1510        rtrs_srv_stop_hb(sess);
1511
1512        for (i = 0; i < sess->s.con_num; i++) {
1513                if (!sess->s.con[i])
1514                        continue;
1515                con = to_srv_con(sess->s.con[i]);
1516                rdma_disconnect(con->c.cm_id);
1517                ib_drain_qp(con->c.qp);
1518        }
1519        /* Wait for all in-flight operations to complete */
1520        rtrs_srv_wait_ops_ids(sess);
1521
1522        /* Notify upper layer if we are the last path */
1523        rtrs_srv_sess_down(sess);
1524
1525        unmap_cont_bufs(sess);
1526        rtrs_srv_free_ops_ids(sess);
1527
1528        for (i = 0; i < sess->s.con_num; i++) {
1529                if (!sess->s.con[i])
1530                        continue;
1531                con = to_srv_con(sess->s.con[i]);
1532                rtrs_cq_qp_destroy(&con->c);
1533                rdma_destroy_id(con->c.cm_id);
1534                kfree(con);
1535        }
1536        rtrs_ib_dev_put(sess->s.dev);
1537
1538        del_path_from_srv(sess);
1539        put_srv(sess->srv);
1540        sess->srv = NULL;
1541        rtrs_srv_change_state(sess, RTRS_SRV_CLOSED);
1542
1543        kfree(sess->dma_addr);
1544        kfree(sess->s.con);
1545        free_sess(sess);
1546}
1547
1548static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess,
1549                               struct rdma_cm_id *cm_id)
1550{
1551        struct rtrs_srv *srv = sess->srv;
1552        struct rtrs_msg_conn_rsp msg;
1553        struct rdma_conn_param param;
1554        int err;
1555
1556        param = (struct rdma_conn_param) {
1557                .rnr_retry_count = 7,
1558                .private_data = &msg,
1559                .private_data_len = sizeof(msg),
1560        };
1561
1562        msg = (struct rtrs_msg_conn_rsp) {
1563                .magic = cpu_to_le16(RTRS_MAGIC),
1564                .version = cpu_to_le16(RTRS_PROTO_VER),
1565                .queue_depth = cpu_to_le16(srv->queue_depth),
1566                .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
1567                .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
1568        };
1569
1570        if (always_invalidate)
1571                msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
1572
1573        err = rdma_accept(cm_id, &param);
1574        if (err)
1575                pr_err("rdma_accept(), err: %d\n", err);
1576
1577        return err;
1578}
1579
1580static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
1581{
1582        struct rtrs_msg_conn_rsp msg;
1583        int err;
1584
1585        msg = (struct rtrs_msg_conn_rsp) {
1586                .magic = cpu_to_le16(RTRS_MAGIC),
1587                .version = cpu_to_le16(RTRS_PROTO_VER),
1588                .errno = cpu_to_le16(errno),
1589        };
1590
1591        err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
1592        if (err)
1593                pr_err("rdma_reject(), err: %d\n", err);
1594
1595        /* Bounce errno back */
1596        return errno;
1597}
1598
1599static struct rtrs_srv_sess *
1600__find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid)
1601{
1602        struct rtrs_srv_sess *sess;
1603
1604        list_for_each_entry(sess, &srv->paths_list, s.entry) {
1605                if (uuid_equal(&sess->s.uuid, sess_uuid))
1606                        return sess;
1607        }
1608
1609        return NULL;
1610}
1611
1612static int create_con(struct rtrs_srv_sess *sess,
1613                      struct rdma_cm_id *cm_id,
1614                      unsigned int cid)
1615{
1616        struct rtrs_srv *srv = sess->srv;
1617        struct rtrs_sess *s = &sess->s;
1618        struct rtrs_srv_con *con;
1619
1620        u16 cq_size, wr_queue_size;
1621        int err, cq_vector;
1622
1623        con = kzalloc(sizeof(*con), GFP_KERNEL);
1624        if (!con) {
1625                err = -ENOMEM;
1626                goto err;
1627        }
1628
1629        spin_lock_init(&con->rsp_wr_wait_lock);
1630        INIT_LIST_HEAD(&con->rsp_wr_wait_list);
1631        con->c.cm_id = cm_id;
1632        con->c.sess = &sess->s;
1633        con->c.cid = cid;
1634        atomic_set(&con->wr_cnt, 0);
1635
1636        if (con->c.cid == 0) {
1637                /*
1638                 * All receive and all send (each requiring invalidate)
1639                 * + 2 for drain and heartbeat
1640                 */
1641                wr_queue_size = SERVICE_CON_QUEUE_DEPTH * 3 + 2;
1642                cq_size = wr_queue_size;
1643        } else {
1644                /*
1645                 * cq_size must cover all posted receive requests, all
1646                 * posted write requests and one invalidate request per
1647                 * read request, plus one extra entry for draining the QP
1648                 * when it goes into the error state.
1649                 */
1650                cq_size = srv->queue_depth * 3 + 1;
1651                /*
1652                 * In theory we might have queue_depth * 32
1653                 * outstanding requests if an unsafe global key is used
1654                 * and we have queue_depth read requests each consisting
1655                 * of 32 different addresses. div 3 for mlx5.
1656                 */
1657                wr_queue_size = sess->s.dev->ib_dev->attrs.max_qp_wr / 3;
1658        }
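            /*
             * Sizing example (a sketch, assuming the default session queue
             * depth of 512): cq_size = 3 * 512 + 1 = 1537 CQEs, and
             * wr_queue_size is taken as the device's max_qp_wr / 3.
             */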
1659        atomic_set(&con->sq_wr_avail, wr_queue_size);
1660        cq_vector = rtrs_srv_get_next_cq_vector(sess);
1661
1662        /* TODO: SOFTIRQ can be faster, but be careful with softirq context */
1663        err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_size,
1664                                 wr_queue_size, IB_POLL_WORKQUEUE);
1665        if (err) {
1666                rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
1667                goto free_con;
1668        }
1669        if (con->c.cid == 0) {
1670                err = post_recv_info_req(con);
1671                if (err)
1672                        goto free_cqqp;
1673        }
1674        WARN_ON(sess->s.con[cid]);
1675        sess->s.con[cid] = &con->c;
1676
1677        /*
1678         * Change context from server to current connection.  The other
1679         * way is to use cm_id->qp->qp_context, which does not work on OFED.
1680         */
1681        cm_id->context = &con->c;
1682
1683        return 0;
1684
1685free_cqqp:
1686        rtrs_cq_qp_destroy(&con->c);
1687free_con:
1688        kfree(con);
1689
1690err:
1691        return err;
1692}
1693
1694static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv,
1695                                           struct rdma_cm_id *cm_id,
1696                                           unsigned int con_num,
1697                                           unsigned int recon_cnt,
1698                                           const uuid_t *uuid)
1699{
1700        struct rtrs_srv_sess *sess;
1701        int err = -ENOMEM;
1702
1703        if (srv->paths_num >= MAX_PATHS_NUM) {
1704                err = -ECONNRESET;
1705                goto err;
1706        }
1707        if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
1708                err = -EEXIST;
1709                pr_err("Path with same addr exists\n");
1710                goto err;
1711        }
1712        sess = kzalloc(sizeof(*sess), GFP_KERNEL);
1713        if (!sess)
1714                goto err;
1715
1716        sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
1717        if (!sess->stats)
1718                goto err_free_sess;
1719
1720        sess->stats->sess = sess;
1721
1722        sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr),
1723                                 GFP_KERNEL);
1724        if (!sess->dma_addr)
1725                goto err_free_stats;
1726
1727        sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
1728        if (!sess->s.con)
1729                goto err_free_dma_addr;
1730
1731        sess->state = RTRS_SRV_CONNECTING;
1732        sess->srv = srv;
1733        sess->cur_cq_vector = -1;
1734        sess->s.dst_addr = cm_id->route.addr.dst_addr;
1735        sess->s.src_addr = cm_id->route.addr.src_addr;
1736        sess->s.con_num = con_num;
1737        sess->s.recon_cnt = recon_cnt;
1738        uuid_copy(&sess->s.uuid, uuid);
1739        spin_lock_init(&sess->state_lock);
1740        INIT_WORK(&sess->close_work, rtrs_srv_close_work);
1741        rtrs_srv_init_hb(sess);
1742
1743        sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
1744        if (!sess->s.dev) {
1745                err = -ENOMEM;
1746                goto err_free_con;
1747        }
1748        err = map_cont_bufs(sess);
1749        if (err)
1750                goto err_put_dev;
1751
1752        err = rtrs_srv_alloc_ops_ids(sess);
1753        if (err)
1754                goto err_unmap_bufs;
1755
1756        __add_path_to_srv(srv, sess);
1757
1758        return sess;
1759
1760err_unmap_bufs:
1761        unmap_cont_bufs(sess);
1762err_put_dev:
1763        rtrs_ib_dev_put(sess->s.dev);
1764err_free_con:
1765        kfree(sess->s.con);
1766err_free_dma_addr:
1767        kfree(sess->dma_addr);
1768err_free_stats:
1769        kfree(sess->stats);
1770err_free_sess:
1771        kfree(sess);
1772err:
1773        return ERR_PTR(err);
1774}
1775
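    /*
     * Handle an incoming RDMA CM connect request: validate the private_data
     * blob (length, magic, major protocol version, connection count, cid),
     * look up or create the rtrs_srv for the requested paths UUID, find or
     * allocate the session for the session UUID, then create the connection
     * and accept or reject the request.
     */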
1776static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
1777                              const struct rtrs_msg_conn_req *msg,
1778                              size_t len)
1779{
1780        struct rtrs_srv_ctx *ctx = cm_id->context;
1781        struct rtrs_srv_sess *sess;
1782        struct rtrs_srv *srv;
1783
1784        u16 version, con_num, cid;
1785        u16 recon_cnt;
1786        int err;
1787
1788        if (len < sizeof(*msg)) {
1789                pr_err("Invalid RTRS connection request\n");
1790                goto reject_w_econnreset;
1791        }
1792        if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1793                pr_err("Invalid RTRS magic\n");
1794                goto reject_w_econnreset;
1795        }
1796        version = le16_to_cpu(msg->version);
1797        if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1798                pr_err("Unsupported major RTRS version: %d, expected %d\n",
1799                       version >> 8, RTRS_PROTO_VER_MAJOR);
1800                goto reject_w_econnreset;
1801        }
1802        con_num = le16_to_cpu(msg->cid_num);
1803        if (con_num > 4096) {
1804                /* Sanity check */
1805                pr_err("Too many connections requested: %d\n", con_num);
1806                goto reject_w_econnreset;
1807        }
1808        cid = le16_to_cpu(msg->cid);
1809        if (cid >= con_num) {
1810                /* Sanity check */
1811                pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
1812                goto reject_w_econnreset;
1813        }
1814        recon_cnt = le16_to_cpu(msg->recon_cnt);
1815        srv = get_or_create_srv(ctx, &msg->paths_uuid);
1816        if (!srv) {
1817                err = -ENOMEM;
1818                goto reject_w_err;
1819        }
1820        mutex_lock(&srv->paths_mutex);
1821        sess = __find_sess(srv, &msg->sess_uuid);
1822        if (sess) {
1823                struct rtrs_sess *s = &sess->s;
1824
1825                /* Session already holds a reference */
1826                put_srv(srv);
1827
1828                if (sess->state != RTRS_SRV_CONNECTING) {
1829                        rtrs_err(s, "Session in wrong state: %s\n",
1830                                  rtrs_srv_state_str(sess->state));
1831                        mutex_unlock(&srv->paths_mutex);
1832                        goto reject_w_econnreset;
1833                }
1834                /*
1835                 * Sanity checks
1836                 */
1837                if (con_num != s->con_num || cid >= s->con_num) {
1838                        rtrs_err(s, "Incorrect request: %d, %d\n",
1839                                  cid, con_num);
1840                        mutex_unlock(&srv->paths_mutex);
1841                        goto reject_w_econnreset;
1842                }
1843                if (s->con[cid]) {
1844                        rtrs_err(s, "Connection already exists: %d\n",
1845                                  cid);
1846                        mutex_unlock(&srv->paths_mutex);
1847                        goto reject_w_econnreset;
1848                }
1849        } else {
1850                sess = __alloc_sess(srv, cm_id, con_num, recon_cnt,
1851                                    &msg->sess_uuid);
1852                if (IS_ERR(sess)) {
1853                        mutex_unlock(&srv->paths_mutex);
1854                        put_srv(srv);
1855                        err = PTR_ERR(sess);
1856                        goto reject_w_err;
1857                }
1858        }
1859        err = create_con(sess, cm_id, cid);
1860        if (err) {
1861                (void)rtrs_rdma_do_reject(cm_id, err);
1862                /*
1863                 * The session may have other connections, so close it via
1864                 * the normal workqueue path, but still return an error to
1865                 * tell cma.c to call rdma_destroy_id() for this connection.
1866                 */
1867                goto close_and_return_err;
1868        }
1869        err = rtrs_rdma_do_accept(sess, cm_id);
1870        if (err) {
1871                (void)rtrs_rdma_do_reject(cm_id, err);
1872                /*
1873                 * The current connection was successfully added to the
1874                 * session, so close the session through the normal
1875                 * workqueue path and return 0 to tell cma.c that we will
1876                 * call rdma_destroy_id() ourselves.
1877                 */
1878                err = 0;
1879                goto close_and_return_err;
1880        }
1881        mutex_unlock(&srv->paths_mutex);
1882
1883        return 0;
1884
1885reject_w_err:
1886        return rtrs_rdma_do_reject(cm_id, err);
1887
1888reject_w_econnreset:
1889        return rtrs_rdma_do_reject(cm_id, -ECONNRESET);
1890
1891close_and_return_err:
1892        close_sess(sess);
1893        mutex_unlock(&srv->paths_mutex);
1894
1895        return err;
1896}
1897
1898static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
1899                                     struct rdma_cm_event *ev)
1900{
1901        struct rtrs_srv_sess *sess = NULL;
1902        struct rtrs_sess *s = NULL;
1903
1904        if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
1905                struct rtrs_con *c = cm_id->context;
1906
1907                s = c->sess;
1908                sess = to_srv_sess(s);
1909        }
1910
1911        switch (ev->event) {
1912        case RDMA_CM_EVENT_CONNECT_REQUEST:
1913                /*
1914                 * In case of error cma.c will destroy cm_id,
1915                 * see cma_process_remove()
1916                 */
1917                return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
1918                                          ev->param.conn.private_data_len);
1919        case RDMA_CM_EVENT_ESTABLISHED:
1920                /* Nothing here */
1921                break;
1922        case RDMA_CM_EVENT_REJECTED:
1923        case RDMA_CM_EVENT_CONNECT_ERROR:
1924        case RDMA_CM_EVENT_UNREACHABLE:
1925                rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
1926                          rdma_event_msg(ev->event), ev->status);
1927                close_sess(sess);
1928                break;
1929        case RDMA_CM_EVENT_DISCONNECTED:
1930        case RDMA_CM_EVENT_ADDR_CHANGE:
1931        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1932                close_sess(sess);
1933                break;
1934        case RDMA_CM_EVENT_DEVICE_REMOVAL:
1935                close_sess(sess);
1936                break;
1937        default:
1938                pr_err("Ignoring unexpected CM event %s, err %d\n",
1939                       rdma_event_msg(ev->event), ev->status);
1940                break;
1941        }
1942
1943        return 0;
1944}
1945
1946static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
1947                                            struct sockaddr *addr,
1948                                            enum rdma_ucm_port_space ps)
1949{
1950        struct rdma_cm_id *cm_id;
1951        int ret;
1952
1953        cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
1954                               ctx, ps, IB_QPT_RC);
1955        if (IS_ERR(cm_id)) {
1956                ret = PTR_ERR(cm_id);
1957                pr_err("Creating id for RDMA connection failed, err: %d\n",
1958                       ret);
1959                goto err_out;
1960        }
1961        ret = rdma_bind_addr(cm_id, addr);
1962        if (ret) {
1963                pr_err("Binding RDMA address failed, err: %d\n", ret);
1964                goto err_cm;
1965        }
1966        ret = rdma_listen(cm_id, 64);
1967        if (ret) {
1968                pr_err("Listening on RDMA connection failed, err: %d\n",
1969                       ret);
1970                goto err_cm;
1971        }
1972
1973        return cm_id;
1974
1975err_cm:
1976        rdma_destroy_id(cm_id);
1977err_out:
1978
1979        return ERR_PTR(ret);
1980}
1981
1982static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
1983{
1984        struct sockaddr_in6 sin = {
1985                .sin6_family    = AF_INET6,
1986                .sin6_addr      = IN6ADDR_ANY_INIT,
1987                .sin6_port      = htons(port),
1988        };
1989        struct sockaddr_ib sib = {
1990                .sib_family                     = AF_IB,
1991                .sib_sid        = cpu_to_be64(RDMA_IB_IP_PS_IB | port),
1992                .sib_sid_mask   = cpu_to_be64(0xffffffffffffffffULL),
1993                .sib_pkey       = cpu_to_be16(0xffff),
1994        };
1995        struct rdma_cm_id *cm_ip, *cm_ib;
1996        int ret;
1997
1998        /*
1999         * We accept both IPoIB and IB connections, so we need to keep
2000         * two cm id's, one for each socket type and port space.
2001         * If the cm initialization of one of the id's fails, we abort
2002         * everything.
2003         */
2004        cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
2005        if (IS_ERR(cm_ip))
2006                return PTR_ERR(cm_ip);
2007
2008        cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
2009        if (IS_ERR(cm_ib)) {
2010                ret = PTR_ERR(cm_ib);
2011                goto free_cm_ip;
2012        }
2013
2014        ctx->cm_id_ip = cm_ip;
2015        ctx->cm_id_ib = cm_ib;
2016
2017        return 0;
2018
2019free_cm_ip:
2020        rdma_destroy_id(cm_ip);
2021
2022        return ret;
2023}
2024
2025static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
2026{
2027        struct rtrs_srv_ctx *ctx;
2028
2029        ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
2030        if (!ctx)
2031                return NULL;
2032
2033        ctx->ops = *ops;
2034        mutex_init(&ctx->srv_mutex);
2035        INIT_LIST_HEAD(&ctx->srv_list);
2036
2037        return ctx;
2038}
2039
2040static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
2041{
2042        WARN_ON(!list_empty(&ctx->srv_list));
2043        mutex_destroy(&ctx->srv_mutex);
2044        kfree(ctx);
2045}
2046
2047static int rtrs_srv_add_one(struct ib_device *device)
2048{
2049        struct rtrs_srv_ctx *ctx;
2050        int ret = 0;
2051
2052        mutex_lock(&ib_ctx.ib_dev_mutex);
2053        if (ib_ctx.ib_dev_count)
2054                goto out;
2055
2056        /*
2057         * Since our CM IDs are NOT bound to any ib device we will create them
2058         * only once
2059         */
2060        ctx = ib_ctx.srv_ctx;
2061        ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
2062        if (ret) {
2063                /*
2064                 * We errored out here. According to the ib_client code,
2065                 * the error code is then ignored and no further calls to
2066                 * our ops are made.
2067                 */
2068                pr_err("Failed to initialize RDMA connection\n");
2069                goto err_out;
2070        }
2071
2072out:
2073        /*
2074         * Keep track of the number of ib devices added
2075         */
2076        ib_ctx.ib_dev_count++;
2077
2078err_out:
2079        mutex_unlock(&ib_ctx.ib_dev_mutex);
2080        return ret;
2081}
2082
2083static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
2084{
2085        struct rtrs_srv_ctx *ctx;
2086
2087        mutex_lock(&ib_ctx.ib_dev_mutex);
2088        ib_ctx.ib_dev_count--;
2089
2090        if (ib_ctx.ib_dev_count)
2091                goto out;
2092
2093        /*
2094         * Since our CM IDs are NOT bound to any ib device we will remove them
2095         * only once, when the last device is removed
2096         */
2097        ctx = ib_ctx.srv_ctx;
2098        rdma_destroy_id(ctx->cm_id_ip);
2099        rdma_destroy_id(ctx->cm_id_ib);
2100
2101out:
2102        mutex_unlock(&ib_ctx.ib_dev_mutex);
2103}
2104
2105static struct ib_client rtrs_srv_client = {
2106        .name   = "rtrs_server",
2107        .add    = rtrs_srv_add_one,
2108        .remove = rtrs_srv_remove_one
2109};
2110
2111/**
2112 * rtrs_srv_open() - open RTRS server context
2113 * @ops:                callback functions
2114 * @port:               port to listen on
2115 *
2116 * Creates server context with specified callbacks.
2117 *
2118 * Return: a valid pointer on success, an ERR_PTR() value on error.
2119 */
2120struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
2121{
2122        struct rtrs_srv_ctx *ctx;
2123        int err;
2124
2125        ctx = alloc_srv_ctx(ops);
2126        if (!ctx)
2127                return ERR_PTR(-ENOMEM);
2128
2129        mutex_init(&ib_ctx.ib_dev_mutex);
2130        ib_ctx.srv_ctx = ctx;
2131        ib_ctx.port = port;
2132
2133        err = ib_register_client(&rtrs_srv_client);
2134        if (err) {
2135                free_srv_ctx(ctx);
2136                return ERR_PTR(err);
2137        }
2138
2139        return ctx;
2140}
2141EXPORT_SYMBOL(rtrs_srv_open);
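
    /*
     * Typical usage by an upper layer (a sketch only; the handler names are
     * illustrative, see struct rtrs_srv_ops in rtrs.h for the exact
     * prototypes):
     *
     *        static struct rtrs_srv_ops my_ops = {
     *                .rdma_ev = my_rdma_ev,  (handles incoming RDMA requests)
     *                .link_ev = my_link_ev,  (handles session up/down events)
     *        };
     *
     *        ctx = rtrs_srv_open(&my_ops, port);
     *        if (IS_ERR(ctx))
     *                return PTR_ERR(ctx);
     *        ...
     *        rtrs_srv_close(ctx);
     */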
2142
2143static void close_sessions(struct rtrs_srv *srv)
2144{
2145        struct rtrs_srv_sess *sess;
2146
2147        mutex_lock(&srv->paths_mutex);
2148        list_for_each_entry(sess, &srv->paths_list, s.entry)
2149                close_sess(sess);
2150        mutex_unlock(&srv->paths_mutex);
2151}
2152
2153static void close_ctx(struct rtrs_srv_ctx *ctx)
2154{
2155        struct rtrs_srv *srv;
2156
2157        mutex_lock(&ctx->srv_mutex);
2158        list_for_each_entry(srv, &ctx->srv_list, ctx_list)
2159                close_sessions(srv);
2160        mutex_unlock(&ctx->srv_mutex);
2161        flush_workqueue(rtrs_wq);
2162}
2163
2164/**
2165 * rtrs_srv_close() - close RTRS server context
2166 * @ctx: pointer to server context
2167 *
2168 * Closes RTRS server context with all client sessions.
2169 */
2170void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
2171{
2172        ib_unregister_client(&rtrs_srv_client);
2173        mutex_destroy(&ib_ctx.ib_dev_mutex);
2174        close_ctx(ctx);
2175        free_srv_ctx(ctx);
2176}
2177EXPORT_SYMBOL(rtrs_srv_close);
2178
2179static int check_module_params(void)
2180{
2181        if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
2182                pr_err("Invalid sess_queue_depth value %d, has to be >= %d, <= %d.\n",
2183                       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
2184                return -EINVAL;
2185        }
2186        if (max_chunk_size < 4096 || !is_power_of_2(max_chunk_size)) {
2187                pr_err("Invalid max_chunk_size value %d, has to be >= %d and should be power of two.\n",
2188                       max_chunk_size, 4096);
2189                return -EINVAL;
2190        }
2191
2192        /*
2193         * Check if IB immediate data size is enough to hold the mem_id and the
2194         * offset inside the memory chunk
2195         */
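            /*
             * Example with the defaults (a sketch): sess_queue_depth = 512
             * needs ilog2(511) + 1 = 9 bits for the buffer id, and
             * max_chunk_size = 128 KiB needs ilog2(131071) + 1 = 17 bits for
             * the chunk offset; 26 bits in total must fit into
             * MAX_IMM_PAYL_BITS.
             */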
2196        if ((ilog2(sess_queue_depth - 1) + 1) +
2197            (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
2198                pr_err("RDMA immediate size (%db) not enough to encode %d buffers of size %dB. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
2199                       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
2200                return -EINVAL;
2201        }
2202
2203        return 0;
2204}
2205
2206static int __init rtrs_server_init(void)
2207{
2208        int err;
2209
2210        pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld), sess_queue_depth: %d, always_invalidate: %d)\n",
2211                KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
2212                max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
2213                sess_queue_depth, always_invalidate);
2214
2215        rtrs_rdma_dev_pd_init(0, &dev_pd);
2216
2217        err = check_module_params();
2218        if (err) {
2219                pr_err("Failed to load module, invalid module parameters, err: %d\n",
2220                       err);
2221                return err;
2222        }
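            /*
             * Reserve sess_queue_depth * CHUNK_POOL_SZ compound pages of
             * order get_order(max_chunk_size), so each pool element can hold
             * one full I/O chunk.
             */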
2223        chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
2224                                              get_order(max_chunk_size));
2225        if (!chunk_pool)
2226                return -ENOMEM;
2227        rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
2228        if (IS_ERR(rtrs_dev_class)) {
2229                err = PTR_ERR(rtrs_dev_class);
2230                goto out_chunk_pool;
2231        }
2232        rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
2233        if (!rtrs_wq) {
2234                err = -ENOMEM;
2235                goto out_dev_class;
2236        }
2237
2238        return 0;
2239
2240out_dev_class:
2241        class_destroy(rtrs_dev_class);
2242out_chunk_pool:
2243        mempool_destroy(chunk_pool);
2244
2245        return err;
2246}
2247
2248static void __exit rtrs_server_exit(void)
2249{
2250        destroy_workqueue(rtrs_wq);
2251        class_destroy(rtrs_dev_class);
2252        mempool_destroy(chunk_pool);
2253        rtrs_rdma_dev_pd_deinit(&dev_pd);
2254}
2255
2256module_init(rtrs_server_init);
2257module_exit(rtrs_server_exit);
2258