// SPDX-License-Identifier: GPL-2.0-or-later
/*
 * RDMA Transport Layer
 *
 * Client (initiator) side of the RTRS transport.
 */

10#undef pr_fmt
11#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt
12
13#include <linux/module.h>
14#include <linux/rculist.h>
15#include <linux/random.h>
16
17#include "rtrs-clt.h"
18#include "rtrs-log.h"
19
20#define RTRS_CONNECT_TIMEOUT_MS 30000
21
/*
 * Wait a bit before trying to reconnect after a failure
 * in order to give the server time to finish its clean up,
 * which otherwise leads to false-positive failed reconnect attempts.
 */
26#define RTRS_RECONNECT_BACKOFF 1000
27
/*
 * Add a small random jitter (0..RTRS_RECONNECT_SEED-1 ms) to the reconnect
 * delay so that clients do not all reconnect at the same moment after a
 * major outage.
 */
32#define RTRS_RECONNECT_SEED 8
33
34#define FIRST_CONN 0x01
35
36#define RTRS_MAX_SEGMENTS 128
37
38MODULE_DESCRIPTION("RDMA Transport Client");
39MODULE_LICENSE("GPL");
40
41static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
42static struct rtrs_rdma_dev_pd dev_pd = {
43 .ops = &dev_pd_ops
44};
45
46static struct workqueue_struct *rtrs_wq;
47static struct class *rtrs_clt_dev_class;
48
49static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt)
50{
51 struct rtrs_clt_sess *sess;
52 bool connected = false;
53
54 rcu_read_lock();
55 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry)
56 connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED;
57 rcu_read_unlock();
58
59 return connected;
60}
61
62static struct rtrs_permit *
63__rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type)
64{
65 size_t max_depth = clt->queue_depth;
66 struct rtrs_permit *permit;
67 int bit;
68
 /*
  * Callers from different CPUs may grab the same bit, since
  * find_first_zero_bit() is not atomic.  But then test_and_set_bit_lock()
  * will fail for all callers but one, so that they will loop again.
  * This way an explicit spinlock is not required.
  */
76 do {
77 bit = find_first_zero_bit(clt->permits_map, max_depth);
78 if (unlikely(bit >= max_depth))
79 return NULL;
80 } while (unlikely(test_and_set_bit_lock(bit, clt->permits_map)));
81
82 permit = get_permit(clt, bit);
83 WARN_ON(permit->mem_id != bit);
84 permit->cpu_id = raw_smp_processor_id();
85 permit->con_type = con_type;
86
87 return permit;
88}
89
90static inline void __rtrs_put_permit(struct rtrs_clt *clt,
91 struct rtrs_permit *permit)
92{
93 clear_bit_unlock(permit->mem_id, clt->permits_map);
94}
95
/**
 * rtrs_clt_get_permit() - allocates permit for future RDMA operation
 * @clt:	Current session
 * @con_type:	Type of connection to use with the permit
 * @can_wait:	Wait type
 *
 * Description:
 *    Allocates permit for the following RDMA operation.  Permit is used
 *    to preallocate all resources and to propagate memory pressure
 *    up earlier.
 *
 * Context:
 *    Can sleep if @can_wait == RTRS_PERMIT_WAIT
 */
110struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt,
111 enum rtrs_clt_con_type con_type,
112 enum wait_type can_wait)
113{
114 struct rtrs_permit *permit;
115 DEFINE_WAIT(wait);
116
117 permit = __rtrs_get_permit(clt, con_type);
118 if (likely(permit) || !can_wait)
119 return permit;
120
121 do {
122 prepare_to_wait(&clt->permits_wait, &wait,
123 TASK_UNINTERRUPTIBLE);
124 permit = __rtrs_get_permit(clt, con_type);
125 if (likely(permit))
126 break;
127
128 io_schedule();
129 } while (1);
130
131 finish_wait(&clt->permits_wait, &wait);
132
133 return permit;
134}
135EXPORT_SYMBOL(rtrs_clt_get_permit);
136
/**
 * rtrs_clt_put_permit() - puts allocated permit
 * @clt:	Current session
 * @permit:	Permit to be freed
 *
 * Context:
 *    Does not matter
 */
145void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit)
146{
147 if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
148 return;
149
150 __rtrs_put_permit(clt, permit);
151
 /*
  * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
  * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
  * it must have added itself to &clt->permits_wait before
  * __rtrs_put_permit() finished.
  * Hence it is safe to guard wake_up() with a waitqueue_active() test.
  */
159 if (waitqueue_active(&clt->permits_wait))
160 wake_up(&clt->permits_wait);
161}
162EXPORT_SYMBOL(rtrs_clt_put_permit);
163
/**
 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
 * @sess: client session pointer
 * @permit: permit for the allocation of the RDMA buffer
 * Note:
 *     IO connections start from 1.
 *     Connection 0 is for user messages.
 */
172static
173struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess,
174 struct rtrs_permit *permit)
175{
176 int id = 0;
177
178 if (likely(permit->con_type == RTRS_IO_CON))
179 id = (permit->cpu_id % (sess->s.irq_con_num - 1)) + 1;
180
181 return to_clt_con(sess->s.con[id]);
182}
183
/**
 * rtrs_clt_change_state() - change the session state through the session
 * state machine.
 *
 * @sess: client session to change the state of.
 * @new_state: state to change to.
 *
 * Returns true if the session state was changed to the new state,
 * otherwise returns false.
 *
 * Locks:
 *    state_wq lock must be held.
 */
196static bool rtrs_clt_change_state(struct rtrs_clt_sess *sess,
197 enum rtrs_clt_state new_state)
198{
199 enum rtrs_clt_state old_state;
200 bool changed = false;
201
202 lockdep_assert_held(&sess->state_wq.lock);
203
204 old_state = sess->state;
205 switch (new_state) {
206 case RTRS_CLT_CONNECTING:
207 switch (old_state) {
208 case RTRS_CLT_RECONNECTING:
209 changed = true;
210 fallthrough;
211 default:
212 break;
213 }
214 break;
215 case RTRS_CLT_RECONNECTING:
216 switch (old_state) {
217 case RTRS_CLT_CONNECTED:
218 case RTRS_CLT_CONNECTING_ERR:
219 case RTRS_CLT_CLOSED:
220 changed = true;
221 fallthrough;
222 default:
223 break;
224 }
225 break;
226 case RTRS_CLT_CONNECTED:
227 switch (old_state) {
228 case RTRS_CLT_CONNECTING:
229 changed = true;
230 fallthrough;
231 default:
232 break;
233 }
234 break;
235 case RTRS_CLT_CONNECTING_ERR:
236 switch (old_state) {
237 case RTRS_CLT_CONNECTING:
238 changed = true;
239 fallthrough;
240 default:
241 break;
242 }
243 break;
244 case RTRS_CLT_CLOSING:
245 switch (old_state) {
246 case RTRS_CLT_CONNECTING:
247 case RTRS_CLT_CONNECTING_ERR:
248 case RTRS_CLT_RECONNECTING:
249 case RTRS_CLT_CONNECTED:
250 changed = true;
251 fallthrough;
252 default:
253 break;
254 }
255 break;
256 case RTRS_CLT_CLOSED:
257 switch (old_state) {
258 case RTRS_CLT_CLOSING:
259 changed = true;
260 fallthrough;
261 default:
262 break;
263 }
264 break;
265 case RTRS_CLT_DEAD:
266 switch (old_state) {
267 case RTRS_CLT_CLOSED:
268 changed = true;
269 fallthrough;
270 default:
271 break;
272 }
273 break;
274 default:
275 break;
276 }
277 if (changed) {
278 sess->state = new_state;
279 wake_up_locked(&sess->state_wq);
280 }
281
282 return changed;
283}
284
285static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess,
286 enum rtrs_clt_state old_state,
287 enum rtrs_clt_state new_state)
288{
289 bool changed = false;
290
291 spin_lock_irq(&sess->state_wq.lock);
292 if (sess->state == old_state)
293 changed = rtrs_clt_change_state(sess, new_state);
294 spin_unlock_irq(&sess->state_wq.lock);
295
296 return changed;
297}
298
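/*
 * rtrs_rdma_error_recovery() - schedule reconnect on RDMA errors.
 *
 * If the path was CONNECTED, move it to RECONNECTING and queue the delayed
 * reconnect work with a small random jitter; otherwise flag a connecting
 * error so that the waiter in the connect path tears the connection down.
 */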
299static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
300{
301 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
302
303 if (rtrs_clt_change_state_from_to(sess,
304 RTRS_CLT_CONNECTED,
305 RTRS_CLT_RECONNECTING)) {
306 struct rtrs_clt *clt = sess->clt;
307 unsigned int delay_ms;
308
 /*
  * Normal scenario: reconnect if we were successfully connected.
  */
312 delay_ms = clt->reconnect_delay_sec * 1000;
313 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
314 msecs_to_jiffies(delay_ms +
315 prandom_u32() % RTRS_RECONNECT_SEED));
316 } else {
 /*
  * An error can happen while establishing a new connection,
  * so notify the waiter with an error state; the waiter is
  * responsible for cleaning up the rest and reconnecting if
  * needed.
  */
322 rtrs_clt_change_state_from_to(sess,
323 RTRS_CLT_CONNECTING,
324 RTRS_CLT_CONNECTING_ERR);
325 }
326}
327
328static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
329{
330 struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
331
332 if (unlikely(wc->status != IB_WC_SUCCESS)) {
333 rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n",
334 ib_wc_status_msg(wc->status));
335 rtrs_rdma_error_recovery(con);
336 }
337}
338
339static struct ib_cqe fast_reg_cqe = {
340 .done = rtrs_clt_fast_reg_done
341};
342
343static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
344 bool notify, bool can_wait);
345
346static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
347{
348 struct rtrs_clt_io_req *req =
349 container_of(wc->wr_cqe, typeof(*req), inv_cqe);
350 struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
351
352 if (unlikely(wc->status != IB_WC_SUCCESS)) {
353 rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n",
354 ib_wc_status_msg(wc->status));
355 rtrs_rdma_error_recovery(con);
356 }
357 req->need_inv = false;
358 if (likely(req->need_inv_comp))
359 complete(&req->inv_comp);
360 else
 /* Complete request from INV callback */
362 complete_rdma_req(req, req->inv_errno, true, false);
363}
364
365static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
366{
367 struct rtrs_clt_con *con = req->con;
368 struct ib_send_wr wr = {
369 .opcode = IB_WR_LOCAL_INV,
370 .wr_cqe = &req->inv_cqe,
371 .send_flags = IB_SEND_SIGNALED,
372 .ex.invalidate_rkey = req->mr->rkey,
373 };
374 req->inv_cqe.done = rtrs_clt_inv_rkey_done;
375
376 return ib_post_send(con->c.qp, &wr, NULL);
377}
378
379static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
380 bool notify, bool can_wait)
381{
382 struct rtrs_clt_con *con = req->con;
383 struct rtrs_clt_sess *sess;
384 int err;
385
386 if (WARN_ON(!req->in_use))
387 return;
388 if (WARN_ON(!req->con))
389 return;
390 sess = to_clt_sess(con->c.sess);
391
392 if (req->sg_cnt) {
393 if (unlikely(req->dir == DMA_FROM_DEVICE && req->need_inv)) {
 /*
  * We are here to invalidate read requests
  * ourselves.  In the normal scenario the server should
  * send INV for all read requests, but
  * we are here, thus two things could have happened:
  *
  *    1.  this is failover, when errno != 0
  *        and can_wait == 1,
  *
  *    2.  something totally bad happened and
  *        the server forgot to send INV, so we
  *        should do that ourselves.
  */
408 if (likely(can_wait)) {
409 req->need_inv_comp = true;
410 } else {
 /* This should be an IO path, so always notify */
412 WARN_ON(!notify);
 /* Save errno for the INV callback */
414 req->inv_errno = errno;
415 }
416
417 refcount_inc(&req->ref);
418 err = rtrs_inv_rkey(req);
419 if (unlikely(err)) {
420 rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n",
421 req->mr->rkey, err);
422 } else if (likely(can_wait)) {
423 wait_for_completion(&req->inv_comp);
424 } else {
 /*
  * Something went wrong, so the request will be
  * completed from the INV callback.
  */
429 WARN_ON_ONCE(1);
430
431 return;
432 }
433 if (!refcount_dec_and_test(&req->ref))
434 return;
435 }
436 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
437 req->sg_cnt, req->dir);
438 }
439 if (!refcount_dec_and_test(&req->ref))
440 return;
441 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
442 atomic_dec(&sess->stats->inflight);
443
444 req->in_use = false;
445 req->con = NULL;
446
447 if (errno) {
448 rtrs_err_rl(con->c.sess, "IO request failed: error=%d path=%s [%s:%u] notify=%d\n",
449 errno, kobject_name(&sess->kobj), sess->hca_name,
450 sess->hca_port, notify);
451 }
452
453 if (notify)
454 req->conf(req->priv, errno);
455}
456
457static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
458 struct rtrs_clt_io_req *req,
459 struct rtrs_rbuf *rbuf, u32 off,
460 u32 imm, struct ib_send_wr *wr)
461{
462 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
463 enum ib_send_flags flags;
464 struct ib_sge sge;
465
466 if (unlikely(!req->sg_size)) {
467 rtrs_wrn(con->c.sess,
468 "Doing RDMA Write failed, no data supplied\n");
469 return -EINVAL;
470 }
471
472
473 sge.addr = req->iu->dma_addr;
474 sge.length = req->sg_size;
475 sge.lkey = sess->s.dev->ib_pd->local_dma_lkey;
476
 /*
  * From time to time we have to post signalled sends,
  * or the send queue will fill up and only a QP reset can help.
  */
481 flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
482 0 : IB_SEND_SIGNALED;
483
484 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
485 req->sg_size, DMA_TO_DEVICE);
486
487 return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
488 rbuf->rkey, rbuf->addr + off,
489 imm, flags, wr, NULL);
490}
491
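/*
 * process_io_rsp() - complete the inflight request identified by @msg_id.
 *
 * @w_inval tells whether the server already invalidated the rkey remotely
 * (send-with-invalidate), in which case no local invalidation is needed.
 */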
492static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id,
493 s16 errno, bool w_inval)
494{
495 struct rtrs_clt_io_req *req;
496
497 if (WARN_ON(msg_id >= sess->queue_depth))
498 return;
499
500 req = &sess->reqs[msg_id];
501
502 req->need_inv &= !w_inval;
503 complete_rdma_req(req, errno, true, false);
504}
505
506static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
507{
508 struct rtrs_iu *iu;
509 int err;
510 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
511
512 WARN_ON((sess->flags & RTRS_MSG_NEW_RKEY_F) == 0);
513 iu = container_of(wc->wr_cqe, struct rtrs_iu,
514 cqe);
515 err = rtrs_iu_post_recv(&con->c, iu);
516 if (unlikely(err)) {
517 rtrs_err(con->c.sess, "post iu failed %d\n", err);
518 rtrs_rdma_error_recovery(con);
519 }
520}
521
522static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
523{
524 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
525 struct rtrs_msg_rkey_rsp *msg;
526 u32 imm_type, imm_payload;
527 bool w_inval = false;
528 struct rtrs_iu *iu;
529 u32 buf_id;
530 int err;
531
532 WARN_ON((sess->flags & RTRS_MSG_NEW_RKEY_F) == 0);
533
534 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
535
536 if (unlikely(wc->byte_len < sizeof(*msg))) {
537 rtrs_err(con->c.sess, "rkey response is malformed: size %d\n",
538 wc->byte_len);
539 goto out;
540 }
541 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
542 iu->size, DMA_FROM_DEVICE);
543 msg = iu->buf;
544 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP)) {
545 rtrs_err(sess->clt, "rkey response is malformed: type %d\n",
546 le16_to_cpu(msg->type));
547 goto out;
548 }
549 buf_id = le16_to_cpu(msg->buf_id);
550 if (WARN_ON(buf_id >= sess->queue_depth))
551 goto out;
552
553 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
554 if (likely(imm_type == RTRS_IO_RSP_IMM ||
555 imm_type == RTRS_IO_RSP_W_INV_IMM)) {
556 u32 msg_id;
557
558 w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
559 rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
560
561 if (WARN_ON(buf_id != msg_id))
562 goto out;
563 sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
564 process_io_rsp(sess, msg_id, err, w_inval);
565 }
566 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr,
567 iu->size, DMA_FROM_DEVICE);
568 return rtrs_clt_recv_done(con, wc);
569out:
570 rtrs_rdma_error_recovery(con);
571}
572
573static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);
574
575static struct ib_cqe io_comp_cqe = {
576 .done = rtrs_clt_rdma_done
577};
578
/*
 * Post x2 empty WRs: the first is for this RDMA with IMM,
 * the second is for the RECV with INV, which happened earlier.
 */
583static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe)
584{
585 struct ib_recv_wr wr_arr[2], *wr;
586 int i;
587
588 memset(wr_arr, 0, sizeof(wr_arr));
589 for (i = 0; i < ARRAY_SIZE(wr_arr); i++) {
590 wr = &wr_arr[i];
591 wr->wr_cqe = cqe;
592 if (i)
593
594 wr->next = &wr_arr[i - 1];
595 }
596
597 return ib_post_recv(con->qp, wr, NULL);
598}
599
600static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
601{
602 struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
603 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
604 u32 imm_type, imm_payload;
605 bool w_inval = false;
606 int err;
607
608 if (unlikely(wc->status != IB_WC_SUCCESS)) {
609 if (wc->status != IB_WC_WR_FLUSH_ERR) {
610 rtrs_err(sess->clt, "RDMA failed: %s\n",
611 ib_wc_status_msg(wc->status));
612 rtrs_rdma_error_recovery(con);
613 }
614 return;
615 }
616 rtrs_clt_update_wc_stats(con);
617
618 switch (wc->opcode) {
619 case IB_WC_RECV_RDMA_WITH_IMM:
 /*
  * post_recv() RDMA write completions of IO reqs (read/write)
  * and hb.
  */
624 if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done))
625 return;
626 rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
627 &imm_type, &imm_payload);
628 if (likely(imm_type == RTRS_IO_RSP_IMM ||
629 imm_type == RTRS_IO_RSP_W_INV_IMM)) {
630 u32 msg_id;
631
632 w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
633 rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);
634
635 process_io_rsp(sess, msg_id, err, w_inval);
636 } else if (imm_type == RTRS_HB_MSG_IMM) {
637 WARN_ON(con->c.cid);
638 rtrs_send_hb_ack(&sess->s);
639 if (sess->flags & RTRS_MSG_NEW_RKEY_F)
640 return rtrs_clt_recv_done(con, wc);
641 } else if (imm_type == RTRS_HB_ACK_IMM) {
642 WARN_ON(con->c.cid);
643 sess->s.hb_missed_cnt = 0;
644 sess->s.hb_cur_latency =
645 ktime_sub(ktime_get(), sess->s.hb_last_sent);
646 if (sess->flags & RTRS_MSG_NEW_RKEY_F)
647 return rtrs_clt_recv_done(con, wc);
648 } else {
649 rtrs_wrn(con->c.sess, "Unknown IMM type %u\n",
650 imm_type);
651 }
652 if (w_inval)
 /*
  * Post x2 empty WRs: the first is for this RDMA with IMM,
  * the second is for the RECV with INV, which happened earlier.
  */
657 err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe);
658 else
659 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
660 if (unlikely(err)) {
661 rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n",
662 err);
663 rtrs_rdma_error_recovery(con);
664 }
665 break;
666 case IB_WC_RECV:
 /*
  * Key invalidations from the server side.
  */
670 WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE ||
671 wc->wc_flags & IB_WC_WITH_IMM));
672 WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done);
673 if (sess->flags & RTRS_MSG_NEW_RKEY_F) {
674 if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
675 return rtrs_clt_recv_done(con, wc);
676
677 return rtrs_clt_rkey_rsp_done(con, wc);
678 }
679 break;
680 case IB_WC_RDMA_WRITE:
 /*
  * post_send() RDMA write completions of IO reqs (read/write).
  */
684 break;
685
686 default:
687 rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode);
688 return;
689 }
690}
691
692static int post_recv_io(struct rtrs_clt_con *con, size_t q_size)
693{
694 int err, i;
695 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
696
697 for (i = 0; i < q_size; i++) {
698 if (sess->flags & RTRS_MSG_NEW_RKEY_F) {
699 struct rtrs_iu *iu = &con->rsp_ius[i];
700
701 err = rtrs_iu_post_recv(&con->c, iu);
702 } else {
703 err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
704 }
705 if (unlikely(err))
706 return err;
707 }
708
709 return 0;
710}
711
712static int post_recv_sess(struct rtrs_clt_sess *sess)
713{
714 size_t q_size = 0;
715 int err, cid;
716
717 for (cid = 0; cid < sess->s.con_num; cid++) {
718 if (cid == 0)
719 q_size = SERVICE_CON_QUEUE_DEPTH;
720 else
721 q_size = sess->queue_depth;
722
 /*
  * x2 for RDMA read responses + FR key invalidations,
  * RDMA writes do not require any FR registrations.
  */
727 q_size *= 2;
728
729 err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size);
730 if (unlikely(err)) {
731 rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err);
732 return err;
733 }
734 }
735
736 return 0;
737}
738
739struct path_it {
740 int i;
741 struct list_head skip_list;
742 struct rtrs_clt *clt;
743 struct rtrs_clt_sess *(*next_path)(struct path_it *it);
744};
745
/**
 * list_next_or_null_rr_rcu - get next list element in round-robin fashion.
 * @head:	the head of the list.
 * @ptr:	the list head to take the next element from.
 * @type:	the type of the struct this is embedded in.
 * @memb:	the name of the list_head within the struct.
 *
 * The next element is returned in round-robin fashion, i.e. the head is
 * skipped, but if the list is observed as empty, NULL is returned.
 *
 * This primitive may safely run concurrently with the _rcu list-mutation
 * primitives such as list_add_rcu() as long as it's guarded by rcu_read_lock().
 */
759#define list_next_or_null_rr_rcu(head, ptr, type, memb) \
760({ \
761 list_next_or_null_rcu(head, ptr, type, memb) ?: \
762 list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \
763 type, memb); \
764})
765
/**
 * get_next_path_rr() - Returns path in round-robin fashion.
 * @it:	the path pointer
 *
 * Related to @MP_POLICY_RR
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */
775static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it)
776{
777 struct rtrs_clt_sess __rcu **ppcpu_path;
778 struct rtrs_clt_sess *path;
779 struct rtrs_clt *clt;
780
781 clt = it->clt;
782
 /*
  * Here we use two RCU objects: the @paths_list and the @pcpu_path
  * pointer.  See rtrs_clt_remove_path_from_arr() for details on
  * how that is handled.
  */
789 ppcpu_path = this_cpu_ptr(clt->pcpu_path);
790 path = rcu_dereference(*ppcpu_path);
791 if (unlikely(!path))
792 path = list_first_or_null_rcu(&clt->paths_list,
793 typeof(*path), s.entry);
794 else
795 path = list_next_or_null_rr_rcu(&clt->paths_list,
796 &path->s.entry,
797 typeof(*path),
798 s.entry);
799 rcu_assign_pointer(*ppcpu_path, path);
800
801 return path;
802}
803
/**
 * get_next_path_min_inflight() - Returns path with minimal inflight count.
 * @it:	the path pointer
 *
 * Related to @MP_POLICY_MIN_INFLIGHT
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */
813static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it)
814{
815 struct rtrs_clt_sess *min_path = NULL;
816 struct rtrs_clt *clt = it->clt;
817 struct rtrs_clt_sess *sess;
818 int min_inflight = INT_MAX;
819 int inflight;
820
821 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
822 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
823 continue;
824
825 if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry))))
826 continue;
827
828 inflight = atomic_read(&sess->stats->inflight);
829
830 if (inflight < min_inflight) {
831 min_inflight = inflight;
832 min_path = sess;
833 }
834 }
835
 /*
  * Add the path to the skip list, so that next time we can get
  * a different one.
  */
840 if (min_path)
841 list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);
842
843 return min_path;
844}
845
/**
 * get_next_path_min_latency() - Returns path with minimal latency.
 * @it:	the path pointer
 *
 * Return: path with the lowest latency or NULL if all paths were tried
 *
 * Locks:
 *    rcu_read_lock() must be held.
 *
 * Related to @MP_POLICY_MIN_LATENCY
 *
 * This DOES skip an already-tried path.
 * There is a skip-list to skip a path if the path has been tried but failed.
 * It will try the minimum latency path and then the second minimum latency
 * path in a round-robin fashion among the untried paths.
 */
864static struct rtrs_clt_sess *get_next_path_min_latency(struct path_it *it)
865{
866 struct rtrs_clt_sess *min_path = NULL;
867 struct rtrs_clt *clt = it->clt;
868 struct rtrs_clt_sess *sess;
869 ktime_t min_latency = INT_MAX;
870 ktime_t latency;
871
872 list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
873 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
874 continue;
875
876 if (unlikely(!list_empty(raw_cpu_ptr(sess->mp_skip_entry))))
877 continue;
878
879 latency = sess->s.hb_cur_latency;
880
881 if (latency < min_latency) {
882 min_latency = latency;
883 min_path = sess;
884 }
885 }
886
 /*
  * Add the path to the skip list, so that next time we can get
  * a different one.
  */
891 if (min_path)
892 list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);
893
894 return min_path;
895}
896
897static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt)
898{
899 INIT_LIST_HEAD(&it->skip_list);
900 it->clt = clt;
901 it->i = 0;
902
903 if (clt->mp_policy == MP_POLICY_RR)
904 it->next_path = get_next_path_rr;
905 else if (clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
906 it->next_path = get_next_path_min_inflight;
907 else
908 it->next_path = get_next_path_min_latency;
909}
910
911static inline void path_it_deinit(struct path_it *it)
912{
913 struct list_head *skip, *tmp;
914
 /*
  * The skip_list is used only for the MIN_INFLIGHT and MIN_LATENCY
  * policies.  Remove the paths from it, so that the next IO can insert
  * paths (->mp_skip_entry) into a skip_list again.
  */
919 list_for_each_safe(skip, tmp, &it->skip_list)
920 list_del_init(skip);
921}
922
/**
 * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
 * about an inflight IO.
 * The user buffer holding the user control message (not data) is copied into
 * the corresponding buffer of rtrs_iu (req->iu->buf), which later on will
 * also hold the control message of rtrs.
 * @req: an io request holding information about IO.
 * @sess: client session
 * @conf: confirmation callback function to notify the upper layer.
 * @permit: permit for allocation of the RDMA remote buffer
 * @priv: private pointer
 * @vec: kernel vector containing the control message
 * @usr_len: length of the user message
 * @sg: scatter list for the IO data
 * @sg_cnt: number of scatter list entries
 * @data_len: length of the IO data
 * @dir: direction of the IO.
 */
941static void rtrs_clt_init_req(struct rtrs_clt_io_req *req,
942 struct rtrs_clt_sess *sess,
943 void (*conf)(void *priv, int errno),
944 struct rtrs_permit *permit, void *priv,
945 const struct kvec *vec, size_t usr_len,
946 struct scatterlist *sg, size_t sg_cnt,
947 size_t data_len, int dir)
948{
949 struct iov_iter iter;
950 size_t len;
951
952 req->permit = permit;
953 req->in_use = true;
954 req->usr_len = usr_len;
955 req->data_len = data_len;
956 req->sglist = sg;
957 req->sg_cnt = sg_cnt;
958 req->priv = priv;
959 req->dir = dir;
960 req->con = rtrs_permit_to_clt_con(sess, permit);
961 req->conf = conf;
962 req->need_inv = false;
963 req->need_inv_comp = false;
964 req->inv_errno = 0;
965 refcount_set(&req->ref, 1);
966
967 iov_iter_kvec(&iter, READ, vec, 1, usr_len);
968 len = _copy_from_iter(req->iu->buf, usr_len, &iter);
969 WARN_ON(len != usr_len);
970
971 reinit_completion(&req->inv_comp);
972}
973
974static struct rtrs_clt_io_req *
975rtrs_clt_get_req(struct rtrs_clt_sess *sess,
976 void (*conf)(void *priv, int errno),
977 struct rtrs_permit *permit, void *priv,
978 const struct kvec *vec, size_t usr_len,
979 struct scatterlist *sg, size_t sg_cnt,
980 size_t data_len, int dir)
981{
982 struct rtrs_clt_io_req *req;
983
984 req = &sess->reqs[permit->mem_id];
985 rtrs_clt_init_req(req, sess, conf, permit, priv, vec, usr_len,
986 sg, sg_cnt, data_len, dir);
987 return req;
988}
989
990static struct rtrs_clt_io_req *
991rtrs_clt_get_copy_req(struct rtrs_clt_sess *alive_sess,
992 struct rtrs_clt_io_req *fail_req)
993{
994 struct rtrs_clt_io_req *req;
995 struct kvec vec = {
996 .iov_base = fail_req->iu->buf,
997 .iov_len = fail_req->usr_len
998 };
999
1000 req = &alive_sess->reqs[fail_req->permit->mem_id];
1001 rtrs_clt_init_req(req, alive_sess, fail_req->conf, fail_req->permit,
1002 fail_req->priv, &vec, fail_req->usr_len,
1003 fail_req->sglist, fail_req->sg_cnt,
1004 fail_req->data_len, fail_req->dir);
1005 return req;
1006}
1007
1008static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con,
1009 struct rtrs_clt_io_req *req,
1010 struct rtrs_rbuf *rbuf, bool fr_en,
1011 u32 size, u32 imm, struct ib_send_wr *wr,
1012 struct ib_send_wr *tail)
1013{
1014 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1015 struct ib_sge *sge = req->sge;
1016 enum ib_send_flags flags;
1017 struct scatterlist *sg;
1018 size_t num_sge;
1019 int i;
1020 struct ib_send_wr *ptail = NULL;
1021
1022 if (fr_en) {
1023 i = 0;
1024 sge[i].addr = req->mr->iova;
1025 sge[i].length = req->mr->length;
1026 sge[i].lkey = req->mr->lkey;
1027 i++;
1028 num_sge = 2;
1029 ptail = tail;
1030 } else {
1031 for_each_sg(req->sglist, sg, req->sg_cnt, i) {
1032 sge[i].addr = sg_dma_address(sg);
1033 sge[i].length = sg_dma_len(sg);
1034 sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey;
1035 }
1036 num_sge = 1 + req->sg_cnt;
1037 }
1038 sge[i].addr = req->iu->dma_addr;
1039 sge[i].length = size;
1040 sge[i].lkey = sess->s.dev->ib_pd->local_dma_lkey;
1041
 /*
  * From time to time we have to post signalled sends,
  * or the send queue will fill up and only a QP reset can help.
  */
1046 flags = atomic_inc_return(&con->io_cnt) % sess->queue_depth ?
1047 0 : IB_SEND_SIGNALED;
1048
1049 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
1050 size, DMA_TO_DEVICE);
1051
1052 return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge,
1053 rbuf->rkey, rbuf->addr, imm,
1054 flags, wr, ptail);
1055}
1056
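/*
 * rtrs_map_sg_fr() - map the request scatterlist to its fast-registration MR.
 *
 * Returns the number of mapped entries or a negative error; bumps the rkey
 * so that a stale key from a previous registration cannot be reused.
 */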
1057static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count)
1058{
1059 int nr;
1060
 /* Align the MR to a 4K page size to match the block virt boundary */
1062 nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K);
1063 if (nr < 0)
1064 return nr;
1065 if (unlikely(nr < req->sg_cnt))
1066 return -EINVAL;
1067 ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey));
1068
1069 return nr;
1070}
1071
1072static int rtrs_clt_write_req(struct rtrs_clt_io_req *req)
1073{
1074 struct rtrs_clt_con *con = req->con;
1075 struct rtrs_sess *s = con->c.sess;
1076 struct rtrs_clt_sess *sess = to_clt_sess(s);
1077 struct rtrs_msg_rdma_write *msg;
1078
1079 struct rtrs_rbuf *rbuf;
1080 int ret, count = 0;
1081 u32 imm, buf_id;
1082 struct ib_reg_wr rwr;
1083 struct ib_send_wr inv_wr;
1084 struct ib_send_wr *wr = NULL;
1085 bool fr_en = false;
1086
1087 const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
1088
1089 if (unlikely(tsize > sess->chunk_size)) {
1090 rtrs_wrn(s, "Write request failed, size too big %zu > %d\n",
1091 tsize, sess->chunk_size);
1092 return -EMSGSIZE;
1093 }
1094 if (req->sg_cnt) {
1095 count = ib_dma_map_sg(sess->s.dev->ib_dev, req->sglist,
1096 req->sg_cnt, req->dir);
1097 if (unlikely(!count)) {
1098 rtrs_wrn(s, "Write request failed, map failed\n");
1099 return -EINVAL;
1100 }
1101 }
1102
1103 msg = req->iu->buf + req->usr_len;
1104 msg->type = cpu_to_le16(RTRS_MSG_WRITE);
1105 msg->usr_len = cpu_to_le16(req->usr_len);
1106
 /* rtrs message on the server side will be after user data and message */
1108 imm = req->permit->mem_off + req->data_len + req->usr_len;
1109 imm = rtrs_to_io_req_imm(imm);
1110 buf_id = req->permit->mem_id;
1111 req->sg_size = tsize;
1112 rbuf = &sess->rbufs[buf_id];
1113
1114 if (count) {
1115 ret = rtrs_map_sg_fr(req, count);
1116 if (ret < 0) {
1117 rtrs_err_rl(s,
1118 "Write request failed, failed to map fast reg. data, err: %d\n",
1119 ret);
1120 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
1121 req->sg_cnt, req->dir);
1122 return ret;
1123 }
1124 inv_wr = (struct ib_send_wr) {
1125 .opcode = IB_WR_LOCAL_INV,
1126 .wr_cqe = &req->inv_cqe,
1127 .send_flags = IB_SEND_SIGNALED,
1128 .ex.invalidate_rkey = req->mr->rkey,
1129 };
1130 req->inv_cqe.done = rtrs_clt_inv_rkey_done;
1131 rwr = (struct ib_reg_wr) {
1132 .wr.opcode = IB_WR_REG_MR,
1133 .wr.wr_cqe = &fast_reg_cqe,
1134 .mr = req->mr,
1135 .key = req->mr->rkey,
1136 .access = (IB_ACCESS_LOCAL_WRITE),
1137 };
1138 wr = &rwr.wr;
1139 fr_en = true;
1140 refcount_inc(&req->ref);
1141 }
1142
 /*
  * Update stats now; after the request is successfully sent it is
  * no longer safe to touch it.
  */
1146 rtrs_clt_update_all_stats(req, WRITE);
1147
1148 ret = rtrs_post_rdma_write_sg(req->con, req, rbuf, fr_en,
1149 req->usr_len + sizeof(*msg),
1150 imm, wr, &inv_wr);
1151 if (unlikely(ret)) {
1152 rtrs_err_rl(s,
1153 "Write request failed: error=%d path=%s [%s:%u]\n",
1154 ret, kobject_name(&sess->kobj), sess->hca_name,
1155 sess->hca_port);
1156 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
1157 atomic_dec(&sess->stats->inflight);
1158 if (req->sg_cnt)
1159 ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
1160 req->sg_cnt, req->dir);
1161 }
1162
1163 return ret;
1164}
1165
1166static int rtrs_clt_read_req(struct rtrs_clt_io_req *req)
1167{
1168 struct rtrs_clt_con *con = req->con;
1169 struct rtrs_sess *s = con->c.sess;
1170 struct rtrs_clt_sess *sess = to_clt_sess(s);
1171 struct rtrs_msg_rdma_read *msg;
1172 struct rtrs_ib_dev *dev = sess->s.dev;
1173
1174 struct ib_reg_wr rwr;
1175 struct ib_send_wr *wr = NULL;
1176
1177 int ret, count = 0;
1178 u32 imm, buf_id;
1179
1180 const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;
1181
1182 if (unlikely(tsize > sess->chunk_size)) {
1183 rtrs_wrn(s,
1184 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
1185 tsize, sess->chunk_size);
1186 return -EMSGSIZE;
1187 }
1188
1189 if (req->sg_cnt) {
1190 count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1191 req->dir);
1192 if (unlikely(!count)) {
1193 rtrs_wrn(s,
1194 "Read request failed, dma map failed\n");
1195 return -EINVAL;
1196 }
1197 }
1198
1199 msg = req->iu->buf + req->usr_len;
1200 msg->type = cpu_to_le16(RTRS_MSG_READ);
1201 msg->usr_len = cpu_to_le16(req->usr_len);
1202
1203 if (count) {
1204 ret = rtrs_map_sg_fr(req, count);
1205 if (ret < 0) {
1206 rtrs_err_rl(s,
1207 "Read request failed, failed to map fast reg. data, err: %d\n",
1208 ret);
1209 ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt,
1210 req->dir);
1211 return ret;
1212 }
1213 rwr = (struct ib_reg_wr) {
1214 .wr.opcode = IB_WR_REG_MR,
1215 .wr.wr_cqe = &fast_reg_cqe,
1216 .mr = req->mr,
1217 .key = req->mr->rkey,
1218 .access = (IB_ACCESS_LOCAL_WRITE |
1219 IB_ACCESS_REMOTE_WRITE),
1220 };
1221 wr = &rwr.wr;
1222
1223 msg->sg_cnt = cpu_to_le16(1);
1224 msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F);
1225
1226 msg->desc[0].addr = cpu_to_le64(req->mr->iova);
1227 msg->desc[0].key = cpu_to_le32(req->mr->rkey);
1228 msg->desc[0].len = cpu_to_le32(req->mr->length);
1229
 /* Further invalidation is required */
1231 req->need_inv = !!RTRS_MSG_NEED_INVAL_F;
1232
1233 } else {
1234 msg->sg_cnt = 0;
1235 msg->flags = 0;
1236 }
1237
 /*
  * The rtrs message will be after the space reserved for disk data
  * and the user message.
  */
1241 imm = req->permit->mem_off + req->data_len + req->usr_len;
1242 imm = rtrs_to_io_req_imm(imm);
1243 buf_id = req->permit->mem_id;
1244
1245 req->sg_size = sizeof(*msg);
1246 req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc);
1247 req->sg_size += req->usr_len;
1248
 /*
  * Update stats now; after the request is successfully sent it is
  * no longer safe to touch it.
  */
1253 rtrs_clt_update_all_stats(req, READ);
1254
1255 ret = rtrs_post_send_rdma(req->con, req, &sess->rbufs[buf_id],
1256 req->data_len, imm, wr);
1257 if (unlikely(ret)) {
1258 rtrs_err_rl(s,
1259 "Read request failed: error=%d path=%s [%s:%u]\n",
1260 ret, kobject_name(&sess->kobj), sess->hca_name,
1261 sess->hca_port);
1262 if (sess->clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
1263 atomic_dec(&sess->stats->inflight);
1264 req->need_inv = false;
1265 if (req->sg_cnt)
1266 ib_dma_unmap_sg(dev->ib_dev, req->sglist,
1267 req->sg_cnt, req->dir);
1268 }
1269
1270 return ret;
1271}
1272
/**
 * rtrs_clt_failover_req() - Try to find an active path for a failed request
 * @clt: clt context
 * @fail_req: a failed io request.
 */
1278static int rtrs_clt_failover_req(struct rtrs_clt *clt,
1279 struct rtrs_clt_io_req *fail_req)
1280{
1281 struct rtrs_clt_sess *alive_sess;
1282 struct rtrs_clt_io_req *req;
1283 int err = -ECONNABORTED;
1284 struct path_it it;
1285
1286 rcu_read_lock();
1287 for (path_it_init(&it, clt);
1288 (alive_sess = it.next_path(&it)) && it.i < it.clt->paths_num;
1289 it.i++) {
1290 if (unlikely(READ_ONCE(alive_sess->state) !=
1291 RTRS_CLT_CONNECTED))
1292 continue;
1293 req = rtrs_clt_get_copy_req(alive_sess, fail_req);
1294 if (req->dir == DMA_TO_DEVICE)
1295 err = rtrs_clt_write_req(req);
1296 else
1297 err = rtrs_clt_read_req(req);
1298 if (unlikely(err)) {
1299 req->in_use = false;
1300 continue;
1301 }
 /* Success path */
1303 rtrs_clt_inc_failover_cnt(alive_sess->stats);
1304 break;
1305 }
1306 path_it_deinit(&it);
1307 rcu_read_unlock();
1308
1309 return err;
1310}
1311
1312static void fail_all_outstanding_reqs(struct rtrs_clt_sess *sess)
1313{
1314 struct rtrs_clt *clt = sess->clt;
1315 struct rtrs_clt_io_req *req;
1316 int i, err;
1317
1318 if (!sess->reqs)
1319 return;
1320 for (i = 0; i < sess->queue_depth; ++i) {
1321 req = &sess->reqs[i];
1322 if (!req->in_use)
1323 continue;
1324
 /*
  * Safely (without notification) complete the failed request.
  * After completion this request is still usable and can
  * be failed over to another path.
  */
1330 complete_rdma_req(req, -ECONNABORTED, false, true);
1331
1332 err = rtrs_clt_failover_req(clt, req);
1333 if (unlikely(err))
 /* Failover failed, notify anyway */
1335 req->conf(req->priv, err);
1336 }
1337}
1338
1339static void free_sess_reqs(struct rtrs_clt_sess *sess)
1340{
1341 struct rtrs_clt_io_req *req;
1342 int i;
1343
1344 if (!sess->reqs)
1345 return;
1346 for (i = 0; i < sess->queue_depth; ++i) {
1347 req = &sess->reqs[i];
1348 if (req->mr)
1349 ib_dereg_mr(req->mr);
1350 kfree(req->sge);
1351 rtrs_iu_free(req->iu, sess->s.dev->ib_dev, 1);
1352 }
1353 kfree(sess->reqs);
1354 sess->reqs = NULL;
1355}
1356
1357static int alloc_sess_reqs(struct rtrs_clt_sess *sess)
1358{
1359 struct rtrs_clt_io_req *req;
1360 int i, err = -ENOMEM;
1361
1362 sess->reqs = kcalloc(sess->queue_depth, sizeof(*sess->reqs),
1363 GFP_KERNEL);
1364 if (!sess->reqs)
1365 return -ENOMEM;
1366
1367 for (i = 0; i < sess->queue_depth; ++i) {
1368 req = &sess->reqs[i];
1369 req->iu = rtrs_iu_alloc(1, sess->max_hdr_size, GFP_KERNEL,
1370 sess->s.dev->ib_dev,
1371 DMA_TO_DEVICE,
1372 rtrs_clt_rdma_done);
1373 if (!req->iu)
1374 goto out;
1375
1376 req->sge = kcalloc(2, sizeof(*req->sge), GFP_KERNEL);
1377 if (!req->sge)
1378 goto out;
1379
1380 req->mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
1381 sess->max_pages_per_mr);
1382 if (IS_ERR(req->mr)) {
1383 err = PTR_ERR(req->mr);
1384 req->mr = NULL;
1385 pr_err("Failed to alloc sess->max_pages_per_mr %d\n",
1386 sess->max_pages_per_mr);
1387 goto out;
1388 }
1389
1390 init_completion(&req->inv_comp);
1391 }
1392
1393 return 0;
1394
1395out:
1396 free_sess_reqs(sess);
1397
1398 return err;
1399}
1400
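/*
 * alloc_permits() - allocate the permit bitmap and the permit array.
 *
 * Each permit gets a fixed mem_id and a mem_off that encodes the offset of
 * its chunk within the server-side buffer, later packed into the RDMA
 * immediate data.
 */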
1401static int alloc_permits(struct rtrs_clt *clt)
1402{
1403 unsigned int chunk_bits;
1404 int err, i;
1405
1406 clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth),
1407 sizeof(long), GFP_KERNEL);
1408 if (!clt->permits_map) {
1409 err = -ENOMEM;
1410 goto out_err;
1411 }
1412 clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL);
1413 if (!clt->permits) {
1414 err = -ENOMEM;
1415 goto err_map;
1416 }
1417 chunk_bits = ilog2(clt->queue_depth - 1) + 1;
1418 for (i = 0; i < clt->queue_depth; i++) {
1419 struct rtrs_permit *permit;
1420
1421 permit = get_permit(clt, i);
1422 permit->mem_id = i;
1423 permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits);
1424 }
1425
1426 return 0;
1427
1428err_map:
1429 kfree(clt->permits_map);
1430 clt->permits_map = NULL;
1431out_err:
1432 return err;
1433}
1434
1435static void free_permits(struct rtrs_clt *clt)
1436{
1437 if (clt->permits_map) {
1438 size_t sz = clt->queue_depth;
1439
1440 wait_event(clt->permits_wait,
1441 find_first_bit(clt->permits_map, sz) >= sz);
1442 }
1443 kfree(clt->permits_map);
1444 clt->permits_map = NULL;
1445 kfree(clt->permits);
1446 clt->permits = NULL;
1447}
1448
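/*
 * query_fast_reg_mode() - derive max_pages_per_mr from the HCA attributes
 * and clamp the client-wide max_segments accordingly.
 */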
1449static void query_fast_reg_mode(struct rtrs_clt_sess *sess)
1450{
1451 struct ib_device *ib_dev;
1452 u64 max_pages_per_mr;
1453 int mr_page_shift;
1454
1455 ib_dev = sess->s.dev->ib_dev;
1456
 /*
  * Use the smallest page size supported by the HCA, down to a
  * minimum of 4096 bytes.  We're unlikely to build large sglists
  * out of smaller entries.
  */
1462 mr_page_shift = max(12, ffs(ib_dev->attrs.page_size_cap) - 1);
1463 max_pages_per_mr = ib_dev->attrs.max_mr_size;
1464 do_div(max_pages_per_mr, (1ull << mr_page_shift));
1465 sess->max_pages_per_mr =
1466 min3(sess->max_pages_per_mr, (u32)max_pages_per_mr,
1467 ib_dev->attrs.max_fast_reg_page_list_len);
1468 sess->clt->max_segments =
1469 min(sess->max_pages_per_mr, sess->clt->max_segments);
1470}
1471
1472static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess *sess,
1473 enum rtrs_clt_state new_state,
1474 enum rtrs_clt_state *old_state)
1475{
1476 bool changed;
1477
1478 spin_lock_irq(&sess->state_wq.lock);
1479 if (old_state)
1480 *old_state = sess->state;
1481 changed = rtrs_clt_change_state(sess, new_state);
1482 spin_unlock_irq(&sess->state_wq.lock);
1483
1484 return changed;
1485}
1486
1487static void rtrs_clt_hb_err_handler(struct rtrs_con *c)
1488{
1489 struct rtrs_clt_con *con = container_of(c, typeof(*con), c);
1490
1491 rtrs_rdma_error_recovery(con);
1492}
1493
1494static void rtrs_clt_init_hb(struct rtrs_clt_sess *sess)
1495{
1496 rtrs_init_hb(&sess->s, &io_comp_cqe,
1497 RTRS_HB_INTERVAL_MS,
1498 RTRS_HB_MISSED_MAX,
1499 rtrs_clt_hb_err_handler,
1500 rtrs_wq);
1501}
1502
1503static void rtrs_clt_reconnect_work(struct work_struct *work);
1504static void rtrs_clt_close_work(struct work_struct *work);
1505
1506static struct rtrs_clt_sess *alloc_sess(struct rtrs_clt *clt,
1507 const struct rtrs_addr *path,
1508 size_t con_num, u32 nr_poll_queues)
1509{
1510 struct rtrs_clt_sess *sess;
1511 int err = -ENOMEM;
1512 int cpu;
1513 size_t total_con;
1514
1515 sess = kzalloc(sizeof(*sess), GFP_KERNEL);
1516 if (!sess)
1517 goto err;
1518
 /*
  * One extra connection for user messages (cid == 0), plus the
  * requested number of IRQ and poll IO connections.
  */
1523 total_con = con_num + nr_poll_queues + 1;
1524 sess->s.con = kcalloc(total_con, sizeof(*sess->s.con), GFP_KERNEL);
1525 if (!sess->s.con)
1526 goto err_free_sess;
1527
1528 sess->s.con_num = total_con;
1529 sess->s.irq_con_num = con_num + 1;
1530
1531 sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
1532 if (!sess->stats)
1533 goto err_free_con;
1534
1535 mutex_init(&sess->init_mutex);
1536 uuid_gen(&sess->s.uuid);
1537 memcpy(&sess->s.dst_addr, path->dst,
1538 rdma_addr_size((struct sockaddr *)path->dst));
1539
 /*
  * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
  * checks the sa_family to be non-zero. If the user passed src_addr=NULL
  * then sess->src_addr will contain only zeros, which is fine.
  */
1545 if (path->src)
1546 memcpy(&sess->s.src_addr, path->src,
1547 rdma_addr_size((struct sockaddr *)path->src));
1548 strscpy(sess->s.sessname, clt->sessname, sizeof(sess->s.sessname));
1549 sess->clt = clt;
1550 sess->max_pages_per_mr = RTRS_MAX_SEGMENTS;
1551 init_waitqueue_head(&sess->state_wq);
1552 sess->state = RTRS_CLT_CONNECTING;
1553 atomic_set(&sess->connected_cnt, 0);
1554 INIT_WORK(&sess->close_work, rtrs_clt_close_work);
1555 INIT_DELAYED_WORK(&sess->reconnect_dwork, rtrs_clt_reconnect_work);
1556 rtrs_clt_init_hb(sess);
1557
1558 sess->mp_skip_entry = alloc_percpu(typeof(*sess->mp_skip_entry));
1559 if (!sess->mp_skip_entry)
1560 goto err_free_stats;
1561
1562 for_each_possible_cpu(cpu)
1563 INIT_LIST_HEAD(per_cpu_ptr(sess->mp_skip_entry, cpu));
1564
1565 err = rtrs_clt_init_stats(sess->stats);
1566 if (err)
1567 goto err_free_percpu;
1568
1569 return sess;
1570
1571err_free_percpu:
1572 free_percpu(sess->mp_skip_entry);
1573err_free_stats:
1574 kfree(sess->stats);
1575err_free_con:
1576 kfree(sess->s.con);
1577err_free_sess:
1578 kfree(sess);
1579err:
1580 return ERR_PTR(err);
1581}
1582
1583void free_sess(struct rtrs_clt_sess *sess)
1584{
1585 free_percpu(sess->mp_skip_entry);
1586 mutex_destroy(&sess->init_mutex);
1587 kfree(sess->s.con);
1588 kfree(sess->rbufs);
1589 kfree(sess);
1590}
1591
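/*
 * create_con() - allocate and initialize the client connection for @cid and
 * hook it into the session's connection array.
 */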
1592static int create_con(struct rtrs_clt_sess *sess, unsigned int cid)
1593{
1594 struct rtrs_clt_con *con;
1595
1596 con = kzalloc(sizeof(*con), GFP_KERNEL);
1597 if (!con)
1598 return -ENOMEM;
1599
 /* Map the first two connections to the first CPU */
1601 con->cpu = (cid ? cid - 1 : 0) % nr_cpu_ids;
1602 con->c.cid = cid;
1603 con->c.sess = &sess->s;
1604 atomic_set(&con->io_cnt, 0);
1605 mutex_init(&con->con_mutex);
1606
1607 sess->s.con[cid] = &con->c;
1608
1609 return 0;
1610}
1611
1612static void destroy_con(struct rtrs_clt_con *con)
1613{
1614 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1615
1616 sess->s.con[con->c.cid] = NULL;
1617 mutex_destroy(&con->con_mutex);
1618 kfree(con);
1619}
1620
1621static int create_con_cq_qp(struct rtrs_clt_con *con)
1622{
1623 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1624 u32 max_send_wr, max_recv_wr, cq_num, max_send_sge, wr_limit;
1625 int err, cq_vector;
1626 struct rtrs_msg_rkey_rsp *rsp;
1627
1628 lockdep_assert_held(&con->con_mutex);
1629 if (con->c.cid == 0) {
1630 max_send_sge = 1;
 /* We must be the first here */
1632 if (WARN_ON(sess->s.dev))
1633 return -EINVAL;
1634
 /*
  * The whole session uses the device of the user connection.
  * Be careful not to close the user connection before the
  * ib dev is gracefully put.
  */
1640 sess->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device,
1641 &dev_pd);
1642 if (!sess->s.dev) {
1643 rtrs_wrn(sess->clt,
1644 "rtrs_ib_dev_find_get_or_add(): no memory\n");
1645 return -ENOMEM;
1646 }
1647 sess->s.dev_ref = 1;
1648 query_fast_reg_mode(sess);
1649 wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;
 /*
  * Two WRs per service queue entry (request + registration),
  * plus two extra for the drain and the heartbeat in case the
  * QP gets into an error state.
  */
1657 max_send_wr =
1658 min_t(int, wr_limit, SERVICE_CON_QUEUE_DEPTH * 2 + 2);
1659 max_recv_wr = max_send_wr;
1660 } else {
 /*
  * Here we assume that session members are correctly set.
  * This is always true if the user connection (cid == 0) is
  * established first.
  */
1666 if (WARN_ON(!sess->s.dev))
1667 return -EINVAL;
1668 if (WARN_ON(!sess->queue_depth))
1669 return -EINVAL;
1670
1671 wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;
1672
1673 sess->s.dev_ref++;
1674 max_send_wr = min_t(int, wr_limit,
 /* QD * (REQ + RSP + FR REGS or INVS) + drain */
1676 sess->queue_depth * 3 + 1);
1677 max_recv_wr = min_t(int, wr_limit,
1678 sess->queue_depth * 3 + 1);
1679 max_send_sge = 2;
1680 }
1681 cq_num = max_send_wr + max_recv_wr;
1682
1683 if (sess->flags & RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) {
1684 con->rsp_ius = rtrs_iu_alloc(cq_num, sizeof(*rsp),
1685 GFP_KERNEL, sess->s.dev->ib_dev,
1686 DMA_FROM_DEVICE,
1687 rtrs_clt_rdma_done);
1688 if (!con->rsp_ius)
1689 return -ENOMEM;
1690 con->queue_num = cq_num;
1691 }
1692 cq_num = max_send_wr + max_recv_wr;
1693 cq_vector = con->cpu % sess->s.dev->ib_dev->num_comp_vectors;
1694 if (con->c.cid >= sess->s.irq_con_num)
1695 err = rtrs_cq_qp_create(&sess->s, &con->c, max_send_sge,
1696 cq_vector, cq_num, max_send_wr,
1697 max_recv_wr, IB_POLL_DIRECT);
1698 else
1699 err = rtrs_cq_qp_create(&sess->s, &con->c, max_send_sge,
1700 cq_vector, cq_num, max_send_wr,
1701 max_recv_wr, IB_POLL_SOFTIRQ);
1702
 /*
  * In case of error we do not bother to clean previous allocations,
  * since destroy_con_cq_qp() must be called anyway.
  */
1706 return err;
1707}
1708
1709static void destroy_con_cq_qp(struct rtrs_clt_con *con)
1710{
1711 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1712
 /*
  * Be careful here: destroy_con_cq_qp() can be called even if
  * create_con_cq_qp() failed, see comments there.
  */
1717 lockdep_assert_held(&con->con_mutex);
1718 rtrs_cq_qp_destroy(&con->c);
1719 if (con->rsp_ius) {
1720 rtrs_iu_free(con->rsp_ius, sess->s.dev->ib_dev, con->queue_num);
1721 con->rsp_ius = NULL;
1722 con->queue_num = 0;
1723 }
1724 if (sess->s.dev_ref && !--sess->s.dev_ref) {
1725 rtrs_ib_dev_put(sess->s.dev);
1726 sess->s.dev = NULL;
1727 }
1728}
1729
1730static void stop_cm(struct rtrs_clt_con *con)
1731{
1732 rdma_disconnect(con->c.cm_id);
1733 if (con->c.qp)
1734 ib_drain_qp(con->c.qp);
1735}
1736
1737static void destroy_cm(struct rtrs_clt_con *con)
1738{
1739 rdma_destroy_id(con->c.cm_id);
1740 con->c.cm_id = NULL;
1741}
1742
1743static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con)
1744{
1745 struct rtrs_sess *s = con->c.sess;
1746 int err;
1747
1748 mutex_lock(&con->con_mutex);
1749 err = create_con_cq_qp(con);
1750 mutex_unlock(&con->con_mutex);
1751 if (err) {
1752 rtrs_err(s, "create_con_cq_qp(), err: %d\n", err);
1753 return err;
1754 }
1755 err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS);
1756 if (err)
1757 rtrs_err(s, "Resolving route failed, err: %d\n", err);
1758
1759 return err;
1760}
1761
1762static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con)
1763{
1764 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1765 struct rtrs_clt *clt = sess->clt;
1766 struct rtrs_msg_conn_req msg;
1767 struct rdma_conn_param param;
1768
1769 int err;
1770
1771 param = (struct rdma_conn_param) {
1772 .retry_count = 7,
1773 .rnr_retry_count = 7,
1774 .private_data = &msg,
1775 .private_data_len = sizeof(msg),
1776 };
1777
1778 msg = (struct rtrs_msg_conn_req) {
1779 .magic = cpu_to_le16(RTRS_MAGIC),
1780 .version = cpu_to_le16(RTRS_PROTO_VER),
1781 .cid = cpu_to_le16(con->c.cid),
1782 .cid_num = cpu_to_le16(sess->s.con_num),
1783 .recon_cnt = cpu_to_le16(sess->s.recon_cnt),
1784 };
1785 msg.first_conn = sess->for_new_clt ? FIRST_CONN : 0;
1786 uuid_copy(&msg.sess_uuid, &sess->s.uuid);
1787 uuid_copy(&msg.paths_uuid, &clt->paths_uuid);
1788
 err = rdma_connect_locked(con->c.cm_id, &param);
1790 if (err)
1791 rtrs_err(clt, "rdma_connect_locked(): %d\n", err);
1792
1793 return err;
1794}
1795
1796static int rtrs_rdma_conn_established(struct rtrs_clt_con *con,
1797 struct rdma_cm_event *ev)
1798{
1799 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1800 struct rtrs_clt *clt = sess->clt;
1801 const struct rtrs_msg_conn_rsp *msg;
1802 u16 version, queue_depth;
1803 int errno;
1804 u8 len;
1805
1806 msg = ev->param.conn.private_data;
1807 len = ev->param.conn.private_data_len;
1808 if (len < sizeof(*msg)) {
1809 rtrs_err(clt, "Invalid RTRS connection response\n");
1810 return -ECONNRESET;
1811 }
1812 if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
1813 rtrs_err(clt, "Invalid RTRS magic\n");
1814 return -ECONNRESET;
1815 }
1816 version = le16_to_cpu(msg->version);
1817 if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
1818 rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n",
1819 version >> 8, RTRS_PROTO_VER_MAJOR);
1820 return -ECONNRESET;
1821 }
1822 errno = le16_to_cpu(msg->errno);
1823 if (errno) {
1824 rtrs_err(clt, "Invalid RTRS message: errno %d\n",
1825 errno);
1826 return -ECONNRESET;
1827 }
1828 if (con->c.cid == 0) {
1829 queue_depth = le16_to_cpu(msg->queue_depth);
1830
1831 if (sess->queue_depth > 0 && queue_depth != sess->queue_depth) {
1832 rtrs_err(clt, "Error: queue depth changed\n");
1833
 /*
  * Stop any further reconnection attempts.
  */
1837 sess->reconnect_attempts = -1;
1838 rtrs_err(clt,
1839 "Disabling auto-reconnect. Trigger a manual reconnect after issue is resolved\n");
1840 return -ECONNRESET;
1841 }
1842
1843 if (!sess->rbufs) {
1844 kfree(sess->rbufs);
1845 sess->rbufs = kcalloc(queue_depth, sizeof(*sess->rbufs),
1846 GFP_KERNEL);
1847 if (!sess->rbufs)
1848 return -ENOMEM;
1849 }
1850 sess->queue_depth = queue_depth;
1851 sess->max_hdr_size = le32_to_cpu(msg->max_hdr_size);
1852 sess->max_io_size = le32_to_cpu(msg->max_io_size);
1853 sess->flags = le32_to_cpu(msg->flags);
1854 sess->chunk_size = sess->max_io_size + sess->max_hdr_size;
1855
 /*
  * The global IO size is always a minimum.
  * If the server sends a slightly higher value while reconnecting,
  * the client does not care and uses the cached minimum.
  *
  * Since several sessions (paths) can be re-establishing
  * connections in parallel, use a lock.
  */
1864 mutex_lock(&clt->paths_mutex);
1865 clt->queue_depth = sess->queue_depth;
1866 clt->max_io_size = min_not_zero(sess->max_io_size,
1867 clt->max_io_size);
1868 mutex_unlock(&clt->paths_mutex);
1869
 /*
  * Cache the hca_port and hca_name for sysfs.
  */
1873 sess->hca_port = con->c.cm_id->port_num;
 scnprintf(sess->hca_name, sizeof(sess->hca_name),
 "%s", sess->s.dev->ib_dev->name);
1876 sess->s.src_addr = con->c.cm_id->route.addr.src_addr;
 /* set for_new_clt, to allow future reconnects on any path */
1878 sess->for_new_clt = 1;
1879 }
1880
1881 return 0;
1882}
1883
1884static inline void flag_success_on_conn(struct rtrs_clt_con *con)
1885{
1886 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
1887
1888 atomic_inc(&sess->connected_cnt);
1889 con->cm_err = 1;
1890}
1891
1892static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con,
1893 struct rdma_cm_event *ev)
1894{
1895 struct rtrs_sess *s = con->c.sess;
1896 const struct rtrs_msg_conn_rsp *msg;
1897 const char *rej_msg;
1898 int status, errno;
1899 u8 data_len;
1900
1901 status = ev->status;
1902 rej_msg = rdma_reject_msg(con->c.cm_id, status);
1903 msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len);
1904
1905 if (msg && data_len >= sizeof(*msg)) {
1906 errno = (int16_t)le16_to_cpu(msg->errno);
1907 if (errno == -EBUSY)
1908 rtrs_err(s,
1909 "Previous session is still exists on the server, please reconnect later\n");
1910 else
1911 rtrs_err(s,
1912 "Connect rejected: status %d (%s), rtrs errno %d\n",
1913 status, rej_msg, errno);
1914 } else {
1915 rtrs_err(s,
1916 "Connect rejected but with malformed message: status %d (%s)\n",
1917 status, rej_msg);
1918 }
1919
1920 return -ECONNRESET;
1921}
1922
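/*
 * rtrs_clt_close_conns() - move the path to CLOSING and queue the close work;
 * optionally wait for the close work to finish.
 */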
1923void rtrs_clt_close_conns(struct rtrs_clt_sess *sess, bool wait)
1924{
1925 if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_CLOSING, NULL))
1926 queue_work(rtrs_wq, &sess->close_work);
1927 if (wait)
1928 flush_work(&sess->close_work);
1929}
1930
1931static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err)
1932{
1933 if (con->cm_err == 1) {
1934 struct rtrs_clt_sess *sess;
1935
1936 sess = to_clt_sess(con->c.sess);
1937 if (atomic_dec_and_test(&sess->connected_cnt))
1938
1939 wake_up(&sess->state_wq);
1940 }
1941 con->cm_err = cm_err;
1942}
1943
1944static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id,
1945 struct rdma_cm_event *ev)
1946{
1947 struct rtrs_clt_con *con = cm_id->context;
1948 struct rtrs_sess *s = con->c.sess;
1949 struct rtrs_clt_sess *sess = to_clt_sess(s);
1950 int cm_err = 0;
1951
1952 switch (ev->event) {
1953 case RDMA_CM_EVENT_ADDR_RESOLVED:
1954 cm_err = rtrs_rdma_addr_resolved(con);
1955 break;
1956 case RDMA_CM_EVENT_ROUTE_RESOLVED:
1957 cm_err = rtrs_rdma_route_resolved(con);
1958 break;
1959 case RDMA_CM_EVENT_ESTABLISHED:
1960 cm_err = rtrs_rdma_conn_established(con, ev);
1961 if (likely(!cm_err)) {
 /*
  * Report success and wake up.  Here we abuse state_wq,
  * i.e. wake up without a state change, but we set cm_err.
  */
1966 flag_success_on_conn(con);
1967 wake_up(&sess->state_wq);
1968 return 0;
1969 }
1970 break;
1971 case RDMA_CM_EVENT_REJECTED:
1972 cm_err = rtrs_rdma_conn_rejected(con, ev);
1973 break;
1974 case RDMA_CM_EVENT_DISCONNECTED:
 /* No message for disconnecting */
1976 cm_err = -ECONNRESET;
1977 break;
1978 case RDMA_CM_EVENT_CONNECT_ERROR:
1979 case RDMA_CM_EVENT_UNREACHABLE:
1980 case RDMA_CM_EVENT_ADDR_CHANGE:
1981 case RDMA_CM_EVENT_TIMEWAIT_EXIT:
1982 rtrs_wrn(s, "CM error (CM event: %s, err: %d)\n",
1983 rdma_event_msg(ev->event), ev->status);
1984 cm_err = -ECONNRESET;
1985 break;
1986 case RDMA_CM_EVENT_ADDR_ERROR:
1987 case RDMA_CM_EVENT_ROUTE_ERROR:
1988 rtrs_wrn(s, "CM error (CM event: %s, err: %d)\n",
1989 rdma_event_msg(ev->event), ev->status);
1990 cm_err = -EHOSTUNREACH;
1991 break;
1992 case RDMA_CM_EVENT_DEVICE_REMOVAL:
 /*
  * Device removal is a special case.  Queue close and return 0.
  */
1996 rtrs_clt_close_conns(sess, false);
1997 return 0;
1998 default:
1999 rtrs_err(s, "Unexpected RDMA CM error (CM event: %s, err: %d)\n",
2000 rdma_event_msg(ev->event), ev->status);
2001 cm_err = -ECONNRESET;
2002 break;
2003 }
2004
2005 if (cm_err) {
 /*
  * Flag the error on the connection and kick off error
  * recovery (reconnect or notify the connecting waiter).
  */
2010 flag_error_on_conn(con, cm_err);
2011 rtrs_rdma_error_recovery(con);
2012 }
2013
2014 return 0;
2015}
2016
2017static int create_cm(struct rtrs_clt_con *con)
2018{
2019 struct rtrs_sess *s = con->c.sess;
2020 struct rtrs_clt_sess *sess = to_clt_sess(s);
2021 struct rdma_cm_id *cm_id;
2022 int err;
2023
2024 cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con,
2025 sess->s.dst_addr.ss_family == AF_IB ?
2026 RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC);
2027 if (IS_ERR(cm_id)) {
2028 err = PTR_ERR(cm_id);
2029 rtrs_err(s, "Failed to create CM ID, err: %d\n", err);
2030
2031 return err;
2032 }
2033 con->c.cm_id = cm_id;
2034 con->cm_err = 0;
2035
2036 err = rdma_set_reuseaddr(cm_id, 1);
2037 if (err != 0) {
2038 rtrs_err(s, "Set address reuse failed, err: %d\n", err);
2039 goto destroy_cm;
2040 }
2041 err = rdma_resolve_addr(cm_id, (struct sockaddr *)&sess->s.src_addr,
2042 (struct sockaddr *)&sess->s.dst_addr,
2043 RTRS_CONNECT_TIMEOUT_MS);
2044 if (err) {
2045 rtrs_err(s, "Failed to resolve address, err: %d\n", err);
2046 goto destroy_cm;
2047 }
2048
 /*
  * Combine connection status and session events. This is needed
  * to wait for two possible cases: cm_err has returned a syndrome,
  * or the session state was really changed to error by device removal.
  */
2053 err = wait_event_interruptible_timeout(
2054 sess->state_wq,
2055 con->cm_err || sess->state != RTRS_CLT_CONNECTING,
2056 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
2057 if (err == 0 || err == -ERESTARTSYS) {
2058 if (err == 0)
2059 err = -ETIMEDOUT;
 /* Timed out or interrupted */
2061 goto errr;
2062 }
2063 if (con->cm_err < 0) {
2064 err = con->cm_err;
2065 goto errr;
2066 }
2067 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTING) {
 /* Device removal */
2069 err = -ECONNABORTED;
2070 goto errr;
2071 }
2072
2073 return 0;
2074
2075errr:
2076 stop_cm(con);
2077 mutex_lock(&con->con_mutex);
2078 destroy_con_cq_qp(con);
2079 mutex_unlock(&con->con_mutex);
2080destroy_cm:
2081 destroy_cm(con);
2082
2083 return err;
2084}
2085
2086static void rtrs_clt_sess_up(struct rtrs_clt_sess *sess)
2087{
2088 struct rtrs_clt *clt = sess->clt;
2089 int up;
2090
 /*
  * Fire the RECONNECTED link event only when the first path comes
  * up again after all paths were down; the MAX_PATHS_NUM check
  * below keeps the initial bring-up of all paths from triggering it.
  */
2098 mutex_lock(&clt->paths_ev_mutex);
2099 up = ++clt->paths_up;
 /*
  * Here it is safe to access paths_num directly, since the up counter
  * is greater than MAX_PATHS_NUM only while rtrs_clt_open() is
  * in progress, thus path removals are impossible.
  */
2105 if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num)
2106 clt->paths_up = clt->paths_num;
2107 else if (up == 1)
2108 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED);
2109 mutex_unlock(&clt->paths_ev_mutex);
2110
 /* Mark session as established */
2112 sess->established = true;
2113 sess->reconnect_attempts = 0;
2114 sess->stats->reconnects.successful_cnt++;
2115}
2116
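/*
 * rtrs_clt_sess_down() - mark the path as not established and fire the
 * DISCONNECTED link event once the last path goes down.
 */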
2117static void rtrs_clt_sess_down(struct rtrs_clt_sess *sess)
2118{
2119 struct rtrs_clt *clt = sess->clt;
2120
2121 if (!sess->established)
2122 return;
2123
2124 sess->established = false;
2125 mutex_lock(&clt->paths_ev_mutex);
2126 WARN_ON(!clt->paths_up);
2127 if (--clt->paths_up == 0)
2128 clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED);
2129 mutex_unlock(&clt->paths_ev_mutex);
2130}
2131
2132static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess)
2133{
2134 struct rtrs_clt_con *con;
2135 unsigned int cid;
2136
2137 WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED);
2138
 /*
  * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
  * exactly in between.  Start destroying after it finishes.
  */
2143 mutex_lock(&sess->init_mutex);
2144 mutex_unlock(&sess->init_mutex);
2145
 /*
  * All IO paths must observe the !CONNECTED state before we
  * free everything.
  */
2150 synchronize_rcu();
2151
2152 rtrs_stop_hb(&sess->s);
2153
 /*
  * The order is utterly crucial: first disconnect and complete all
  * rdma requests with error (thus setting in_use=false for requests),
  * then fail outstanding requests checking in_use for each, and
  * eventually notify the upper layer about the session disconnection.
  */
2161 for (cid = 0; cid < sess->s.con_num; cid++) {
2162 if (!sess->s.con[cid])
2163 break;
2164 con = to_clt_con(sess->s.con[cid]);
2165 stop_cm(con);
2166 }
2167 fail_all_outstanding_reqs(sess);
2168 free_sess_reqs(sess);
2169 rtrs_clt_sess_down(sess);
2170
 /*
  * Wait for graceful shutdown, namely when the peer side invokes
  * rdma_disconnect().  'connected_cnt' is decremented only on
  * CM events, so if the other side has crashed and the hb has
  * detected something is wrong, we will wait here for exactly
  * the timeout, since CM does not fire anything.  That is fine,
  * we are not in a hurry.
  */
2179 wait_event_timeout(sess->state_wq, !atomic_read(&sess->connected_cnt),
2180 msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
2181
2182 for (cid = 0; cid < sess->s.con_num; cid++) {
2183 if (!sess->s.con[cid])
2184 break;
2185 con = to_clt_con(sess->s.con[cid]);
2186 mutex_lock(&con->con_mutex);
2187 destroy_con_cq_qp(con);
2188 mutex_unlock(&con->con_mutex);
2189 destroy_cm(con);
2190 destroy_con(con);
2191 }
2192}
2193
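/*
 * xchg_sessions() - atomically replace the per-CPU path pointer, but only if
 * it still points to @sess; returns true on success.
 */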
2194static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path,
2195 struct rtrs_clt_sess *sess,
2196 struct rtrs_clt_sess *next)
2197{
2198 struct rtrs_clt_sess **ppcpu_path;
2199
 /* Call cmpxchg() without sparse warnings */
2201 ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path;
2202 return sess == cmpxchg(ppcpu_path, sess, next);
2203}
2204
2205static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess *sess)
2206{
2207 struct rtrs_clt *clt = sess->clt;
2208 struct rtrs_clt_sess *next;
2209 bool wait_for_grace = false;
2210 int cpu;
2211
2212 mutex_lock(&clt->paths_mutex);
2213 list_del_rcu(&sess->s.entry);

 /* Make sure everybody observes path removal. */
2216 synchronize_rcu();
 /*
  * At this point nobody sees @sess in the list, but we still have a
  * dangling pointer @pcpu_path which _can_ point to @sess.  Since
  * nobody can observe @sess in the list, we guarantee that the IO path
  * will not assign @sess to @pcpu_path, i.e. @pcpu_path can be equal
  * to @sess, but can never again become @sess.
  */

 /*
  * Decrement paths number only after the grace period, because
  * the caller of do_each_path() must first observe the list without
  * the path and only then the decremented paths number.
  *
  * Otherwise the following situation is possible:
  *    o Two paths exist and IO is coming.
  *    o One path is removed:
  *      CPU#0                          CPU#1
  *      do_each_path():                rtrs_clt_remove_path_from_arr():
  *          path = get_next_path()
  *          ^^^                            list_del_rcu(path)
  *          [!CONNECTED path]              clt->paths_num--
  *                                              ^^^^^^^^^
  *          load clt->paths_num                 from 2 to 1
  *                    ^^^^^^^^^
  *                    sees 1
  *
  *      The path is observed as !CONNECTED, but the do_each_path() loop
  *      ends, because the expression i < clt->paths_num is false.
  */
2247 clt->paths_num--;
2248
 /*
  * Get the @next connection from the current @sess which is going to be
  * removed.  If @sess is the last element, then @next is NULL.
  */
2253 rcu_read_lock();
2254 next = list_next_or_null_rr_rcu(&clt->paths_list, &sess->s.entry,
2255 typeof(*next), s.entry);
2256 rcu_read_unlock();
2257
 /*
  * The @pcpu paths can still point to the path which is going to be
  * removed, so change the pointer manually.
  */
2262 for_each_possible_cpu(cpu) {
2263 struct rtrs_clt_sess __rcu **ppcpu_path;
2264
2265 ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
2266 if (rcu_dereference_protected(*ppcpu_path,
2267 lockdep_is_held(&clt->paths_mutex)) != sess)
 /*
  * synchronize_rcu() was called just after deleting the
  * entry from the list, thus the IO code path cannot
  * change the pointer back to the pointer which is going
  * to be removed; we are safe here.
  */
2274 continue;
2275
 /*
  * We race with the IO code path, which also changes the pointer,
  * thus we have to be careful not to overwrite it.
  */
2280 if (xchg_sessions(ppcpu_path, sess, next))
 /*
  * @ppcpu_path was successfully replaced with @next,
  * which means that someone could have also picked up
  * @sess and be dereferencing it right now, so waiting
  * for a grace period is required.
  */
2287 wait_for_grace = true;
2288 }
2289 if (wait_for_grace)
2290 synchronize_rcu();
2291
2292 mutex_unlock(&clt->paths_mutex);
2293}
2294
2295static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess)
2296{
2297 struct rtrs_clt *clt = sess->clt;
2298
2299 mutex_lock(&clt->paths_mutex);
2300 clt->paths_num++;
2301
2302 list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
2303 mutex_unlock(&clt->paths_mutex);
2304}
2305
2306static void rtrs_clt_close_work(struct work_struct *work)
2307{
2308 struct rtrs_clt_sess *sess;
2309
2310 sess = container_of(work, struct rtrs_clt_sess, close_work);
2311
2312 cancel_delayed_work_sync(&sess->reconnect_dwork);
2313 rtrs_clt_stop_and_destroy_conns(sess);
2314 rtrs_clt_change_state_get_old(sess, RTRS_CLT_CLOSED, NULL);
2315}
2316
2317static int init_conns(struct rtrs_clt_sess *sess)
2318{
2319 unsigned int cid;
2320 int err;
2321
 /*
  * On every new set of session connections increase the reconnect
  * counter to avoid clashes with previous sessions not yet closed
  * on the server side.
  */
2327 sess->s.recon_cnt++;
2328
 /* Establish all RDMA connections */
2330 for (cid = 0; cid < sess->s.con_num; cid++) {
2331 err = create_con(sess, cid);
2332 if (err)
2333 goto destroy;
2334
2335 err = create_cm(to_clt_con(sess->s.con[cid]));
2336 if (err) {
2337 destroy_con(to_clt_con(sess->s.con[cid]));
2338 goto destroy;
2339 }
2340 }
2341 err = alloc_sess_reqs(sess);
2342 if (err)
2343 goto destroy;
2344
2345 rtrs_start_hb(&sess->s);
2346
2347 return 0;
2348
2349destroy:
2350 while (cid--) {
2351 struct rtrs_clt_con *con = to_clt_con(sess->s.con[cid]);
2352
2353 stop_cm(con);
2354
2355 mutex_lock(&con->con_mutex);
2356 destroy_con_cq_qp(con);
2357 mutex_unlock(&con->con_mutex);
2358 destroy_cm(con);
2359 destroy_con(con);
2360 }
2361
 /*
  * If we've never taken the async path and got an error, say,
  * doing rdma_resolve_addr(), switch to CONNECTING_ERR state
  * manually to keep reconnecting.
  */
2366 rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);
2367
2368 return err;
2369}
2370
2371static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
2372{
2373 struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
2374 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
2375 struct rtrs_iu *iu;
2376
2377 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2378 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
2379
2380 if (unlikely(wc->status != IB_WC_SUCCESS)) {
2381 rtrs_err(sess->clt, "Sess info request send failed: %s\n",
2382 ib_wc_status_msg(wc->status));
2383 rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);
2384 return;
2385 }
2386
2387 rtrs_clt_update_wc_stats(con);
2388}
2389
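/*
 * process_info_rsp() - parse the server's buffer descriptors and fill in
 * sess->rbufs (address/rkey per chunk), validating sizes against queue_depth
 * and chunk_size.
 */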
2390static int process_info_rsp(struct rtrs_clt_sess *sess,
2391 const struct rtrs_msg_info_rsp *msg)
2392{
2393 unsigned int sg_cnt, total_len;
2394 int i, sgi;
2395
2396 sg_cnt = le16_to_cpu(msg->sg_cnt);
2397 if (unlikely(!sg_cnt || (sess->queue_depth % sg_cnt))) {
2398 rtrs_err(sess->clt, "Incorrect sg_cnt %d, is not multiple\n",
2399 sg_cnt);
2400 return -EINVAL;
2401 }
2402
 /*
  * Check that the IB immediate data size is enough to hold the mem_id and
  * the offset inside the memory chunk.
  */
2407 if (unlikely((ilog2(sg_cnt - 1) + 1) +
2408 (ilog2(sess->chunk_size - 1) + 1) >
2409 MAX_IMM_PAYL_BITS)) {
2410 rtrs_err(sess->clt,
2411 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
2412 MAX_IMM_PAYL_BITS, sg_cnt, sess->chunk_size);
2413 return -EINVAL;
2414 }
2415 total_len = 0;
2416 for (sgi = 0, i = 0; sgi < sg_cnt && i < sess->queue_depth; sgi++) {
2417 const struct rtrs_sg_desc *desc = &msg->desc[sgi];
2418 u32 len, rkey;
2419 u64 addr;
2420
2421 addr = le64_to_cpu(desc->addr);
2422 rkey = le32_to_cpu(desc->key);
2423 len = le32_to_cpu(desc->len);
2424
2425 total_len += len;
2426
2427 if (unlikely(!len || (len % sess->chunk_size))) {
2428 rtrs_err(sess->clt, "Incorrect [%d].len %d\n", sgi,
2429 len);
2430 return -EINVAL;
2431 }
2432 for ( ; len && i < sess->queue_depth; i++) {
2433 sess->rbufs[i].addr = addr;
2434 sess->rbufs[i].rkey = rkey;
2435
2436 len -= sess->chunk_size;
2437 addr += sess->chunk_size;
2438 }
2439 }
2440
2441 if (unlikely(sgi != sg_cnt || i != sess->queue_depth)) {
2442 rtrs_err(sess->clt, "Incorrect sg vector, not fully mapped\n");
2443 return -EINVAL;
2444 }
2445 if (unlikely(total_len != sess->chunk_size * sess->queue_depth)) {
2446 rtrs_err(sess->clt, "Incorrect total_len %d\n", total_len);
2447 return -EINVAL;
2448 }
2449
2450 return 0;
2451}
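
/*
 * Worked example (illustrative only, numbers are hypothetical): if the
 * server advertises sg_cnt = 4 descriptors for queue_depth = 512 chunks,
 * the checks in process_info_rsp() above require 512 % 4 == 0 and every
 * desc->len to be a multiple of chunk_size; a typical layout gives each
 * descriptor 128 * chunk_size bytes, which the inner loop fans out into
 * 128 consecutive rbufs sharing the descriptor's rkey while the address
 * advances by chunk_size per slot.
 */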
2452
2453static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
2454{
2455 struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
2456 struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
2457 struct rtrs_msg_info_rsp *msg;
2458 enum rtrs_clt_state state;
2459 struct rtrs_iu *iu;
2460 size_t rx_sz;
2461 int err;
2462
2463 state = RTRS_CLT_CONNECTING_ERR;
2464
2465 WARN_ON(con->c.cid);
2466 iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
2467 if (unlikely(wc->status != IB_WC_SUCCESS)) {
2468 rtrs_err(sess->clt, "Sess info response recv failed: %s\n",
2469 ib_wc_status_msg(wc->status));
2470 goto out;
2471 }
2472 WARN_ON(wc->opcode != IB_WC_RECV);
2473
2474 if (unlikely(wc->byte_len < sizeof(*msg))) {
2475 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
2476 wc->byte_len);
2477 goto out;
2478 }
2479 ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
2480 iu->size, DMA_FROM_DEVICE);
2481 msg = iu->buf;
2482 if (unlikely(le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP)) {
2483 rtrs_err(sess->clt, "Sess info response is malformed: type %d\n",
2484 le16_to_cpu(msg->type));
2485 goto out;
2486 }
2487 rx_sz = sizeof(*msg);
2488 rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
2489 if (unlikely(wc->byte_len < rx_sz)) {
2490 rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
2491 wc->byte_len);
2492 goto out;
2493 }
2494 err = process_info_rsp(sess, msg);
2495 if (unlikely(err))
2496 goto out;
2497
2498 err = post_recv_sess(sess);
2499 if (unlikely(err))
2500 goto out;
2501
2502 state = RTRS_CLT_CONNECTED;
2503
2504out:
2505 rtrs_clt_update_wc_stats(con);
2506 rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
2507 rtrs_clt_change_state_get_old(sess, state, NULL);
2508}
2509
2510static int rtrs_send_sess_info(struct rtrs_clt_sess *sess)
2511{
2512 struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]);
2513 struct rtrs_msg_info_req *msg;
2514 struct rtrs_iu *tx_iu, *rx_iu;
2515 size_t rx_sz;
2516 int err;
2517
2518 rx_sz = sizeof(struct rtrs_msg_info_rsp);
2519 rx_sz += sizeof(struct rtrs_sg_desc) * sess->queue_depth;
2520
2521 tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
2522 sess->s.dev->ib_dev, DMA_TO_DEVICE,
2523 rtrs_clt_info_req_done);
2524 rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
2525 DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
2526 if (unlikely(!tx_iu || !rx_iu)) {
2527 err = -ENOMEM;
2528 goto out;
2529 }
	/* Prepare to receive the info response */
2531 err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
2532 if (unlikely(err)) {
2533 rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err);
2534 goto out;
2535 }
2536 rx_iu = NULL;
2537
2538 msg = tx_iu->buf;
2539 msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
2540 memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname));
2541
2542 ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
2543 tx_iu->size, DMA_TO_DEVICE);
2544
	/* Send the info request */
2546 err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
2547 if (unlikely(err)) {
2548 rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err);
2549 goto out;
2550 }
2551 tx_iu = NULL;
2552
	/* Wait for the state change */
2554 wait_event_interruptible_timeout(sess->state_wq,
2555 sess->state != RTRS_CLT_CONNECTING,
2556 msecs_to_jiffies(
2557 RTRS_CONNECT_TIMEOUT_MS));
2558 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)) {
2559 if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR)
2560 err = -ECONNRESET;
2561 else
2562 err = -ETIMEDOUT;
2563 }
2564
2565out:
2566 if (tx_iu)
2567 rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
2568 if (rx_iu)
2569 rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
2570 if (unlikely(err))
		/* We never took the async path, e.g. due to an allocation failure */
2572 rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);
2573
2574 return err;
2575}
2576
/**
 * init_sess() - establishes all session connections and does the handshake
 * @sess: client session.
 *
 * On error the caller must take the full close or reconnect path, since
 * the async reconnect/close works may already have been started.
 */
2583static int init_sess(struct rtrs_clt_sess *sess)
2584{
2585 int err;
2586 char str[NAME_MAX];
2587 struct rtrs_addr path = {
2588 .src = &sess->s.src_addr,
2589 .dst = &sess->s.dst_addr,
2590 };
2591
2592 rtrs_addr_to_str(&path, str, sizeof(str));
2593
2594 mutex_lock(&sess->init_mutex);
2595 err = init_conns(sess);
2596 if (err) {
2597 rtrs_err(sess->clt,
2598 "init_conns() failed: err=%d path=%s [%s:%u]\n", err,
2599 str, sess->hca_name, sess->hca_port);
2600 goto out;
2601 }
2602 err = rtrs_send_sess_info(sess);
2603 if (err) {
2604 rtrs_err(
2605 sess->clt,
2606 "rtrs_send_sess_info() failed: err=%d path=%s [%s:%u]\n",
2607 err, str, sess->hca_name, sess->hca_port);
2608 goto out;
2609 }
2610 rtrs_clt_sess_up(sess);
2611out:
2612 mutex_unlock(&sess->init_mutex);
2613
2614 return err;
2615}
2616
2617static void rtrs_clt_reconnect_work(struct work_struct *work)
2618{
2619 struct rtrs_clt_sess *sess;
2620 struct rtrs_clt *clt;
2621 unsigned int delay_ms;
2622 int err;
2623
2624 sess = container_of(to_delayed_work(work), struct rtrs_clt_sess,
2625 reconnect_dwork);
2626 clt = sess->clt;
2627
2628 if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING)
2629 return;
2630
2631 if (sess->reconnect_attempts >= clt->max_reconnect_attempts) {
		/* Close the session completely if max attempts is reached */
2633 rtrs_clt_close_conns(sess, false);
2634 return;
2635 }
2636 sess->reconnect_attempts++;
2637
	/* Stop everything */
2639 rtrs_clt_stop_and_destroy_conns(sess);
2640 msleep(RTRS_RECONNECT_BACKOFF);
2641 if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING, NULL)) {
2642 err = init_sess(sess);
2643 if (err)
2644 goto reconnect_again;
2645 }
2646
2647 return;
2648
2649reconnect_again:
2650 if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING, NULL)) {
2651 sess->stats->reconnects.fail_cnt++;
2652 delay_ms = clt->reconnect_delay_sec * 1000;
2653 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
2654 msecs_to_jiffies(delay_ms +
2655 prandom_u32() %
2656 RTRS_RECONNECT_SEED));
2657 }
2658}
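
/*
 * Worked example (illustrative only): with a hypothetical
 * reconnect_delay_sec = 30, the next attempt queued in
 * rtrs_clt_reconnect_work() above fires after 30000 ms plus a random
 * jitter of 0..RTRS_RECONNECT_SEED-1 ms, so multiple paths of the same
 * client do not retry in lockstep.
 */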
2659
2660static void rtrs_clt_dev_release(struct device *dev)
2661{
2662 struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev);
2663
2664 kfree(clt);
2665}
2666
2667static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num,
2668 u16 port, size_t pdu_sz, void *priv,
2669 void (*link_ev)(void *priv,
2670 enum rtrs_clt_link_ev ev),
2671 unsigned int reconnect_delay_sec,
2672 unsigned int max_reconnect_attempts)
2673{
2674 struct rtrs_clt *clt;
2675 int err;
2676
2677 if (!paths_num || paths_num > MAX_PATHS_NUM)
2678 return ERR_PTR(-EINVAL);
2679
2680 if (strlen(sessname) >= sizeof(clt->sessname))
2681 return ERR_PTR(-EINVAL);
2682
2683 clt = kzalloc(sizeof(*clt), GFP_KERNEL);
2684 if (!clt)
2685 return ERR_PTR(-ENOMEM);
2686
2687 clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
2688 if (!clt->pcpu_path) {
2689 kfree(clt);
2690 return ERR_PTR(-ENOMEM);
2691 }
2692
2693 uuid_gen(&clt->paths_uuid);
2694 INIT_LIST_HEAD_RCU(&clt->paths_list);
2695 clt->paths_num = paths_num;
2696 clt->paths_up = MAX_PATHS_NUM;
2697 clt->port = port;
2698 clt->pdu_sz = pdu_sz;
2699 clt->max_segments = RTRS_MAX_SEGMENTS;
2700 clt->reconnect_delay_sec = reconnect_delay_sec;
2701 clt->max_reconnect_attempts = max_reconnect_attempts;
2702 clt->priv = priv;
2703 clt->link_ev = link_ev;
2704 clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
2705 strscpy(clt->sessname, sessname, sizeof(clt->sessname));
2706 init_waitqueue_head(&clt->permits_wait);
2707 mutex_init(&clt->paths_ev_mutex);
2708 mutex_init(&clt->paths_mutex);
2709
2710 clt->dev.class = rtrs_clt_dev_class;
2711 clt->dev.release = rtrs_clt_dev_release;
2712 err = dev_set_name(&clt->dev, "%s", sessname);
2713 if (err)
2714 goto err;
2715
	/*
	 * Suppress uevents until the sysfs files have been created.
	 */
2719 dev_set_uevent_suppress(&clt->dev, true);
2720 err = device_register(&clt->dev);
2721 if (err) {
2722 put_device(&clt->dev);
2723 goto err;
2724 }
2725
2726 clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
2727 if (!clt->kobj_paths) {
2728 err = -ENOMEM;
2729 goto err_dev;
2730 }
2731 err = rtrs_clt_create_sysfs_root_files(clt);
2732 if (err) {
2733 kobject_del(clt->kobj_paths);
2734 kobject_put(clt->kobj_paths);
2735 goto err_dev;
2736 }
2737 dev_set_uevent_suppress(&clt->dev, false);
2738 kobject_uevent(&clt->dev.kobj, KOBJ_ADD);
2739
2740 return clt;
2741err_dev:
2742 device_unregister(&clt->dev);
2743err:
2744 free_percpu(clt->pcpu_path);
2745 kfree(clt);
2746 return ERR_PTR(err);
2747}
2748
2749static void free_clt(struct rtrs_clt *clt)
2750{
2751 free_permits(clt);
2752 free_percpu(clt->pcpu_path);
2753 mutex_destroy(&clt->paths_ev_mutex);
2754 mutex_destroy(&clt->paths_mutex);
	/* The release callback frees clt on the last put */
2756 device_unregister(&clt->dev);
2757}
2758
/**
 * rtrs_clt_open() - Open a session to an RTRS server
 * @ops: holds the link event callback and the private pointer.
 * @sessname: name of the session
 * @paths: Paths to be established, defined by their src and dst addresses
 * @paths_num: Number of elements in the @paths array
 * @port: port to be used by the RTRS session
 * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
 * @reconnect_delay_sec: time between reconnect tries
 * @max_reconnect_attempts: Number of reconnect attempts before giving up,
 *			    0 means don't reconnect, -1 means unlimited retries
 * @nr_poll_queues: number of polling-mode connections
 *
 * Starts session establishment with the rtrs_server. The function may block
 * until the connections are established or the connect timeout expires.
 *
 * Return a valid pointer on success, otherwise PTR_ERR.
 */
2777struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops,
2778 const char *sessname,
2779 const struct rtrs_addr *paths,
2780 size_t paths_num, u16 port,
2781 size_t pdu_sz, u8 reconnect_delay_sec,
2782 s16 max_reconnect_attempts, u32 nr_poll_queues)
2783{
2784 struct rtrs_clt_sess *sess, *tmp;
2785 struct rtrs_clt *clt;
2786 int err, i;
2787
2788 clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv,
2789 ops->link_ev,
2790 reconnect_delay_sec,
2791 max_reconnect_attempts);
2792 if (IS_ERR(clt)) {
2793 err = PTR_ERR(clt);
2794 goto out;
2795 }
2796 for (i = 0; i < paths_num; i++) {
2797 struct rtrs_clt_sess *sess;
2798
2799 sess = alloc_sess(clt, &paths[i], nr_cpu_ids,
2800 nr_poll_queues);
2801 if (IS_ERR(sess)) {
2802 err = PTR_ERR(sess);
2803 goto close_all_sess;
2804 }
2805 if (!i)
2806 sess->for_new_clt = 1;
2807 list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
2808
2809 err = init_sess(sess);
2810 if (err) {
2811 list_del_rcu(&sess->s.entry);
2812 rtrs_clt_close_conns(sess, true);
2813 free_percpu(sess->stats->pcpu_stats);
2814 kfree(sess->stats);
2815 free_sess(sess);
2816 goto close_all_sess;
2817 }
2818
2819 err = rtrs_clt_create_sess_files(sess);
2820 if (err) {
2821 list_del_rcu(&sess->s.entry);
2822 rtrs_clt_close_conns(sess, true);
2823 free_percpu(sess->stats->pcpu_stats);
2824 kfree(sess->stats);
2825 free_sess(sess);
2826 goto close_all_sess;
2827 }
2828 }
2829 err = alloc_permits(clt);
2830 if (err)
2831 goto close_all_sess;
2832
2833 return clt;
2834
2835close_all_sess:
2836 list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
2837 rtrs_clt_destroy_sess_files(sess, NULL);
2838 rtrs_clt_close_conns(sess, true);
2839 kobject_put(&sess->kobj);
2840 }
2841 rtrs_clt_destroy_sysfs_root(clt);
2842 free_clt(clt);
2843
2844out:
2845 return ERR_PTR(err);
2846}
2847EXPORT_SYMBOL(rtrs_clt_open);
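
/*
 * Usage sketch (illustrative only; my_priv, my_link_ev and my_iu are
 * hypothetical names, not part of this file): a ULP would typically open
 * and later close a session roughly like this:
 *
 *	struct rtrs_clt_ops ops = {
 *		.priv	 = my_priv,
 *		.link_ev = my_link_ev,	// notified on link up/down
 *	};
 *	struct rtrs_clt *clt;
 *
 *	clt = rtrs_clt_open(&ops, "my_session", paths, paths_num, port,
 *			    sizeof(struct my_iu), 30, -1, 0);
 *	if (IS_ERR(clt))
 *		return PTR_ERR(clt);
 *	...
 *	rtrs_clt_close(clt);
 */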
2848
/**
 * rtrs_clt_close() - Close a session
 * @clt: Session handle. The session is freed upon return.
 */
2853void rtrs_clt_close(struct rtrs_clt *clt)
2854{
2855 struct rtrs_clt_sess *sess, *tmp;
2856
	/* First forbid sysfs access */
2858 rtrs_clt_destroy_sysfs_root(clt);
2859
	/* Now it is safe to iterate over all paths without locks */
2861 list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
2862 rtrs_clt_close_conns(sess, true);
2863 rtrs_clt_destroy_sess_files(sess, NULL);
2864 kobject_put(&sess->kobj);
2865 }
2866 free_clt(clt);
2867}
2868EXPORT_SYMBOL(rtrs_clt_close);
2869
2870int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess)
2871{
2872 enum rtrs_clt_state old_state;
2873 int err = -EBUSY;
2874 bool changed;
2875
2876 changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING,
2877 &old_state);
2878 if (changed) {
2879 sess->reconnect_attempts = 0;
2880 queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0);
2881 }
2882 if (changed || old_state == RTRS_CLT_RECONNECTING) {
		/*
		 * flush_delayed_work() also kicks off pending work for
		 * immediate execution, so flush either if we have just
		 * queued the work above or if the state was already
		 * RECONNECTING.
		 */
2888 flush_delayed_work(&sess->reconnect_dwork);
2889 err = (READ_ONCE(sess->state) ==
2890 RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
2891 }
2892
2893 return err;
2894}
2895
2896int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess,
2897 const struct attribute *sysfs_self)
2898{
2899 enum rtrs_clt_state old_state;
2900 bool changed;
2901
	/*
	 * Keep stopping the path until its state has either been changed
	 * to DEAD by us or observed as DEAD:
	 * 1. State was changed to DEAD - we were fast and nobody triggered
	 *    a reconnect in the meantime, which could start new
	 *    connections.
	 * 2. State was observed as DEAD - someone else is removing the
	 *    path in parallel.
	 */
2911 do {
2912 rtrs_clt_close_conns(sess, true);
2913 changed = rtrs_clt_change_state_get_old(sess,
2914 RTRS_CLT_DEAD,
2915 &old_state);
2916 } while (!changed && old_state != RTRS_CLT_DEAD);
2917
2918 if (likely(changed)) {
2919 rtrs_clt_remove_path_from_arr(sess);
2920 rtrs_clt_destroy_sess_files(sess, sysfs_self);
2921 kobject_put(&sess->kobj);
2922 }
2923
2924 return 0;
2925}
2926
2927void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value)
2928{
2929 clt->max_reconnect_attempts = (unsigned int)value;
2930}
2931
2932int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt)
2933{
2934 return (int)clt->max_reconnect_attempts;
2935}
2936
/**
 * rtrs_clt_request() - Request data transfer to/from server via RDMA.
 *
 * @dir:	READ or WRITE
 * @ops:	confirmation callback to be invoked on completion, and the
 *		private pointer handed back to it.
 * @clt:	Session
 * @permit:	Preallocated permit
 * @vec:	User message that is sent to the server together with the
 *		request.  The message is copied internally, so it can be
 *		allocated on the stack.
 * @nr:		Number of elements in @vec.
 * @data_len:	Length of data to be sent to/received from the server.
 * @sg:		Pages to be sent/received to/from the server.
 * @sg_cnt:	Number of elements in @sg.
 *
 * Return:
 * 0:		Success
 * <0:		Error
 *
 * On dir=READ the client requests a data transfer from server to client;
 * the server's reply lands in @sg and @ops->conf_fn is invoked on
 * completion.  On dir=WRITE the client RDMA-writes the data in @sg to the
 * server.  The request is issued on the first usable (connected) path
 * selected by the multipath policy.
 */
2961int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
2962 struct rtrs_clt *clt, struct rtrs_permit *permit,
2963 const struct kvec *vec, size_t nr, size_t data_len,
2964 struct scatterlist *sg, unsigned int sg_cnt)
2965{
2966 struct rtrs_clt_io_req *req;
2967 struct rtrs_clt_sess *sess;
2968
2969 enum dma_data_direction dma_dir;
2970 int err = -ECONNABORTED, i;
2971 size_t usr_len, hdr_len;
2972 struct path_it it;
2973
	/* Get kvec length */
2975 for (i = 0, usr_len = 0; i < nr; i++)
2976 usr_len += vec[i].iov_len;
2977
2978 if (dir == READ) {
2979 hdr_len = sizeof(struct rtrs_msg_rdma_read) +
2980 sg_cnt * sizeof(struct rtrs_sg_desc);
2981 dma_dir = DMA_FROM_DEVICE;
2982 } else {
2983 hdr_len = sizeof(struct rtrs_msg_rdma_write);
2984 dma_dir = DMA_TO_DEVICE;
2985 }
2986
2987 rcu_read_lock();
2988 for (path_it_init(&it, clt);
2989 (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
2990 if (unlikely(READ_ONCE(sess->state) != RTRS_CLT_CONNECTED))
2991 continue;
2992
2993 if (unlikely(usr_len + hdr_len > sess->max_hdr_size)) {
2994 rtrs_wrn_rl(sess->clt,
2995 "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
2996 dir == READ ? "Read" : "Write",
2997 usr_len, hdr_len, sess->max_hdr_size);
2998 err = -EMSGSIZE;
2999 break;
3000 }
3001 req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv,
3002 vec, usr_len, sg, sg_cnt, data_len,
3003 dma_dir);
3004 if (dir == READ)
3005 err = rtrs_clt_read_req(req);
3006 else
3007 err = rtrs_clt_write_req(req);
3008 if (unlikely(err)) {
3009 req->in_use = false;
3010 continue;
3011 }
		/* Success path */
3013 break;
3014 }
3015 path_it_deinit(&it);
3016 rcu_read_unlock();
3017
3018 return err;
3019}
3020EXPORT_SYMBOL(rtrs_clt_request);
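
/*
 * Usage sketch (illustrative only; req_ops, my_req, my_conf_fn, msg and
 * sgt are hypothetical names): a caller that already holds a permit
 * (see rtrs_clt_get_permit()) could issue a WRITE like this:
 *
 *	struct rtrs_clt_req_ops req_ops = {
 *		.priv	 = my_req,
 *		.conf_fn = my_conf_fn,	// called once the transfer completes
 *	};
 *	struct kvec vec = {
 *		.iov_base = &msg,
 *		.iov_len  = sizeof(msg),
 *	};
 *
 *	err = rtrs_clt_request(WRITE, &req_ops, clt, permit, &vec, 1,
 *			       data_len, sgt.sgl, sgt.nents);
 *	if (err)
 *		goto fail;	// no connected path could take the request
 */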
3021
3022int rtrs_clt_rdma_cq_direct(struct rtrs_clt *clt, unsigned int index)
3023{
	/* If there is no path, return -1 so the caller does not retry */
3025 int cnt = -1;
3026 struct rtrs_con *con;
3027 struct rtrs_clt_sess *sess;
3028 struct path_it it;
3029
3030 rcu_read_lock();
3031 for (path_it_init(&it, clt);
3032 (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
3033 if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)
3034 continue;
3035
3036 con = sess->s.con[index + 1];
3037 cnt = ib_process_cq_direct(con->cq, -1);
3038 if (cnt)
3039 break;
3040 }
3041 path_it_deinit(&it);
3042 rcu_read_unlock();
3043
3044 return cnt;
3045}
3046EXPORT_SYMBOL(rtrs_clt_rdma_cq_direct);
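
/*
 * Usage note (illustrative only): this is intended for polling-mode
 * queues; a block driver's poll callback might simply do
 *
 *	return rtrs_clt_rdma_cq_direct(clt, hctx->queue_num);
 *
 * where hctx is the caller's hypothetical blk-mq hardware context.
 */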
3047
/**
 * rtrs_clt_query() - queries RTRS session attributes
 * @clt: session pointer
 * @attr: query results for session attributes.
 *
 * Return:
 * 0 on success
 * -ECOMM	no connection to the server
 */
3056int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr)
3057{
3058 if (!rtrs_clt_is_connected(clt))
3059 return -ECOMM;
3060
3061 attr->queue_depth = clt->queue_depth;
3062 attr->max_segments = clt->max_segments;
	/* Cap max_io_size to the min of the remote buffer size and the FR pages */
3064 attr->max_io_size = min_t(int, clt->max_io_size,
3065 clt->max_segments * SZ_4K);
3066
3067 return 0;
3068}
3069EXPORT_SYMBOL(rtrs_clt_query);
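
/*
 * Usage sketch (illustrative only): callers typically size their I/O
 * against the returned attributes, e.g.:
 *
 *	struct rtrs_attrs attrs;
 *
 *	if (rtrs_clt_query(clt, &attrs))
 *		return -ECOMM;
 *	max_io_size = min(max_io_size, attrs.max_io_size);
 */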
3070
3071int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt,
3072 struct rtrs_addr *addr)
3073{
3074 struct rtrs_clt_sess *sess;
3075 int err;
3076
3077 sess = alloc_sess(clt, addr, nr_cpu_ids, 0);
3078 if (IS_ERR(sess))
3079 return PTR_ERR(sess);
3080
	/*
	 * It is safe to add the path while it is in CONNECTING state:
	 * incoming IO will never grab it.  It is also important to add the
	 * path before init_sess(), because init reports the link as up to
	 * the user via the link event callback.
	 */
3086 rtrs_clt_add_path_to_arr(sess);
3087
3088 err = init_sess(sess);
3089 if (err)
3090 goto close_sess;
3091
3092 err = rtrs_clt_create_sess_files(sess);
3093 if (err)
3094 goto close_sess;
3095
3096 return 0;
3097
3098close_sess:
3099 rtrs_clt_remove_path_from_arr(sess);
3100 rtrs_clt_close_conns(sess, true);
3101 free_percpu(sess->stats->pcpu_stats);
3102 kfree(sess->stats);
3103 free_sess(sess);
3104
3105 return err;
3106}
3107
3108static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
3109{
3110 if (!(dev->ib_dev->attrs.device_cap_flags &
3111 IB_DEVICE_MEM_MGT_EXTENSIONS)) {
3112 pr_err("Memory registrations not supported.\n");
3113 return -ENOTSUPP;
3114 }
3115
3116 return 0;
3117}
3118
3119static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
3120 .init = rtrs_clt_ib_dev_init
3121};
3122
3123static int __init rtrs_client_init(void)
3124{
3125 rtrs_rdma_dev_pd_init(0, &dev_pd);
3126
3127 rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client");
3128 if (IS_ERR(rtrs_clt_dev_class)) {
3129 pr_err("Failed to create rtrs-client dev class\n");
3130 return PTR_ERR(rtrs_clt_dev_class);
3131 }
3132 rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
3133 if (!rtrs_wq) {
3134 class_destroy(rtrs_clt_dev_class);
3135 return -ENOMEM;
3136 }
3137
3138 return 0;
3139}
3140
3141static void __exit rtrs_client_exit(void)
3142{
3143 destroy_workqueue(rtrs_wq);
3144 class_destroy(rtrs_clt_dev_class);
3145 rtrs_rdma_dev_pd_deinit(&dev_pd);
3146}
3147
3148module_init(rtrs_client_init);
3149module_exit(rtrs_client_exit);
3150