linux/drivers/infiniband/ulp/rtrs/rtrs-srv.c
// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Copyright (c) 2014 - 2018 ProfitBricks GmbH. All rights reserved.
 * Copyright (c) 2018 - 2019 1&1 IONOS Cloud GmbH. All rights reserved.
 * Copyright (c) 2019 - 2020 1&1 IONOS SE. All rights reserved.
 */

#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/mempool.h>

#include "rtrs-srv.h"
#include "rtrs-log.h"
#include <rdma/ib_cm.h>
#include <rdma/ib_verbs.h>

MODULE_DESCRIPTION("RDMA Transport Server");
MODULE_LICENSE("GPL");

/* Must be power of 2, see mask from mr->page_size in ib_sg_to_pages() */
#define DEFAULT_MAX_CHUNK_SIZE (128 << 10)
#define DEFAULT_SESS_QUEUE_DEPTH 512
#define MAX_HDR_SIZE PAGE_SIZE

/* We guarantee to serve at least 10 paths */
#define CHUNK_POOL_SZ 10

static struct rtrs_rdma_dev_pd dev_pd;
static mempool_t *chunk_pool;
struct class *rtrs_dev_class;
static struct rtrs_srv_ib_ctx ib_ctx;

static int __read_mostly max_chunk_size = DEFAULT_MAX_CHUNK_SIZE;
static int __read_mostly sess_queue_depth = DEFAULT_SESS_QUEUE_DEPTH;

static bool always_invalidate = true;
module_param(always_invalidate, bool, 0444);
MODULE_PARM_DESC(always_invalidate,
                 "Invalidate memory registration for contiguous memory regions before accessing.");

module_param_named(max_chunk_size, max_chunk_size, int, 0444);
MODULE_PARM_DESC(max_chunk_size,
                 "Max size for each IO request, in bytes (default: "
                 __stringify(DEFAULT_MAX_CHUNK_SIZE) ", i.e. 128KB)");

module_param_named(sess_queue_depth, sess_queue_depth, int, 0444);
MODULE_PARM_DESC(sess_queue_depth,
                 "Number of buffers for pending I/O requests to allocate per session. Maximum: "
                 __stringify(MAX_SESS_QUEUE_DEPTH) " (default: "
                 __stringify(DEFAULT_SESS_QUEUE_DEPTH) ")");

static cpumask_t cq_affinity_mask = { CPU_BITS_ALL };

static struct workqueue_struct *rtrs_wq;

static inline struct rtrs_srv_con *to_srv_con(struct rtrs_con *c)
{
        return container_of(c, struct rtrs_srv_con, c);
}

static inline struct rtrs_srv_sess *to_srv_sess(struct rtrs_sess *s)
{
        return container_of(s, struct rtrs_srv_sess, s);
}

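/*
 * Serialized path state machine:
 * CONNECTING -> CONNECTED -> CLOSING -> CLOSED. Any transition not
 * listed below is rejected and leaves the state unchanged.
 */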
static bool rtrs_srv_change_state(struct rtrs_srv_sess *sess,
                                  enum rtrs_srv_state new_state)
{
        enum rtrs_srv_state old_state;
        bool changed = false;

        spin_lock_irq(&sess->state_lock);
        old_state = sess->state;
        switch (new_state) {
        case RTRS_SRV_CONNECTED:
                if (old_state == RTRS_SRV_CONNECTING)
                        changed = true;
                break;
        case RTRS_SRV_CLOSING:
                if (old_state == RTRS_SRV_CONNECTING ||
                    old_state == RTRS_SRV_CONNECTED)
                        changed = true;
                break;
        case RTRS_SRV_CLOSED:
                if (old_state == RTRS_SRV_CLOSING)
                        changed = true;
                break;
        default:
                break;
        }
        if (changed)
                sess->state = new_state;
        spin_unlock_irq(&sess->state_lock);

        return changed;
}

static void free_id(struct rtrs_srv_op *id)
{
        if (!id)
                return;
        kfree(id);
}

static void rtrs_srv_free_ops_ids(struct rtrs_srv_sess *sess)
{
        struct rtrs_srv *srv = sess->srv;
        int i;

        if (sess->ops_ids) {
                for (i = 0; i < srv->queue_depth; i++)
                        free_id(sess->ops_ids[i]);
                kfree(sess->ops_ids);
                sess->ops_ids = NULL;
        }
}

static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

static struct ib_cqe io_comp_cqe = {
        .done = rtrs_srv_rdma_done
};

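/*
 * Inflight IO operations are tracked with a percpu_ref. After
 * percpu_ref_kill() in rtrs_srv_close_work() drops the last reference,
 * this release callback signals complete_done so teardown can proceed.
 */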
static inline void rtrs_srv_inflight_ref_release(struct percpu_ref *ref)
{
        struct rtrs_srv_sess *sess = container_of(ref, struct rtrs_srv_sess, ids_inflight_ref);

        percpu_ref_exit(&sess->ids_inflight_ref);
        complete(&sess->complete_done);
}

static int rtrs_srv_alloc_ops_ids(struct rtrs_srv_sess *sess)
{
        struct rtrs_srv *srv = sess->srv;
        struct rtrs_srv_op *id;
        int i, ret;

        sess->ops_ids = kcalloc(srv->queue_depth, sizeof(*sess->ops_ids),
                                GFP_KERNEL);
        if (!sess->ops_ids)
                goto err;

        for (i = 0; i < srv->queue_depth; ++i) {
                id = kzalloc(sizeof(*id), GFP_KERNEL);
                if (!id)
                        goto err;

                sess->ops_ids[i] = id;
        }

        ret = percpu_ref_init(&sess->ids_inflight_ref,
                              rtrs_srv_inflight_ref_release, 0, GFP_KERNEL);
        if (ret) {
                pr_err("Percpu reference init failed\n");
                goto err;
        }
        init_completion(&sess->complete_done);

        return 0;

err:
        rtrs_srv_free_ops_ids(sess);
        return -ENOMEM;
}

static inline void rtrs_srv_get_ops_ids(struct rtrs_srv_sess *sess)
{
        percpu_ref_get(&sess->ids_inflight_ref);
}

static inline void rtrs_srv_put_ops_ids(struct rtrs_srv_sess *sess)
{
        percpu_ref_put(&sess->ids_inflight_ref);
}

static void rtrs_srv_reg_mr_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                rtrs_err(s, "REG MR failed: %s\n",
                          ib_wc_status_msg(wc->status));
                close_sess(sess);
                return;
        }
}

static struct ib_cqe local_reg_cqe = {
        .done = rtrs_srv_reg_mr_done
};

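/*
 * rdma_write_sg() - complete a read request by pushing the result into the
 * client's buffer with an RDMA WRITE, then finishing with an immediate
 * message. Depending on the invalidation mode, IB_WR_REG_MR and/or
 * IB_WR_SEND_WITH_INV work requests are chained in front of the final send.
 */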
static int rdma_write_sg(struct rtrs_srv_op *id)
{
        struct rtrs_sess *s = id->con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        dma_addr_t dma_addr = sess->dma_addr[id->msg_id];
        struct rtrs_srv_mr *srv_mr;
        struct rtrs_srv *srv = sess->srv;
        struct ib_send_wr inv_wr;
        struct ib_rdma_wr imm_wr;
        struct ib_rdma_wr *wr = NULL;
        enum ib_send_flags flags;
        size_t sg_cnt;
        int err, offset;
        bool need_inval;
        u32 rkey = 0;
        struct ib_reg_wr rwr;
        struct ib_sge *plist;
        struct ib_sge list;

        sg_cnt = le16_to_cpu(id->rd_msg->sg_cnt);
        need_inval = le16_to_cpu(id->rd_msg->flags) & RTRS_MSG_NEED_INVAL_F;
        if (unlikely(sg_cnt != 1))
                return -EINVAL;

        offset = 0;

        wr              = &id->tx_wr;
        plist           = &id->tx_sg;
        plist->addr     = dma_addr + offset;
        plist->length   = le32_to_cpu(id->rd_msg->desc[0].len);

        /*
         * WR will fail with length error if this is 0
         */
        if (unlikely(plist->length == 0)) {
                rtrs_err(s, "Invalid RDMA-Write sg list length 0\n");
                return -EINVAL;
        }

        plist->lkey = sess->s.dev->ib_pd->local_dma_lkey;
        offset += plist->length;

        wr->wr.sg_list  = plist;
        wr->wr.num_sge  = 1;
        wr->remote_addr = le64_to_cpu(id->rd_msg->desc[0].addr);
        wr->rkey        = le32_to_cpu(id->rd_msg->desc[0].key);
        if (rkey == 0)
                rkey = wr->rkey;
        else
                /* Only one key is actually used */
                WARN_ON_ONCE(rkey != wr->rkey);

        wr->wr.opcode = IB_WR_RDMA_WRITE;
        wr->wr.wr_cqe   = &io_comp_cqe;
        wr->wr.ex.imm_data = 0;
        wr->wr.send_flags  = 0;

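        /*
         * Build the work request chain. Four combinations are possible:
         *
         *   need_inval && always_invalidate: WRITE -> REG_MR -> SEND_WITH_INV -> IMM
         *   always_invalidate only:          WRITE -> REG_MR -> IMM
         *   need_inval only:                 WRITE -> SEND_WITH_INV -> IMM
         *   neither:                         WRITE -> IMM
         */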
        if (need_inval && always_invalidate) {
                wr->wr.next = &rwr.wr;
                rwr.wr.next = &inv_wr;
                inv_wr.next = &imm_wr.wr;
        } else if (always_invalidate) {
                wr->wr.next = &rwr.wr;
                rwr.wr.next = &imm_wr.wr;
        } else if (need_inval) {
                wr->wr.next = &inv_wr;
                inv_wr.next = &imm_wr.wr;
        } else {
                wr->wr.next = &imm_wr.wr;
        }
        /*
         * From time to time we have to post signaled sends,
         * or send queue will fill up and only QP reset can help.
         */
        flags = (atomic_inc_return(&id->con->wr_cnt) % srv->queue_depth) ?
                0 : IB_SEND_SIGNALED;

        if (need_inval) {
                inv_wr.sg_list = NULL;
                inv_wr.num_sge = 0;
                inv_wr.opcode = IB_WR_SEND_WITH_INV;
                inv_wr.wr_cqe   = &io_comp_cqe;
                inv_wr.send_flags = 0;
                inv_wr.ex.invalidate_rkey = rkey;
        }

        imm_wr.wr.next = NULL;
        if (always_invalidate) {
                struct rtrs_msg_rkey_rsp *msg;

                srv_mr = &sess->mrs[id->msg_id];
                rwr.wr.opcode = IB_WR_REG_MR;
                rwr.wr.wr_cqe = &local_reg_cqe;
                rwr.wr.num_sge = 0;
                rwr.mr = srv_mr->mr;
                rwr.wr.send_flags = 0;
                rwr.key = srv_mr->mr->rkey;
                rwr.access = (IB_ACCESS_LOCAL_WRITE |
                              IB_ACCESS_REMOTE_WRITE);
                msg = srv_mr->iu->buf;
                msg->buf_id = cpu_to_le16(id->msg_id);
                msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
                msg->rkey = cpu_to_le32(srv_mr->mr->rkey);

                list.addr   = srv_mr->iu->dma_addr;
                list.length = sizeof(*msg);
                list.lkey   = sess->s.dev->ib_pd->local_dma_lkey;
                imm_wr.wr.sg_list = &list;
                imm_wr.wr.num_sge = 1;
                imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
                ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
                                              srv_mr->iu->dma_addr,
                                              srv_mr->iu->size, DMA_TO_DEVICE);
        } else {
                imm_wr.wr.sg_list = NULL;
                imm_wr.wr.num_sge = 0;
                imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
        }
        imm_wr.wr.send_flags = flags;
        imm_wr.wr.ex.imm_data = cpu_to_be32(rtrs_to_io_rsp_imm(id->msg_id,
                                                             0, need_inval));

        imm_wr.wr.wr_cqe   = &io_comp_cqe;
        ib_dma_sync_single_for_device(sess->s.dev->ib_dev, dma_addr,
                                      offset, DMA_BIDIRECTIONAL);

        err = ib_post_send(id->con->c.qp, &id->tx_wr.wr, NULL);
        if (unlikely(err))
                rtrs_err(s,
                          "Posting RDMA-Write-Request to QP failed, err: %d\n",
                          err);

        return err;
}

/**
 * send_io_resp_imm() - respond to client with empty IMM on failed READ/WRITE
 *                      requests or on successful WRITE request.
 * @con:        the connection to send back result
 * @id:         the id associated with the IO
 * @errno:      the error number of the IO.
 *
 * Return: 0 on success, errno otherwise.
 */
static int send_io_resp_imm(struct rtrs_srv_con *con, struct rtrs_srv_op *id,
                            int errno)
{
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        struct ib_send_wr inv_wr, *wr = NULL;
        struct ib_rdma_wr imm_wr;
        struct ib_reg_wr rwr;
        struct rtrs_srv *srv = sess->srv;
        struct rtrs_srv_mr *srv_mr;
        bool need_inval = false;
        enum ib_send_flags flags;
        u32 imm;
        int err;

        if (id->dir == READ) {
                struct rtrs_msg_rdma_read *rd_msg = id->rd_msg;
                size_t sg_cnt;

                need_inval = le16_to_cpu(rd_msg->flags) &
                                RTRS_MSG_NEED_INVAL_F;
                sg_cnt = le16_to_cpu(rd_msg->sg_cnt);

                if (need_inval) {
                        if (likely(sg_cnt)) {
                                inv_wr.wr_cqe   = &io_comp_cqe;
                                inv_wr.sg_list = NULL;
                                inv_wr.num_sge = 0;
                                inv_wr.opcode = IB_WR_SEND_WITH_INV;
                                inv_wr.send_flags = 0;
                                /* Only one key is actually used */
                                inv_wr.ex.invalidate_rkey =
                                        le32_to_cpu(rd_msg->desc[0].key);
                        } else {
                                WARN_ON_ONCE(1);
                                need_inval = false;
                        }
                }
        }

        if (need_inval && always_invalidate) {
                wr = &inv_wr;
                inv_wr.next = &rwr.wr;
                rwr.wr.next = &imm_wr.wr;
        } else if (always_invalidate) {
                wr = &rwr.wr;
                rwr.wr.next = &imm_wr.wr;
        } else if (need_inval) {
                wr = &inv_wr;
                inv_wr.next = &imm_wr.wr;
        } else {
                wr = &imm_wr.wr;
        }
        /*
         * From time to time we have to post signalled sends,
         * or send queue will fill up and only QP reset can help.
         */
        flags = (atomic_inc_return(&con->wr_cnt) % srv->queue_depth) ?
                0 : IB_SEND_SIGNALED;
        imm = rtrs_to_io_rsp_imm(id->msg_id, errno, need_inval);
        imm_wr.wr.next = NULL;
        if (always_invalidate) {
                struct ib_sge list;
                struct rtrs_msg_rkey_rsp *msg;

                srv_mr = &sess->mrs[id->msg_id];
                rwr.wr.next = &imm_wr.wr;
                rwr.wr.opcode = IB_WR_REG_MR;
                rwr.wr.wr_cqe = &local_reg_cqe;
                rwr.wr.num_sge = 0;
                rwr.wr.send_flags = 0;
                rwr.mr = srv_mr->mr;
                rwr.key = srv_mr->mr->rkey;
                rwr.access = (IB_ACCESS_LOCAL_WRITE |
                              IB_ACCESS_REMOTE_WRITE);
                msg = srv_mr->iu->buf;
                msg->buf_id = cpu_to_le16(id->msg_id);
                msg->type = cpu_to_le16(RTRS_MSG_RKEY_RSP);
                msg->rkey = cpu_to_le32(srv_mr->mr->rkey);

                list.addr   = srv_mr->iu->dma_addr;
                list.length = sizeof(*msg);
                list.lkey   = sess->s.dev->ib_pd->local_dma_lkey;
                imm_wr.wr.sg_list = &list;
                imm_wr.wr.num_sge = 1;
                imm_wr.wr.opcode = IB_WR_SEND_WITH_IMM;
                ib_dma_sync_single_for_device(sess->s.dev->ib_dev,
                                              srv_mr->iu->dma_addr,
                                              srv_mr->iu->size, DMA_TO_DEVICE);
        } else {
                imm_wr.wr.sg_list = NULL;
                imm_wr.wr.num_sge = 0;
                imm_wr.wr.opcode = IB_WR_RDMA_WRITE_WITH_IMM;
        }
        imm_wr.wr.send_flags = flags;
        imm_wr.wr.wr_cqe   = &io_comp_cqe;

        imm_wr.wr.ex.imm_data = cpu_to_be32(imm);

        err = ib_post_send(id->con->c.qp, wr, NULL);
        if (unlikely(err))
                rtrs_err_rl(s, "Posting RDMA-Reply to QP failed, err: %d\n",
                             err);

        return err;
}

void close_sess(struct rtrs_srv_sess *sess)
{
        if (rtrs_srv_change_state(sess, RTRS_SRV_CLOSING))
                queue_work(rtrs_wq, &sess->close_work);
        WARN_ON(sess->state != RTRS_SRV_CLOSING);
}

static inline const char *rtrs_srv_state_str(enum rtrs_srv_state state)
{
        switch (state) {
        case RTRS_SRV_CONNECTING:
                return "RTRS_SRV_CONNECTING";
        case RTRS_SRV_CONNECTED:
                return "RTRS_SRV_CONNECTED";
        case RTRS_SRV_CLOSING:
                return "RTRS_SRV_CLOSING";
        case RTRS_SRV_CLOSED:
                return "RTRS_SRV_CLOSED";
        default:
                return "UNKNOWN";
        }
}

/**
 * rtrs_srv_resp_rdma() - Finish an RDMA request
 *
 * @id:         Internal RTRS operation identifier
 * @status:     Response Code sent to the other side for this operation.
 *              0 = success, <0 error
 * Context: any
 *
 * Finish an RDMA operation. A message is sent to the client and the
 * corresponding memory areas will be released.
 */
bool rtrs_srv_resp_rdma(struct rtrs_srv_op *id, int status)
{
        struct rtrs_srv_sess *sess;
        struct rtrs_srv_con *con;
        struct rtrs_sess *s;
        int err;

        if (WARN_ON(!id))
                return true;

        con = id->con;
        s = con->c.sess;
        sess = to_srv_sess(s);

        id->status = status;

        if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
                rtrs_err_rl(s,
                            "Sending I/O response failed, session %s is disconnected, sess state %s\n",
                            kobject_name(&sess->kobj),
                            rtrs_srv_state_str(sess->state));
                goto out;
        }
        if (always_invalidate) {
                struct rtrs_srv_mr *mr = &sess->mrs[id->msg_id];

                ib_update_fast_reg_key(mr->mr, ib_inc_rkey(mr->mr->rkey));
        }
        if (unlikely(atomic_sub_return(1, &con->sq_wr_avail) < 0)) {
                rtrs_err(s, "IB send queue full: sess=%s cid=%d\n",
                         kobject_name(&sess->kobj),
                         con->c.cid);
                atomic_add(1, &con->sq_wr_avail);
                spin_lock(&con->rsp_wr_wait_lock);
                list_add_tail(&id->wait_list, &con->rsp_wr_wait_list);
                spin_unlock(&con->rsp_wr_wait_lock);
                return false;
        }

        if (status || id->dir == WRITE || !id->rd_msg->sg_cnt)
                err = send_io_resp_imm(con, id, status);
        else
                err = rdma_write_sg(id);

        if (unlikely(err)) {
                rtrs_err_rl(s, "IO response failed: %d: sess=%s\n", err,
                            kobject_name(&sess->kobj));
                close_sess(sess);
        }
out:
        rtrs_srv_put_ops_ids(sess);
        return true;
}
EXPORT_SYMBOL(rtrs_srv_resp_rdma);

/**
 * rtrs_srv_set_sess_priv() - Set private pointer in rtrs_srv.
 * @srv:        Session pointer
 * @priv:       The private pointer that is associated with the session.
 */
void rtrs_srv_set_sess_priv(struct rtrs_srv *srv, void *priv)
{
        srv->priv = priv;
}
EXPORT_SYMBOL(rtrs_srv_set_sess_priv);

static void unmap_cont_bufs(struct rtrs_srv_sess *sess)
{
        int i;

        for (i = 0; i < sess->mrs_num; i++) {
                struct rtrs_srv_mr *srv_mr;

                srv_mr = &sess->mrs[i];
                rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
                ib_dereg_mr(srv_mr->mr);
                ib_dma_unmap_sg(sess->s.dev->ib_dev, srv_mr->sgt.sgl,
                                srv_mr->sgt.nents, DMA_BIDIRECTIONAL);
                sg_free_table(&srv_mr->sgt);
        }
        kfree(sess->mrs);
}

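/*
 * map_cont_bufs() - map the per-session receive chunks into RDMA MRs.
 * With always_invalidate one MR is registered per chunk, so each rkey can
 * be invalidated and re-registered per IO; otherwise as many chunks as the
 * device allows (max_fast_reg_page_list_len) are packed into each MR.
 */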
static int map_cont_bufs(struct rtrs_srv_sess *sess)
{
        struct rtrs_srv *srv = sess->srv;
        struct rtrs_sess *ss = &sess->s;
        int i, mri, err, mrs_num;
        unsigned int chunk_bits;
        int chunks_per_mr = 1;

        /*
         * Here we map queue_depth chunks to MRs.  First we have to figure
         * out how many chunks we can map per MR.
         */
        if (always_invalidate) {
                /*
                 * In order to invalidate each chunk of memory separately,
                 * we need one memory region per chunk.
                 */
                mrs_num = srv->queue_depth;
        } else {
                chunks_per_mr =
                        sess->s.dev->ib_dev->attrs.max_fast_reg_page_list_len;
                mrs_num = DIV_ROUND_UP(srv->queue_depth, chunks_per_mr);
                chunks_per_mr = DIV_ROUND_UP(srv->queue_depth, mrs_num);
        }

        sess->mrs = kcalloc(mrs_num, sizeof(*sess->mrs), GFP_KERNEL);
        if (!sess->mrs)
                return -ENOMEM;

        sess->mrs_num = mrs_num;

        for (mri = 0; mri < mrs_num; mri++) {
                struct rtrs_srv_mr *srv_mr = &sess->mrs[mri];
                struct sg_table *sgt = &srv_mr->sgt;
                struct scatterlist *s;
                struct ib_mr *mr;
                int nr, chunks;

                chunks = chunks_per_mr * mri;
                if (!always_invalidate)
                        chunks_per_mr = min_t(int, chunks_per_mr,
                                              srv->queue_depth - chunks);

                err = sg_alloc_table(sgt, chunks_per_mr, GFP_KERNEL);
                if (err)
                        goto err;

                for_each_sg(sgt->sgl, s, chunks_per_mr, i)
                        sg_set_page(s, srv->chunks[chunks + i],
                                    max_chunk_size, 0);

                nr = ib_dma_map_sg(sess->s.dev->ib_dev, sgt->sgl,
                                   sgt->nents, DMA_BIDIRECTIONAL);
                if (nr < sgt->nents) {
                        err = nr < 0 ? nr : -EINVAL;
                        goto free_sg;
                }
                mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
                                 sgt->nents);
                if (IS_ERR(mr)) {
                        err = PTR_ERR(mr);
                        goto unmap_sg;
                }
                nr = ib_map_mr_sg(mr, sgt->sgl, sgt->nents,
                                  NULL, max_chunk_size);
                if (nr < 0 || nr < sgt->nents) {
                        err = nr < 0 ? nr : -EINVAL;
                        goto dereg_mr;
                }

                if (always_invalidate) {
                        srv_mr->iu = rtrs_iu_alloc(1,
                                        sizeof(struct rtrs_msg_rkey_rsp),
                                        GFP_KERNEL, sess->s.dev->ib_dev,
                                        DMA_TO_DEVICE, rtrs_srv_rdma_done);
                        if (!srv_mr->iu) {
                                err = -ENOMEM;
                                rtrs_err(ss, "rtrs_iu_alloc(), err: %d\n", err);
                                goto dereg_mr;
                        }
                }
                /* Eventually dma addr for each chunk can be cached */
                for_each_sg(sgt->sgl, s, sgt->orig_nents, i)
                        sess->dma_addr[chunks + i] = sg_dma_address(s);

                ib_update_fast_reg_key(mr, ib_inc_rkey(mr->rkey));
                srv_mr->mr = mr;

                continue;
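        /*
         * Error unwind: the labels below first roll back the MR currently
         * being set up, then walk mri backwards to release all previously
         * initialized MRs in reverse order.
         */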
err:
                while (mri--) {
                        srv_mr = &sess->mrs[mri];
                        sgt = &srv_mr->sgt;
                        mr = srv_mr->mr;
                        rtrs_iu_free(srv_mr->iu, sess->s.dev->ib_dev, 1);
dereg_mr:
                        ib_dereg_mr(mr);
unmap_sg:
                        ib_dma_unmap_sg(sess->s.dev->ib_dev, sgt->sgl,
                                        sgt->nents, DMA_BIDIRECTIONAL);
free_sg:
                        sg_free_table(sgt);
                }
                kfree(sess->mrs);

                return err;
        }

        chunk_bits = ilog2(srv->queue_depth - 1) + 1;
        sess->mem_bits = (MAX_IMM_PAYL_BITS - chunk_bits);

        return 0;
}

static void rtrs_srv_hb_err_handler(struct rtrs_con *c)
{
        close_sess(to_srv_sess(c->sess));
}

static void rtrs_srv_init_hb(struct rtrs_srv_sess *sess)
{
        rtrs_init_hb(&sess->s, &io_comp_cqe,
                      RTRS_HB_INTERVAL_MS,
                      RTRS_HB_MISSED_MAX,
                      rtrs_srv_hb_err_handler,
                      rtrs_wq);
}

static void rtrs_srv_start_hb(struct rtrs_srv_sess *sess)
{
        rtrs_start_hb(&sess->s);
}

static void rtrs_srv_stop_hb(struct rtrs_srv_sess *sess)
{
        rtrs_stop_hb(&sess->s);
}

static void rtrs_srv_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        struct rtrs_iu *iu;

        iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
        rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                rtrs_err(s, "Sess info response send failed: %s\n",
                          ib_wc_status_msg(wc->status));
                close_sess(sess);
                return;
        }
        WARN_ON(wc->opcode != IB_WC_SEND);
}

static void rtrs_srv_sess_up(struct rtrs_srv_sess *sess)
{
        struct rtrs_srv *srv = sess->srv;
        struct rtrs_srv_ctx *ctx = srv->ctx;
        int up;

        mutex_lock(&srv->paths_ev_mutex);
        up = ++srv->paths_up;
        if (up == 1)
                ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_CONNECTED, NULL);
        mutex_unlock(&srv->paths_ev_mutex);

        /* Mark session as established */
        sess->established = true;
}

static void rtrs_srv_sess_down(struct rtrs_srv_sess *sess)
{
        struct rtrs_srv *srv = sess->srv;
        struct rtrs_srv_ctx *ctx = srv->ctx;

        if (!sess->established)
                return;

        sess->established = false;
        mutex_lock(&srv->paths_ev_mutex);
        WARN_ON(!srv->paths_up);
        if (--srv->paths_up == 0)
                ctx->ops.link_ev(srv, RTRS_SRV_LINK_EV_DISCONNECTED, srv->priv);
        mutex_unlock(&srv->paths_ev_mutex);
}

static bool exist_sessname(struct rtrs_srv_ctx *ctx,
                           const char *sessname, const uuid_t *path_uuid)
{
        struct rtrs_srv *srv;
        struct rtrs_srv_sess *sess;
        bool found = false;

        mutex_lock(&ctx->srv_mutex);
        list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
                mutex_lock(&srv->paths_mutex);

                /*
                 * A client with the same uuid is allowed to add another
                 * path with the same sessname, so skip its own entries.
                 */
                if (uuid_equal(&srv->paths_uuid, path_uuid)) {
                        mutex_unlock(&srv->paths_mutex);
                        continue;
                }

                list_for_each_entry(sess, &srv->paths_list, s.entry) {
                        if (strlen(sess->s.sessname) == strlen(sessname) &&
                            !strcmp(sess->s.sessname, sessname)) {
                                found = true;
                                break;
                        }
                }
                mutex_unlock(&srv->paths_mutex);
                if (found)
                        break;
        }
        mutex_unlock(&ctx->srv_mutex);
        return found;
}

static int post_recv_sess(struct rtrs_srv_sess *sess);
static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno);

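/*
 * process_info_req() - handle the client's info request: post the IO
 * receive buffers, reject duplicated sessnames, and reply with the list of
 * buffer MRs (address/key/length per descriptor). With always_invalidate,
 * IB_WR_REG_MR requests for all MRs are chained in front of the response
 * send.
 */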
static int process_info_req(struct rtrs_srv_con *con,
                            struct rtrs_msg_info_req *msg)
{
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        struct ib_send_wr *reg_wr = NULL;
        struct rtrs_msg_info_rsp *rsp;
        struct rtrs_iu *tx_iu;
        struct ib_reg_wr *rwr;
        int mri, err;
        size_t tx_sz;

        err = post_recv_sess(sess);
        if (unlikely(err)) {
                rtrs_err(s, "post_recv_sess(), err: %d\n", err);
                return err;
        }

        if (exist_sessname(sess->srv->ctx,
                           msg->sessname, &sess->srv->paths_uuid)) {
                rtrs_err(s, "sessname is duplicated: %s\n", msg->sessname);
                return -EPERM;
        }
        strscpy(sess->s.sessname, msg->sessname, sizeof(sess->s.sessname));

        rwr = kcalloc(sess->mrs_num, sizeof(*rwr), GFP_KERNEL);
        if (unlikely(!rwr))
                return -ENOMEM;

        tx_sz  = sizeof(*rsp);
        tx_sz += sizeof(rsp->desc[0]) * sess->mrs_num;
        tx_iu = rtrs_iu_alloc(1, tx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
                               DMA_TO_DEVICE, rtrs_srv_info_rsp_done);
        if (unlikely(!tx_iu)) {
                err = -ENOMEM;
                goto rwr_free;
        }

        rsp = tx_iu->buf;
        rsp->type = cpu_to_le16(RTRS_MSG_INFO_RSP);
        rsp->sg_cnt = cpu_to_le16(sess->mrs_num);

        for (mri = 0; mri < sess->mrs_num; mri++) {
                struct ib_mr *mr = sess->mrs[mri].mr;

                rsp->desc[mri].addr = cpu_to_le64(mr->iova);
                rsp->desc[mri].key  = cpu_to_le32(mr->rkey);
                rsp->desc[mri].len  = cpu_to_le32(mr->length);

                /*
                 * Fill in reg MR request and chain them *backwards*
                 */
                rwr[mri].wr.next = mri ? &rwr[mri - 1].wr : NULL;
                rwr[mri].wr.opcode = IB_WR_REG_MR;
                rwr[mri].wr.wr_cqe = &local_reg_cqe;
                rwr[mri].wr.num_sge = 0;
                rwr[mri].wr.send_flags = 0;
                rwr[mri].mr = mr;
                rwr[mri].key = mr->rkey;
                rwr[mri].access = (IB_ACCESS_LOCAL_WRITE |
                                   IB_ACCESS_REMOTE_WRITE);
                reg_wr = &rwr[mri].wr;
        }

        err = rtrs_srv_create_sess_files(sess);
        if (unlikely(err))
                goto iu_free;
        kobject_get(&sess->kobj);
        get_device(&sess->srv->dev);
        rtrs_srv_change_state(sess, RTRS_SRV_CONNECTED);
        rtrs_srv_start_hb(sess);

        /*
         * We do not account for the number of established connections at
         * the moment; we rely on the client, which should send the info
         * request only when all of its connections have been successfully
         * established.  Thus, simply notify the listener with a proper
         * event if we are the first path.
         */
        rtrs_srv_sess_up(sess);

        ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
                                      tx_iu->size, DMA_TO_DEVICE);

        /* Send info response */
        err = rtrs_iu_post_send(&con->c, tx_iu, tx_sz, reg_wr);
        if (unlikely(err)) {
                rtrs_err(s, "rtrs_iu_post_send(), err: %d\n", err);
iu_free:
                rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
        }
rwr_free:
        kfree(rwr);

        return err;
}

static void rtrs_srv_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        struct rtrs_msg_info_req *msg;
        struct rtrs_iu *iu;
        int err;

        WARN_ON(con->c.cid);

        iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                rtrs_err(s, "Sess info request receive failed: %s\n",
                          ib_wc_status_msg(wc->status));
                goto close;
        }
        WARN_ON(wc->opcode != IB_WC_RECV);

        if (unlikely(wc->byte_len < sizeof(*msg))) {
                rtrs_err(s, "Sess info request is malformed: size %d\n",
                          wc->byte_len);
                goto close;
        }
        ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
                                   iu->size, DMA_FROM_DEVICE);
        msg = iu->buf;
        if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_REQ)) {
                rtrs_err(s, "Sess info request is malformed: type %d\n",
                          le16_to_cpu(msg->type));
                goto close;
        }
        err = process_info_req(con, msg);
        if (unlikely(err))
                goto close;

out:
        rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
        return;
close:
        close_sess(sess);
        goto out;
}

static int post_recv_info_req(struct rtrs_srv_con *con)
{
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        struct rtrs_iu *rx_iu;
        int err;

        rx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req),
                               GFP_KERNEL, sess->s.dev->ib_dev,
                               DMA_FROM_DEVICE, rtrs_srv_info_req_done);
        if (unlikely(!rx_iu))
                return -ENOMEM;
        /* Prepare for receiving the info request from the client */
        err = rtrs_iu_post_recv(&con->c, rx_iu);
        if (unlikely(err)) {
                rtrs_err(s, "rtrs_iu_post_recv(), err: %d\n", err);
                rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
                return err;
        }

        return 0;
}

static int post_recv_io(struct rtrs_srv_con *con, size_t q_size)
{
        int i, err;

        for (i = 0; i < q_size; i++) {
                err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
                if (unlikely(err))
                        return err;
        }

        return 0;
}

static int post_recv_sess(struct rtrs_srv_sess *sess)
{
        struct rtrs_srv *srv = sess->srv;
        struct rtrs_sess *s = &sess->s;
        size_t q_size;
        int err, cid;

        for (cid = 0; cid < sess->s.con_num; cid++) {
                if (cid == 0)
                        q_size = SERVICE_CON_QUEUE_DEPTH;
                else
                        q_size = srv->queue_depth;

                err = post_recv_io(to_srv_con(sess->s.con[cid]), q_size);
                if (unlikely(err)) {
                        rtrs_err(s, "post_recv_io(), err: %d\n", err);
                        return err;
                }
        }

        return 0;
}

static void process_read(struct rtrs_srv_con *con,
                         struct rtrs_msg_rdma_read *msg,
                         u32 buf_id, u32 off)
{
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        struct rtrs_srv *srv = sess->srv;
        struct rtrs_srv_ctx *ctx = srv->ctx;
        struct rtrs_srv_op *id;

        size_t usr_len, data_len;
        void *data;
        int ret;

        if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
                rtrs_err_rl(s,
                            "Processing read request failed, session is disconnected, sess state %s\n",
                            rtrs_srv_state_str(sess->state));
                return;
        }
        if (unlikely(le16_to_cpu(msg->sg_cnt) != 1 &&
                     le16_to_cpu(msg->sg_cnt) != 0)) {
                rtrs_err_rl(s,
                            "Processing read request failed, invalid message\n");
                return;
        }
        rtrs_srv_get_ops_ids(sess);
        rtrs_srv_update_rdma_stats(sess->stats, off, READ);
        id = sess->ops_ids[buf_id];
        id->con         = con;
        id->dir         = READ;
        id->msg_id      = buf_id;
        id->rd_msg      = msg;
        usr_len = le16_to_cpu(msg->usr_len);
        data_len = off - usr_len;
        data = page_address(srv->chunks[buf_id]);
        ret = ctx->ops.rdma_ev(srv->priv, id, READ, data, data_len,
                           data + data_len, usr_len);

        if (unlikely(ret)) {
                rtrs_err_rl(s,
                            "Processing read request failed, user module cb reported for msg_id %d, err: %d\n",
                            buf_id, ret);
                goto send_err_msg;
        }

        return;

send_err_msg:
        ret = send_io_resp_imm(con, id, ret);
        if (ret < 0) {
                rtrs_err_rl(s,
                            "Sending err msg for failed RDMA-Read-Req failed, msg_id %d, err: %d\n",
                            buf_id, ret);
                close_sess(sess);
        }
        rtrs_srv_put_ops_ids(sess);
}

static void process_write(struct rtrs_srv_con *con,
                          struct rtrs_msg_rdma_write *req,
                          u32 buf_id, u32 off)
{
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        struct rtrs_srv *srv = sess->srv;
        struct rtrs_srv_ctx *ctx = srv->ctx;
        struct rtrs_srv_op *id;

        size_t data_len, usr_len;
        void *data;
        int ret;

        if (unlikely(sess->state != RTRS_SRV_CONNECTED)) {
                rtrs_err_rl(s,
                            "Processing write request failed, session is disconnected, sess state %s\n",
                            rtrs_srv_state_str(sess->state));
                return;
        }
        rtrs_srv_get_ops_ids(sess);
        rtrs_srv_update_rdma_stats(sess->stats, off, WRITE);
        id = sess->ops_ids[buf_id];
        id->con    = con;
        id->dir    = WRITE;
        id->msg_id = buf_id;

        usr_len = le16_to_cpu(req->usr_len);
        data_len = off - usr_len;
        data = page_address(srv->chunks[buf_id]);
        ret = ctx->ops.rdma_ev(srv->priv, id, WRITE, data, data_len,
                           data + data_len, usr_len);
        if (unlikely(ret)) {
                rtrs_err_rl(s,
                            "Processing write request failed, user module callback reports err: %d\n",
                            ret);
                goto send_err_msg;
        }

        return;

send_err_msg:
        ret = send_io_resp_imm(con, id, ret);
        if (ret < 0) {
                rtrs_err_rl(s,
                            "Processing write request failed, sending I/O response failed, msg_id %d, err: %d\n",
                            buf_id, ret);
                close_sess(sess);
        }
        rtrs_srv_put_ops_ids(sess);
}

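/*
 * process_io_req() - dispatch a received IO message. Each chunk holds the
 * payload followed by the user message, and 'off' is the total number of
 * bytes received into the chunk, so the handlers above compute
 * data_len = off - usr_len.
 */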
static void process_io_req(struct rtrs_srv_con *con, void *msg,
                           u32 id, u32 off)
{
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        struct rtrs_msg_rdma_hdr *hdr;
        unsigned int type;

        ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, sess->dma_addr[id],
                                   max_chunk_size, DMA_BIDIRECTIONAL);
        hdr = msg;
        type = le16_to_cpu(hdr->type);

        switch (type) {
        case RTRS_MSG_WRITE:
                process_write(con, msg, id, off);
                break;
        case RTRS_MSG_READ:
                process_read(con, msg, id, off);
                break;
        default:
                rtrs_err(s,
                          "Processing I/O request failed, unknown message type received: 0x%02x\n",
                          type);
                goto err;
        }

        return;

err:
        close_sess(sess);
}

static void rtrs_srv_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct rtrs_srv_mr *mr =
                container_of(wc->wr_cqe, typeof(*mr), inv_cqe);
        struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        struct rtrs_srv *srv = sess->srv;
        u32 msg_id, off;
        void *data;

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                rtrs_err(s, "Failed IB_WR_LOCAL_INV: %s\n",
                          ib_wc_status_msg(wc->status));
                close_sess(sess);
        }
        msg_id = mr->msg_id;
        off = mr->msg_off;
        data = page_address(srv->chunks[msg_id]) + off;
        process_io_req(con, data, msg_id, off);
}

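/*
 * With always_invalidate the server locally invalidates the chunk's rkey
 * before touching the data; the request itself is then processed from the
 * IB_WR_LOCAL_INV completion handler above.
 */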
static int rtrs_srv_inv_rkey(struct rtrs_srv_con *con,
                              struct rtrs_srv_mr *mr)
{
        struct ib_send_wr wr = {
                .opcode             = IB_WR_LOCAL_INV,
                .wr_cqe             = &mr->inv_cqe,
                .send_flags         = IB_SEND_SIGNALED,
                .ex.invalidate_rkey = mr->mr->rkey,
        };
        mr->inv_cqe.done = rtrs_srv_inv_rkey_done;

        return ib_post_send(con->c.qp, &wr, NULL);
}

static void rtrs_rdma_process_wr_wait_list(struct rtrs_srv_con *con)
{
        spin_lock(&con->rsp_wr_wait_lock);
        while (!list_empty(&con->rsp_wr_wait_list)) {
                struct rtrs_srv_op *id;
                int ret;

                id = list_entry(con->rsp_wr_wait_list.next,
                                struct rtrs_srv_op, wait_list);
                list_del(&id->wait_list);

                spin_unlock(&con->rsp_wr_wait_lock);
                ret = rtrs_srv_resp_rdma(id, id->status);
                spin_lock(&con->rsp_wr_wait_lock);

                if (!ret) {
                        list_add(&id->wait_list, &con->rsp_wr_wait_list);
                        break;
                }
        }
        spin_unlock(&con->rsp_wr_wait_lock);
}

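/*
 * rtrs_srv_rdma_done() - common completion handler for the IO connections.
 * For RTRS_IO_REQ_IMM completions the 32-bit immediate data encodes both
 * the message id and the byte offset: msg_id occupies the upper bits and
 * the offset the lower sess->mem_bits bits.
 */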
static void rtrs_srv_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
{
        struct rtrs_srv_con *con = to_srv_con(wc->qp->qp_context);
        struct rtrs_sess *s = con->c.sess;
        struct rtrs_srv_sess *sess = to_srv_sess(s);
        struct rtrs_srv *srv = sess->srv;
        u32 imm_type, imm_payload;
        int err;

        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                if (wc->status != IB_WC_WR_FLUSH_ERR) {
                        rtrs_err(s,
                                  "%s (wr_cqe: %p, type: %d, vendor_err: 0x%x, len: %u)\n",
                                  ib_wc_status_msg(wc->status), wc->wr_cqe,
                                  wc->opcode, wc->vendor_err, wc->byte_len);
                        close_sess(sess);
                }
                return;
        }

        switch (wc->opcode) {
        case IB_WC_RECV_RDMA_WITH_IMM:
                /*
                 * post_recv() RDMA write completions of IO reqs (read/write)
                 * and hb
                 */
                if (WARN_ON(wc->wr_cqe != &io_comp_cqe))
                        return;
                err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
                if (unlikely(err)) {
                        rtrs_err(s, "rtrs_post_recv(), err: %d\n", err);
                        close_sess(sess);
                        break;
                }
                rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
                               &imm_type, &imm_payload);
                if (likely(imm_type == RTRS_IO_REQ_IMM)) {
                        u32 msg_id, off;
                        void *data;

                        msg_id = imm_payload >> sess->mem_bits;
                        off = imm_payload & ((1 << sess->mem_bits) - 1);
                        if (unlikely(msg_id >= srv->queue_depth ||
                                     off >= max_chunk_size)) {
                                rtrs_err(s, "Wrong msg_id %u, off %u\n",
                                          msg_id, off);
                                close_sess(sess);
                                return;
                        }
                        if (always_invalidate) {
                                struct rtrs_srv_mr *mr = &sess->mrs[msg_id];

                                mr->msg_off = off;
                                mr->msg_id = msg_id;
                                err = rtrs_srv_inv_rkey(con, mr);
                                if (unlikely(err)) {
                                        rtrs_err(s, "rtrs_srv_inv_rkey(), err: %d\n",
                                                  err);
                                        close_sess(sess);
                                        break;
                                }
                        } else {
                                data = page_address(srv->chunks[msg_id]) + off;
                                process_io_req(con, data, msg_id, off);
                        }
                } else if (imm_type == RTRS_HB_MSG_IMM) {
                        WARN_ON(con->c.cid);
                        rtrs_send_hb_ack(&sess->s);
                } else if (imm_type == RTRS_HB_ACK_IMM) {
                        WARN_ON(con->c.cid);
                        sess->s.hb_missed_cnt = 0;
                } else {
                        rtrs_wrn(s, "Unknown IMM type %u\n", imm_type);
                }
                break;
        case IB_WC_RDMA_WRITE:
        case IB_WC_SEND:
                /*
                 * post_send() RDMA write completions of IO reqs (read/write)
                 */
                atomic_add(srv->queue_depth, &con->sq_wr_avail);

                if (unlikely(!list_empty_careful(&con->rsp_wr_wait_list)))
                        rtrs_rdma_process_wr_wait_list(con);

                break;
        default:
                rtrs_wrn(s, "Unexpected WC type: %d\n", wc->opcode);
                return;
        }
}

/**
 * rtrs_srv_get_sess_name() - Get the sessname of the connected peer.
 * @srv:        Session
 * @sessname:   Sessname buffer
 * @len:        Length of sessname buffer
 */
int rtrs_srv_get_sess_name(struct rtrs_srv *srv, char *sessname, size_t len)
{
        struct rtrs_srv_sess *sess;
        int err = -ENOTCONN;

        mutex_lock(&srv->paths_mutex);
        list_for_each_entry(sess, &srv->paths_list, s.entry) {
                if (sess->state != RTRS_SRV_CONNECTED)
                        continue;
                strscpy(sessname, sess->s.sessname,
                       min_t(size_t, sizeof(sess->s.sessname), len));
                err = 0;
                break;
        }
        mutex_unlock(&srv->paths_mutex);

        return err;
}
EXPORT_SYMBOL(rtrs_srv_get_sess_name);

/**
 * rtrs_srv_get_queue_depth() - Get rtrs_srv qdepth.
 * @srv:        Session
 */
int rtrs_srv_get_queue_depth(struct rtrs_srv *srv)
{
        return srv->queue_depth;
}
EXPORT_SYMBOL(rtrs_srv_get_queue_depth);

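/*
 * CQ vectors are handed out round-robin over the CPUs in cq_affinity_mask,
 * wrapping around both at nr_cpu_ids and at the device's number of
 * completion vectors.
 */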
static int find_next_bit_ring(struct rtrs_srv_sess *sess)
{
        struct ib_device *ib_dev = sess->s.dev->ib_dev;
        int v;

        v = cpumask_next(sess->cur_cq_vector, &cq_affinity_mask);
        if (v >= nr_cpu_ids || v >= ib_dev->num_comp_vectors)
                v = cpumask_first(&cq_affinity_mask);
        return v;
}

static int rtrs_srv_get_next_cq_vector(struct rtrs_srv_sess *sess)
{
        sess->cur_cq_vector = find_next_bit_ring(sess);

        return sess->cur_cq_vector;
}

static void rtrs_srv_dev_release(struct device *dev)
{
        struct rtrs_srv *srv = container_of(dev, struct rtrs_srv, dev);

        kfree(srv);
}

static void free_srv(struct rtrs_srv *srv)
{
        int i;

        WARN_ON(refcount_read(&srv->refcount));
        for (i = 0; i < srv->queue_depth; i++)
                mempool_free(srv->chunks[i], chunk_pool);
        kfree(srv->chunks);
        mutex_destroy(&srv->paths_mutex);
        mutex_destroy(&srv->paths_ev_mutex);
        /* last put to release the srv structure */
        put_device(&srv->dev);
}

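/*
 * get_or_create_srv() - look up the rtrs_srv for a client uuid and take a
 * reference, or allocate a new one on the first connection. Only the very
 * first connection request of a session may create the srv; later requests
 * for an unknown uuid are rejected with -ENXIO.
 */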
static struct rtrs_srv *get_or_create_srv(struct rtrs_srv_ctx *ctx,
                                          const uuid_t *paths_uuid,
                                          bool first_conn)
{
        struct rtrs_srv *srv;
        int i;

        mutex_lock(&ctx->srv_mutex);
        list_for_each_entry(srv, &ctx->srv_list, ctx_list) {
                if (uuid_equal(&srv->paths_uuid, paths_uuid) &&
                    refcount_inc_not_zero(&srv->refcount)) {
                        mutex_unlock(&ctx->srv_mutex);
                        return srv;
                }
        }
        mutex_unlock(&ctx->srv_mutex);
        /*
         * If this request is not the first connection request from the
         * client for this session then fail and return error.
         */
        if (!first_conn) {
                pr_err_ratelimited("Error: Not the first connection request for this session\n");
                return ERR_PTR(-ENXIO);
        }

        /* need to allocate a new srv */
        srv = kzalloc(sizeof(*srv), GFP_KERNEL);
        if (!srv)
                return ERR_PTR(-ENOMEM);

        INIT_LIST_HEAD(&srv->paths_list);
        mutex_init(&srv->paths_mutex);
        mutex_init(&srv->paths_ev_mutex);
        uuid_copy(&srv->paths_uuid, paths_uuid);
        srv->queue_depth = sess_queue_depth;
        srv->ctx = ctx;
        device_initialize(&srv->dev);
        srv->dev.release = rtrs_srv_dev_release;

        srv->chunks = kcalloc(srv->queue_depth, sizeof(*srv->chunks),
                              GFP_KERNEL);
        if (!srv->chunks)
                goto err_free_srv;

        for (i = 0; i < srv->queue_depth; i++) {
                srv->chunks[i] = mempool_alloc(chunk_pool, GFP_KERNEL);
                if (!srv->chunks[i])
                        goto err_free_chunks;
        }
        refcount_set(&srv->refcount, 1);
        mutex_lock(&ctx->srv_mutex);
        list_add(&srv->ctx_list, &ctx->srv_list);
        mutex_unlock(&ctx->srv_mutex);

        return srv;

err_free_chunks:
        while (i--)
                mempool_free(srv->chunks[i], chunk_pool);
        kfree(srv->chunks);

err_free_srv:
        kfree(srv);
        return ERR_PTR(-ENOMEM);
}

static void put_srv(struct rtrs_srv *srv)
{
        if (refcount_dec_and_test(&srv->refcount)) {
                struct rtrs_srv_ctx *ctx = srv->ctx;

                WARN_ON(srv->dev.kobj.state_in_sysfs);

                mutex_lock(&ctx->srv_mutex);
                list_del(&srv->ctx_list);
                mutex_unlock(&ctx->srv_mutex);
                free_srv(srv);
        }
}

static void __add_path_to_srv(struct rtrs_srv *srv,
                              struct rtrs_srv_sess *sess)
{
        list_add_tail(&sess->s.entry, &srv->paths_list);
        srv->paths_num++;
        WARN_ON(srv->paths_num >= MAX_PATHS_NUM);
}

static void del_path_from_srv(struct rtrs_srv_sess *sess)
{
        struct rtrs_srv *srv = sess->srv;

        if (WARN_ON(!srv))
                return;

        mutex_lock(&srv->paths_mutex);
        list_del(&sess->s.entry);
        WARN_ON(!srv->paths_num);
        srv->paths_num--;
        mutex_unlock(&srv->paths_mutex);
}

/* Return 0 if the addresses are the same, -ENOENT for an unknown address family */
static int sockaddr_cmp(const struct sockaddr *a, const struct sockaddr *b)
{
        switch (a->sa_family) {
        case AF_IB:
                return memcmp(&((struct sockaddr_ib *)a)->sib_addr,
                              &((struct sockaddr_ib *)b)->sib_addr,
                              sizeof(struct ib_addr)) &&
                        (b->sa_family == AF_IB);
        case AF_INET:
                return memcmp(&((struct sockaddr_in *)a)->sin_addr,
                              &((struct sockaddr_in *)b)->sin_addr,
                              sizeof(struct in_addr)) &&
                        (b->sa_family == AF_INET);
        case AF_INET6:
                return memcmp(&((struct sockaddr_in6 *)a)->sin6_addr,
                              &((struct sockaddr_in6 *)b)->sin6_addr,
                              sizeof(struct in6_addr)) &&
                        (b->sa_family == AF_INET6);
        default:
                return -ENOENT;
        }
}

1485static bool __is_path_w_addr_exists(struct rtrs_srv *srv,
1486                                    struct rdma_addr *addr)
1487{
1488        struct rtrs_srv_sess *sess;
1489
1490        list_for_each_entry(sess, &srv->paths_list, s.entry)
1491                if (!sockaddr_cmp((struct sockaddr *)&sess->s.dst_addr,
1492                                  (struct sockaddr *)&addr->dst_addr) &&
1493                    !sockaddr_cmp((struct sockaddr *)&sess->s.src_addr,
1494                                  (struct sockaddr *)&addr->src_addr))
1495                        return true;
1496
1497        return false;
1498}
1499
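    /*
     * If the session kobject is visible in sysfs, kobject_put() ends up in
     * its release callback, which frees the stats and the session itself;
     * otherwise free both directly.
     */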
1500static void free_sess(struct rtrs_srv_sess *sess)
1501{
1502        if (sess->kobj.state_in_sysfs) {
1503                kobject_del(&sess->kobj);
1504                kobject_put(&sess->kobj);
1505        } else {
1506                kfree(sess->stats);
1507                kfree(sess);
1508        }
1509}
1510
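    /*
     * Session teardown, run from the close work: remove sysfs files, stop
     * the heartbeat, disconnect and drain all QPs, wait for inflight IO to
     * complete, notify the upper layer, then release the buffers,
     * connections and device before freeing the session itself.
     */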
1511static void rtrs_srv_close_work(struct work_struct *work)
1512{
1513        struct rtrs_srv_sess *sess;
1514        struct rtrs_srv_con *con;
1515        int i;
1516
1517        sess = container_of(work, typeof(*sess), close_work);
1518
1519        rtrs_srv_destroy_sess_files(sess);
1520        rtrs_srv_stop_hb(sess);
1521
1522        for (i = 0; i < sess->s.con_num; i++) {
1523                if (!sess->s.con[i])
1524                        continue;
1525                con = to_srv_con(sess->s.con[i]);
1526                rdma_disconnect(con->c.cm_id);
1527                ib_drain_qp(con->c.qp);
1528        }
1529
1530        /*
1531         * Degrade ref count to the usual model with a single shared
1532         * atomic_t counter
1533         */
1534        percpu_ref_kill(&sess->ids_inflight_ref);
1535
1536        /* Wait for all inflight IO to complete */
1537        wait_for_completion(&sess->complete_done);
1538
1539        /* Notify upper layer if we are the last path */
1540        rtrs_srv_sess_down(sess);
1541
1542        unmap_cont_bufs(sess);
1543        rtrs_srv_free_ops_ids(sess);
1544
1545        for (i = 0; i < sess->s.con_num; i++) {
1546                if (!sess->s.con[i])
1547                        continue;
1548                con = to_srv_con(sess->s.con[i]);
1549                rtrs_cq_qp_destroy(&con->c);
1550                rdma_destroy_id(con->c.cm_id);
1551                kfree(con);
1552        }
1553        rtrs_ib_dev_put(sess->s.dev);
1554
1555        del_path_from_srv(sess);
1556        put_srv(sess->srv);
1557        sess->srv = NULL;
1558        rtrs_srv_change_state(sess, RTRS_SRV_CLOSED);
1559
1560        kfree(sess->dma_addr);
1561        kfree(sess->s.con);
1562        free_sess(sess);
1563}
1564
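    /*
     * Accept the connection and piggy-back the handshake reply (magic,
     * version, queue depth and buffer sizes) as CM private data.
     */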
1565static int rtrs_rdma_do_accept(struct rtrs_srv_sess *sess,
1566                               struct rdma_cm_id *cm_id)
1567{
1568        struct rtrs_srv *srv = sess->srv;
1569        struct rtrs_msg_conn_rsp msg;
1570        struct rdma_conn_param param;
1571        int err;
1572
1573        param = (struct rdma_conn_param) {
1574                .rnr_retry_count = 7,
1575                .private_data = &msg,
1576                .private_data_len = sizeof(msg),
1577        };
1578
1579        msg = (struct rtrs_msg_conn_rsp) {
1580                .magic = cpu_to_le16(RTRS_MAGIC),
1581                .version = cpu_to_le16(RTRS_PROTO_VER),
1582                .queue_depth = cpu_to_le16(srv->queue_depth),
1583                .max_io_size = cpu_to_le32(max_chunk_size - MAX_HDR_SIZE),
1584                .max_hdr_size = cpu_to_le32(MAX_HDR_SIZE),
1585        };
1586
1587        if (always_invalidate)
1588                msg.flags = cpu_to_le32(RTRS_MSG_NEW_RKEY_F);
1589
1590        err = rdma_accept(cm_id, &param);
1591        if (err)
1592                pr_err("rdma_accept(), err: %d\n", err);
1593
1594        return err;
1595}
1596
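    /*
     * Reject the connection, bouncing @errno back to the client inside the
     * consumer-defined CM reject payload.
     */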
1597static int rtrs_rdma_do_reject(struct rdma_cm_id *cm_id, int errno)
1598{
1599        struct rtrs_msg_conn_rsp msg;
1600        int err;
1601
1602        msg = (struct rtrs_msg_conn_rsp) {
1603                .magic = cpu_to_le16(RTRS_MAGIC),
1604                .version = cpu_to_le16(RTRS_PROTO_VER),
1605                .errno = cpu_to_le16(errno),
1606        };
1607
1608        err = rdma_reject(cm_id, &msg, sizeof(msg), IB_CM_REJ_CONSUMER_DEFINED);
1609        if (err)
1610                pr_err("rdma_reject(), err: %d\n", err);
1611
1612        /* Bounce errno back */
1613        return errno;
1614}
1615
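    /* Find a session by its UUID.  Caller must hold srv->paths_mutex. */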
1616static struct rtrs_srv_sess *
1617__find_sess(struct rtrs_srv *srv, const uuid_t *sess_uuid)
1618{
1619        struct rtrs_srv_sess *sess;
1620
1621        list_for_each_entry(sess, &srv->paths_list, s.entry) {
1622                if (uuid_equal(&sess->s.uuid, sess_uuid))
1623                        return sess;
1624        }
1625
1626        return NULL;
1627}
1628
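    /*
     * Create one connection of the session: size the send/receive queues
     * from the device limits and the session queue depth, create CQ and QP,
     * and post the first receive for the info request on the service
     * connection (cid 0).
     */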
1629static int create_con(struct rtrs_srv_sess *sess,
1630                      struct rdma_cm_id *cm_id,
1631                      unsigned int cid)
1632{
1633        struct rtrs_srv *srv = sess->srv;
1634        struct rtrs_sess *s = &sess->s;
1635        struct rtrs_srv_con *con;
1636
1637        u32 cq_num, max_send_wr, max_recv_wr, wr_limit;
1638        int err, cq_vector;
1639
1640        con = kzalloc(sizeof(*con), GFP_KERNEL);
1641        if (!con) {
1642                err = -ENOMEM;
1643                goto err;
1644        }
1645
1646        spin_lock_init(&con->rsp_wr_wait_lock);
1647        INIT_LIST_HEAD(&con->rsp_wr_wait_list);
1648        con->c.cm_id = cm_id;
1649        con->c.sess = &sess->s;
1650        con->c.cid = cid;
1651        atomic_set(&con->wr_cnt, 1);
1652        wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;
1653
1654        if (con->c.cid == 0) {
1655                /*
1656                 * All receive and all send (each requiring invalidate)
1657                 * + 2 for drain and heartbeat
1658                 */
1659                max_send_wr = min_t(int, wr_limit,
1660                                    SERVICE_CON_QUEUE_DEPTH * 2 + 2);
1661                max_recv_wr = max_send_wr;
1662        } else {
1663                /* when always_invalidate is enabled we need linv + rinv + mr + imm */
1664                if (always_invalidate)
1665                        max_send_wr =
1666                                min_t(int, wr_limit,
1667                                      srv->queue_depth * (1 + 4) + 1);
1668                else
1669                        max_send_wr =
1670                                min_t(int, wr_limit,
1671                                      srv->queue_depth * (1 + 2) + 1);
1672
1673                max_recv_wr = srv->queue_depth + 1;
1674                /*
1675                 * One receive WR more than the queue depth: even with
1676                 * all receive and write requests posted, and each read
1677                 * requiring an invalidate request, one extra WR is left
1678                 * to drain the QP when it goes into the error state.
1679                 */
1680        }
1681        cq_num = max_send_wr + max_recv_wr;
1682        atomic_set(&con->sq_wr_avail, max_send_wr);
1683        cq_vector = rtrs_srv_get_next_cq_vector(sess);
1684
1685        /* TODO: SOFTIRQ can be faster, but be careful with softirq context */
1686        err = rtrs_cq_qp_create(&sess->s, &con->c, 1, cq_vector, cq_num,
1687                                 max_send_wr, max_recv_wr,
1688                                 IB_POLL_WORKQUEUE);
1689        if (err) {
1690                rtrs_err(s, "rtrs_cq_qp_create(), err: %d\n", err);
1691                goto free_con;
1692        }
1693        if (con->c.cid == 0) {
1694                err = post_recv_info_req(con);
1695                if (err)
1696                        goto free_cqqp;
1697        }
1698        WARN_ON(sess->s.con[cid]);
1699        sess->s.con[cid] = &con->c;
1700
1701        /*
1702         * Change context from server to current connection.  The other
1703         * way is to use cm_id->qp->qp_context, which does not work on OFED.
1704         */
1705        cm_id->context = &con->c;
1706
1707        return 0;
1708
1709free_cqqp:
1710        rtrs_cq_qp_destroy(&con->c);
1711free_con:
1712        kfree(con);
1713
1714err:
1715        return err;
1716}
1717
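    /*
     * Allocate and initialize a new session for @srv and link it into the
     * path list.  Caller must hold srv->paths_mutex.
     */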
1718static struct rtrs_srv_sess *__alloc_sess(struct rtrs_srv *srv,
1719                                           struct rdma_cm_id *cm_id,
1720                                           unsigned int con_num,
1721                                           unsigned int recon_cnt,
1722                                           const uuid_t *uuid)
1723{
1724        struct rtrs_srv_sess *sess;
1725        int err = -ENOMEM;
1726        char str[NAME_MAX];
1727        struct rtrs_addr path;
1728
1729        if (srv->paths_num >= MAX_PATHS_NUM) {
1730                err = -ECONNRESET;
1731                goto err;
1732        }
1733        if (__is_path_w_addr_exists(srv, &cm_id->route.addr)) {
1734                err = -EEXIST;
1735                pr_err("Path with same addr exists\n");
1736                goto err;
1737        }
1738        sess = kzalloc(sizeof(*sess), GFP_KERNEL);
1739        if (!sess)
1740                goto err;
1741
1742        sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
1743        if (!sess->stats)
1744                goto err_free_sess;
1745
1746        sess->stats->sess = sess;
1747
1748        sess->dma_addr = kcalloc(srv->queue_depth, sizeof(*sess->dma_addr),
1749                                 GFP_KERNEL);
1750        if (!sess->dma_addr)
1751                goto err_free_stats;
1752
1753        sess->s.con = kcalloc(con_num, sizeof(*sess->s.con), GFP_KERNEL);
1754        if (!sess->s.con)
1755                goto err_free_dma_addr;
1756
1757        sess->state = RTRS_SRV_CONNECTING;
1758        sess->srv = srv;
1759        sess->cur_cq_vector = -1;
1760        sess->s.dst_addr = cm_id->route.addr.dst_addr;
1761        sess->s.src_addr = cm_id->route.addr.src_addr;
1762
1763        /* temporary until the session name is received from the client */
1764        path.src = &sess->s.src_addr;
1765        path.dst = &sess->s.dst_addr;
1766        rtrs_addr_to_str(&path, str, sizeof(str));
1767        strscpy(sess->s.sessname, str, sizeof(sess->s.sessname));
1768
1769        sess->s.con_num = con_num;
1770        sess->s.recon_cnt = recon_cnt;
1771        uuid_copy(&sess->s.uuid, uuid);
1772        spin_lock_init(&sess->state_lock);
1773        INIT_WORK(&sess->close_work, rtrs_srv_close_work);
1774        rtrs_srv_init_hb(sess);
1775
1776        sess->s.dev = rtrs_ib_dev_find_or_add(cm_id->device, &dev_pd);
1777        if (!sess->s.dev) {
1778                err = -ENOMEM;
1779                goto err_free_con;
1780        }
1781        err = map_cont_bufs(sess);
1782        if (err)
1783                goto err_put_dev;
1784
1785        err = rtrs_srv_alloc_ops_ids(sess);
1786        if (err)
1787                goto err_unmap_bufs;
1788
1789        __add_path_to_srv(srv, sess);
1790
1791        return sess;
1792
1793err_unmap_bufs:
1794        unmap_cont_bufs(sess);
1795err_put_dev:
1796        rtrs_ib_dev_put(sess->s.dev);
1797err_free_con:
1798        kfree(sess->s.con);
1799err_free_dma_addr:
1800        kfree(sess->dma_addr);
1801err_free_stats:
1802        kfree(sess->stats);
1803err_free_sess:
1804        kfree(sess);
1805err:
1806        return ERR_PTR(err);
1807}
1808
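    /*
     * Handle an RDMA CM connect request: validate the handshake message,
     * find or create the server object and session, then create the
     * requested connection and accept it, rejecting with an error reply
     * otherwise.
     */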
1809static int rtrs_rdma_connect(struct rdma_cm_id *cm_id,
1810                              const struct rtrs_msg_conn_req *msg,
1811                              size_t len)
1812{
1813        struct rtrs_srv_ctx *ctx = cm_id->context;
1814        struct rtrs_srv_sess *sess;
1815        struct rtrs_srv *srv;
1816
1817        u16 version, con_num, cid;
1818        u16 recon_cnt;
1819        int err = -ECONNRESET;
1820
1821        if (len < sizeof(*msg)) {
1822                pr_err("Invalid RTRS connection request\n");
1823                goto reject_w_err;
1824        }
1825        if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1826                pr_err("Invalid RTRS magic\n");
1827                goto reject_w_err;
1828        }
1829        version = le16_to_cpu(msg->version);
1830        if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1831                pr_err("Unsupported major RTRS version: %d, expected %d\n",
1832                       version >> 8, RTRS_PROTO_VER_MAJOR);
1833                goto reject_w_err;
1834        }
1835        con_num = le16_to_cpu(msg->cid_num);
1836        if (con_num > 4096) {
1837                /* Sanity check */
1838                pr_err("Too many connections requested: %d\n", con_num);
1839                goto reject_w_err;
1840        }
1841        cid = le16_to_cpu(msg->cid);
1842        if (cid >= con_num) {
1843                /* Sanity check */
1844                pr_err("Incorrect cid: %d >= %d\n", cid, con_num);
1845                goto reject_w_err;
1846        }
1847        recon_cnt = le16_to_cpu(msg->recon_cnt);
1848        srv = get_or_create_srv(ctx, &msg->paths_uuid, msg->first_conn);
1849        if (IS_ERR(srv)) {
1850                err = PTR_ERR(srv);
1851                pr_err("get_or_create_srv(), error %d\n", err);
1852                goto reject_w_err;
1853        }
1854        mutex_lock(&srv->paths_mutex);
1855        sess = __find_sess(srv, &msg->sess_uuid);
1856        if (sess) {
1857                struct rtrs_sess *s = &sess->s;
1858
1859                /* Session already holds a reference */
1860                put_srv(srv);
1861
1862                if (sess->state != RTRS_SRV_CONNECTING) {
1863                        rtrs_err(s, "Session in wrong state: %s\n",
1864                                  rtrs_srv_state_str(sess->state));
1865                        mutex_unlock(&srv->paths_mutex);
1866                        goto reject_w_err;
1867                }
1868                /*
1869                 * Sanity checks
1870                 */
1871                if (con_num != s->con_num || cid >= s->con_num) {
1872                        rtrs_err(s, "Incorrect request: %d, %d\n",
1873                                  cid, con_num);
1874                        mutex_unlock(&srv->paths_mutex);
1875                        goto reject_w_err;
1876                }
1877                if (s->con[cid]) {
1878                        rtrs_err(s, "Connection already exists: %d\n",
1879                                  cid);
1880                        mutex_unlock(&srv->paths_mutex);
1881                        goto reject_w_err;
1882                }
1883        } else {
1884                sess = __alloc_sess(srv, cm_id, con_num, recon_cnt,
1885                                    &msg->sess_uuid);
1886                if (IS_ERR(sess)) {
1887                        mutex_unlock(&srv->paths_mutex);
1888                        put_srv(srv);
1889                        err = PTR_ERR(sess);
1890                        pr_err("RTRS server session allocation failed: %d\n", err);
1891                        goto reject_w_err;
1892                }
1893        }
1894        err = create_con(sess, cm_id, cid);
1895        if (err) {
1896                rtrs_err(&sess->s, "create_con(), error %d\n", err);
1897                (void)rtrs_rdma_do_reject(cm_id, err);
1898                /*
1899                 * Since the session may have other connections, follow the
1900                 * normal path through the workqueue, but still return an error
1901                 * so that cma.c calls rdma_destroy_id() for this connection.
1902                 */
1903                goto close_and_return_err;
1904        }
1905        err = rtrs_rdma_do_accept(sess, cm_id);
1906        if (err) {
1907                rtrs_err(&sess->s, "rtrs_rdma_do_accept(), error %d\n", err);
1908                (void)rtrs_rdma_do_reject(cm_id, err);
1909                /*
1910                 * Since the current connection was successfully added to the
1911                 * session, follow the normal path through the workqueue to
1912                 * close the session, and return 0 to tell cma.c that we call
1913                 * rdma_destroy_id() ourselves.
1914                 */
1915                err = 0;
1916                goto close_and_return_err;
1917        }
1918        mutex_unlock(&srv->paths_mutex);
1919
1920        return 0;
1921
1922reject_w_err:
1923        return rtrs_rdma_do_reject(cm_id, err);
1924
1925close_and_return_err:
1926        mutex_unlock(&srv->paths_mutex);
1927        close_sess(sess);
1928
1929        return err;
1930}
1931
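    /*
     * CM event handler: only a connect request arrives without an existing
     * connection context; every other event maps to a session, which is
     * closed on error or disconnect.
     */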
1932static int rtrs_srv_rdma_cm_handler(struct rdma_cm_id *cm_id,
1933                                     struct rdma_cm_event *ev)
1934{
1935        struct rtrs_srv_sess *sess = NULL;
1936        struct rtrs_sess *s = NULL;
1937
1938        if (ev->event != RDMA_CM_EVENT_CONNECT_REQUEST) {
1939                struct rtrs_con *c = cm_id->context;
1940
1941                s = c->sess;
1942                sess = to_srv_sess(s);
1943        }
1944
1945        switch (ev->event) {
1946        case RDMA_CM_EVENT_CONNECT_REQUEST:
1947                /*
1948                 * In case of error cma.c will destroy cm_id,
1949                 * see cma_process_remove()
1950                 */
1951                return rtrs_rdma_connect(cm_id, ev->param.conn.private_data,
1952                                          ev->param.conn.private_data_len);
1953        case RDMA_CM_EVENT_ESTABLISHED:
1954                /* Nothing here */
1955                break;
1956        case RDMA_CM_EVENT_REJECTED:
1957        case RDMA_CM_EVENT_CONNECT_ERROR:
1958        case RDMA_CM_EVENT_UNREACHABLE:
1959                rtrs_err(s, "CM error (CM event: %s, err: %d)\n",
1960                          rdma_event_msg(ev->event), ev->status);
1961                fallthrough;
1962        case RDMA_CM_EVENT_DISCONNECTED:
1963        case RDMA_CM_EVENT_ADDR_CHANGE:
1964        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1965        case RDMA_CM_EVENT_DEVICE_REMOVAL:
1966                close_sess(sess);
1967                break;
1968        default:
1969                pr_err("Ignoring unexpected CM event %s, err %d\n",
1970                       rdma_event_msg(ev->event), ev->status);
1971                break;
1972        }
1973
1974        return 0;
1975}
1976
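    /* Create a CM ID bound to @addr and start listening on it. */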
1977static struct rdma_cm_id *rtrs_srv_cm_init(struct rtrs_srv_ctx *ctx,
1978                                            struct sockaddr *addr,
1979                                            enum rdma_ucm_port_space ps)
1980{
1981        struct rdma_cm_id *cm_id;
1982        int ret;
1983
1984        cm_id = rdma_create_id(&init_net, rtrs_srv_rdma_cm_handler,
1985                               ctx, ps, IB_QPT_RC);
1986        if (IS_ERR(cm_id)) {
1987                ret = PTR_ERR(cm_id);
1988                pr_err("Creating id for RDMA connection failed, err: %d\n",
1989                       ret);
1990                goto err_out;
1991        }
1992        ret = rdma_bind_addr(cm_id, addr);
1993        if (ret) {
1994                pr_err("Binding RDMA address failed, err: %d\n", ret);
1995                goto err_cm;
1996        }
1997        ret = rdma_listen(cm_id, 64);
1998        if (ret) {
1999                pr_err("Listening on RDMA connection failed, err: %d\n",
2000                       ret);
2001                goto err_cm;
2002        }
2003
2004        return cm_id;
2005
2006err_cm:
2007        rdma_destroy_id(cm_id);
2008err_out:
2009        return ERR_PTR(ret);
2011}
2012
2013static int rtrs_srv_rdma_init(struct rtrs_srv_ctx *ctx, u16 port)
2014{
2015        struct sockaddr_in6 sin = {
2016                .sin6_family    = AF_INET6,
2017                .sin6_addr      = IN6ADDR_ANY_INIT,
2018                .sin6_port      = htons(port),
2019        };
2020        struct sockaddr_ib sib = {
2021                .sib_family                     = AF_IB,
2022                .sib_sid        = cpu_to_be64(RDMA_IB_IP_PS_IB | port),
2023                .sib_sid_mask   = cpu_to_be64(0xffffffffffffffffULL),
2024                .sib_pkey       = cpu_to_be16(0xffff),
2025        };
2026        struct rdma_cm_id *cm_ip, *cm_ib;
2027        int ret;
2028
2029        /*
2030         * We accept both IPoIB and IB connections, so we need to keep
2031         * two CM IDs, one for each socket type and port space.
2032         * If the CM initialization of either ID fails, we abort
2033         * everything.
2034         */
2035        cm_ip = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sin, RDMA_PS_TCP);
2036        if (IS_ERR(cm_ip))
2037                return PTR_ERR(cm_ip);
2038
2039        cm_ib = rtrs_srv_cm_init(ctx, (struct sockaddr *)&sib, RDMA_PS_IB);
2040        if (IS_ERR(cm_ib)) {
2041                ret = PTR_ERR(cm_ib);
2042                goto free_cm_ip;
2043        }
2044
2045        ctx->cm_id_ip = cm_ip;
2046        ctx->cm_id_ib = cm_ib;
2047
2048        return 0;
2049
2050free_cm_ip:
2051        rdma_destroy_id(cm_ip);
2052
2053        return ret;
2054}
2055
2056static struct rtrs_srv_ctx *alloc_srv_ctx(struct rtrs_srv_ops *ops)
2057{
2058        struct rtrs_srv_ctx *ctx;
2059
2060        ctx = kzalloc(sizeof(*ctx), GFP_KERNEL);
2061        if (!ctx)
2062                return NULL;
2063
2064        ctx->ops = *ops;
2065        mutex_init(&ctx->srv_mutex);
2066        INIT_LIST_HEAD(&ctx->srv_list);
2067
2068        return ctx;
2069}
2070
2071static void free_srv_ctx(struct rtrs_srv_ctx *ctx)
2072{
2073        WARN_ON(!list_empty(&ctx->srv_list));
2074        mutex_destroy(&ctx->srv_mutex);
2075        kfree(ctx);
2076}
2077
2078static int rtrs_srv_add_one(struct ib_device *device)
2079{
2080        struct rtrs_srv_ctx *ctx;
2081        int ret = 0;
2082
2083        mutex_lock(&ib_ctx.ib_dev_mutex);
2084        if (ib_ctx.ib_dev_count)
2085                goto out;
2086
2087        /*
2088         * Since our CM IDs are NOT bound to any ib device we will create them
2089         * only once
2090         */
2091        ctx = ib_ctx.srv_ctx;
2092        ret = rtrs_srv_rdma_init(ctx, ib_ctx.port);
2093        if (ret) {
2094                /*
2095                 * We errored out here.
2096                 * According to the IB core code, if we return an error here,
2097                 * the error code is ignored and no more calls to our ops are made.
2098                 */
2099                pr_err("Failed to initialize RDMA connection\n");
2100                goto err_out;
2101        }
2102
2103out:
2104        /*
2105         * Keep track of the number of IB devices added.
2106         */
2107        ib_ctx.ib_dev_count++;
2108
2109err_out:
2110        mutex_unlock(&ib_ctx.ib_dev_mutex);
2111        return ret;
2112}
2113
2114static void rtrs_srv_remove_one(struct ib_device *device, void *client_data)
2115{
2116        struct rtrs_srv_ctx *ctx;
2117
2118        mutex_lock(&ib_ctx.ib_dev_mutex);
2119        ib_ctx.ib_dev_count--;
2120
2121        if (ib_ctx.ib_dev_count)
2122                goto out;
2123
2124        /*
2125         * Since our CM IDs are NOT bound to any ib device we will remove them
2126         * only once, when the last device is removed
2127         */
2128        ctx = ib_ctx.srv_ctx;
2129        rdma_destroy_id(ctx->cm_id_ip);
2130        rdma_destroy_id(ctx->cm_id_ib);
2131
2132out:
2133        mutex_unlock(&ib_ctx.ib_dev_mutex);
2134}
2135
2136static struct ib_client rtrs_srv_client = {
2137        .name   = "rtrs_server",
2138        .add    = rtrs_srv_add_one,
2139        .remove = rtrs_srv_remove_one
2140};
2141
2142/**
2143 * rtrs_srv_open() - open RTRS server context
2144 * @ops:                callback functions
2145 * @port:               port to listen on
2146 *
2147 * Creates server context with specified callbacks.
2148 *
2149 * Return: a valid context pointer on success, an ERR_PTR() otherwise.
2150 */
2151struct rtrs_srv_ctx *rtrs_srv_open(struct rtrs_srv_ops *ops, u16 port)
2152{
2153        struct rtrs_srv_ctx *ctx;
2154        int err;
2155
2156        ctx = alloc_srv_ctx(ops);
2157        if (!ctx)
2158                return ERR_PTR(-ENOMEM);
2159
2160        mutex_init(&ib_ctx.ib_dev_mutex);
2161        ib_ctx.srv_ctx = ctx;
2162        ib_ctx.port = port;
2163
2164        err = ib_register_client(&rtrs_srv_client);
2165        if (err) {
2166                free_srv_ctx(ctx);
2167                return ERR_PTR(err);
2168        }
2169
2170        return ctx;
2171}
2172EXPORT_SYMBOL(rtrs_srv_open);
2173
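    /*
     * Queue close work for every path of @srv; the actual teardown runs
     * from rtrs_srv_close_work() on the rtrs workqueue.
     */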
2174static void close_sessions(struct rtrs_srv *srv)
2175{
2176        struct rtrs_srv_sess *sess;
2177
2178        mutex_lock(&srv->paths_mutex);
2179        list_for_each_entry(sess, &srv->paths_list, s.entry)
2180                close_sess(sess);
2181        mutex_unlock(&srv->paths_mutex);
2182}
2183
2184static void close_ctx(struct rtrs_srv_ctx *ctx)
2185{
2186        struct rtrs_srv *srv;
2187
2188        mutex_lock(&ctx->srv_mutex);
2189        list_for_each_entry(srv, &ctx->srv_list, ctx_list)
2190                close_sessions(srv);
2191        mutex_unlock(&ctx->srv_mutex);
2192        flush_workqueue(rtrs_wq);
2193}
2194
2195/**
2196 * rtrs_srv_close() - close RTRS server context
2197 * @ctx: pointer to server context
2198 *
2199 * Closes RTRS server context with all client sessions.
2200 */
2201void rtrs_srv_close(struct rtrs_srv_ctx *ctx)
2202{
2203        ib_unregister_client(&rtrs_srv_client);
2204        mutex_destroy(&ib_ctx.ib_dev_mutex);
2205        close_ctx(ctx);
2206        free_srv_ctx(ctx);
2207}
2208EXPORT_SYMBOL(rtrs_srv_close);
2209
2210static int check_module_params(void)
2211{
2212        if (sess_queue_depth < 1 || sess_queue_depth > MAX_SESS_QUEUE_DEPTH) {
2213                pr_err("Invalid sess_queue_depth value %d, must be >= %d and <= %d.\n",
2214                       sess_queue_depth, 1, MAX_SESS_QUEUE_DEPTH);
2215                return -EINVAL;
2216        }
2217        if (max_chunk_size < MIN_CHUNK_SIZE || !is_power_of_2(max_chunk_size)) {
2218                pr_err("Invalid max_chunk_size value %d, must be >= %d and a power of two.\n",
2219                       max_chunk_size, MIN_CHUNK_SIZE);
2220                return -EINVAL;
2221        }
2222
2223        /*
2224         * Check if IB immediate data size is enough to hold the mem_id and the
2225         * offset inside the memory chunk
2226         */
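            /*
             * E.g. with the default parameters: 512 buffers need
             * ilog2(511) + 1 = 9 bits, 128 KiB chunks need 17 bits, and
             * the 26 bits in total must fit into MAX_IMM_PAYL_BITS.
             */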
2227        if ((ilog2(sess_queue_depth - 1) + 1) +
2228            (ilog2(max_chunk_size - 1) + 1) > MAX_IMM_PAYL_BITS) {
2229                pr_err("RDMA immediate size (%d bits) not enough to encode %d buffers of size %d bytes. Reduce 'sess_queue_depth' or 'max_chunk_size' parameters.\n",
2230                       MAX_IMM_PAYL_BITS, sess_queue_depth, max_chunk_size);
2231                return -EINVAL;
2232        }
2233
2234        return 0;
2235}
2236
2237static int __init rtrs_server_init(void)
2238{
2239        int err;
2240
2241        pr_info("Loading module %s, proto %s: (max_chunk_size: %d (pure IO %ld, headers %ld), sess_queue_depth: %d, always_invalidate: %d)\n",
2242                KBUILD_MODNAME, RTRS_PROTO_VER_STRING,
2243                max_chunk_size, max_chunk_size - MAX_HDR_SIZE, MAX_HDR_SIZE,
2244                sess_queue_depth, always_invalidate);
2245
2246        rtrs_rdma_dev_pd_init(0, &dev_pd);
2247
2248        err = check_module_params();
2249        if (err) {
2250                pr_err("Failed to load module, invalid module parameters, err: %d\n",
2251                       err);
2252                return err;
2253        }
2254        chunk_pool = mempool_create_page_pool(sess_queue_depth * CHUNK_POOL_SZ,
2255                                              get_order(max_chunk_size));
2256        if (!chunk_pool)
2257                return -ENOMEM;
2258        rtrs_dev_class = class_create(THIS_MODULE, "rtrs-server");
2259        if (IS_ERR(rtrs_dev_class)) {
2260                err = PTR_ERR(rtrs_dev_class);
2261                goto out_chunk_pool;
2262        }
2263        rtrs_wq = alloc_workqueue("rtrs_server_wq", 0, 0);
2264        if (!rtrs_wq) {
2265                err = -ENOMEM;
2266                goto out_dev_class;
2267        }
2268
2269        return 0;
2270
2271out_dev_class:
2272        class_destroy(rtrs_dev_class);
2273out_chunk_pool:
2274        mempool_destroy(chunk_pool);
2275
2276        return err;
2277}
2278
2279static void __exit rtrs_server_exit(void)
2280{
2281        destroy_workqueue(rtrs_wq);
2282        class_destroy(rtrs_dev_class);
2283        mempool_destroy(chunk_pool);
2284        rtrs_rdma_dev_pd_deinit(&dev_pd);
2285}
2286
2287module_init(rtrs_server_init);
2288module_exit(rtrs_server_exit);
2289