/*
 * RTRS - RDMA Transport client
 */
#undef pr_fmt
#define pr_fmt(fmt) KBUILD_MODNAME " L" __stringify(__LINE__) ": " fmt

#include <linux/module.h>
#include <linux/rculist.h>
#include <linux/random.h>

#include "rtrs-clt.h"
#include "rtrs-log.h"

#define RTRS_CONNECT_TIMEOUT_MS 30000
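/*
 * Wait a bit before trying to reconnect after a failure, to give the
 * server time to finish its own cleanup; reconnecting immediately tends
 * to produce spurious failed reconnect attempts.
 */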
#define RTRS_RECONNECT_BACKOFF 1000

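/*
 * Small random jitter mixed into the reconnect delay, so that many
 * clients do not retry in lockstep after a common outage.
 */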
#define RTRS_RECONNECT_SEED 8

#define FIRST_CONN 0x01
/* limit to 128 * 4k = 512k max IO */
#define RTRS_MAX_SEGMENTS          128

MODULE_DESCRIPTION("RDMA Transport Client");
MODULE_LICENSE("GPL");

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops;
static struct rtrs_rdma_dev_pd dev_pd = {
	.ops = &dev_pd_ops
};

static struct workqueue_struct *rtrs_wq;
static struct class *rtrs_clt_dev_class;
static inline bool rtrs_clt_is_connected(const struct rtrs_clt *clt)
{
	struct rtrs_clt_sess *sess;
	bool connected = false;

	rcu_read_lock();
	list_for_each_entry_rcu(sess, &clt->paths_list, s.entry)
		connected |= READ_ONCE(sess->state) == RTRS_CLT_CONNECTED;
	rcu_read_unlock();

	return connected;
}

static struct rtrs_permit *
__rtrs_get_permit(struct rtrs_clt *clt, enum rtrs_clt_con_type con_type)
{
	size_t max_depth = clt->queue_depth;
	struct rtrs_permit *permit;
	int bit;

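	/*
	 * Adapted from null_blk get_tag(): callers on different CPUs may
	 * grab the same bit, since find_first_zero_bit() is not atomic.
	 * But then test_and_set_bit_lock() will fail for all callers but
	 * one, and the losers simply loop again, so no explicit spinlock
	 * is required.
	 */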
	do {
		bit = find_first_zero_bit(clt->permits_map, max_depth);
		if (bit >= max_depth)
			return NULL;
	} while (test_and_set_bit_lock(bit, clt->permits_map));

	permit = get_permit(clt, bit);
	WARN_ON(permit->mem_id != bit);
	permit->cpu_id = raw_smp_processor_id();
	permit->con_type = con_type;

	return permit;
}

static inline void __rtrs_put_permit(struct rtrs_clt *clt,
				     struct rtrs_permit *permit)
{
	clear_bit_unlock(permit->mem_id, clt->permits_map);
}

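/**
 * rtrs_clt_get_permit() - allocates permit for future RDMA operation
 * @clt:	Current session
 * @con_type:	Type of connection to use with the permit
 * @can_wait:	Wait type
 *
 * Description:
 *    Allocates a permit for the following RDMA operation.  The permit is
 *    used to preallocate all resources and to propagate memory pressure
 *    up earlier.
 *
 * Context:
 *    Can sleep if @can_wait == RTRS_PERMIT_WAIT
 */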
struct rtrs_permit *rtrs_clt_get_permit(struct rtrs_clt *clt,
					enum rtrs_clt_con_type con_type,
					enum wait_type can_wait)
{
	struct rtrs_permit *permit;
	DEFINE_WAIT(wait);

	permit = __rtrs_get_permit(clt, con_type);
	if (permit || !can_wait)
		return permit;

	do {
		prepare_to_wait(&clt->permits_wait, &wait,
				TASK_UNINTERRUPTIBLE);
		permit = __rtrs_get_permit(clt, con_type);
		if (permit)
			break;

		io_schedule();
	} while (1);

	finish_wait(&clt->permits_wait, &wait);

	return permit;
}
EXPORT_SYMBOL(rtrs_clt_get_permit);

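/**
 * rtrs_clt_put_permit() - puts allocated permit
 * @clt:	Current session
 * @permit:	Permit to be freed
 *
 * Context:
 *    Does not matter
 */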
void rtrs_clt_put_permit(struct rtrs_clt *clt, struct rtrs_permit *permit)
{
	if (WARN_ON(!test_bit(permit->mem_id, clt->permits_map)))
		return;

	__rtrs_put_permit(clt, permit);

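	/*
	 * rtrs_clt_get_permit() adds itself to the &clt->permits_wait list
	 * before calling schedule(). So if rtrs_clt_get_permit() is sleeping
	 * it must have added itself to &clt->permits_wait before
	 * __rtrs_put_permit() finished.
	 * Hence it is safe to guard wake_up() with a waitqueue_active() test.
	 */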
	if (waitqueue_active(&clt->permits_wait))
		wake_up(&clt->permits_wait);
}
EXPORT_SYMBOL(rtrs_clt_put_permit);

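/**
 * rtrs_permit_to_clt_con() - returns RDMA connection pointer by the permit
 * @sess: client session pointer
 * @permit: permit for the allocation of the RDMA buffer
 *
 * Note:
 *     IO connection starts from 1.
 *     0 connection is for user messages.
 */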
static
struct rtrs_clt_con *rtrs_permit_to_clt_con(struct rtrs_clt_sess *sess,
					    struct rtrs_permit *permit)
{
	int id = 0;

	if (permit->con_type == RTRS_IO_CON)
		id = (permit->cpu_id % (sess->s.irq_con_num - 1)) + 1;

	return to_clt_con(sess->s.con[id]);
}
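/**
 * rtrs_clt_change_state() - change the session state through session state
 * machine.
 *
 * @sess: client session to change the state of.
 * @new_state: state to change to.
 *
 * Returns true if sess's state is changed to new state, otherwise return
 * false.
 *
 * Locks:
 * state_wq lock must be held.
 */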
static bool rtrs_clt_change_state(struct rtrs_clt_sess *sess,
				  enum rtrs_clt_state new_state)
{
	enum rtrs_clt_state old_state;
	bool changed = false;

	lockdep_assert_held(&sess->state_wq.lock);

	old_state = sess->state;
	switch (new_state) {
	case RTRS_CLT_CONNECTING:
		switch (old_state) {
		case RTRS_CLT_RECONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_RECONNECTING:
		switch (old_state) {
		case RTRS_CLT_CONNECTED:
		case RTRS_CLT_CONNECTING_ERR:
		case RTRS_CLT_CLOSED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CONNECTED:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CONNECTING_ERR:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CLOSING:
		switch (old_state) {
		case RTRS_CLT_CONNECTING:
		case RTRS_CLT_CONNECTING_ERR:
		case RTRS_CLT_RECONNECTING:
		case RTRS_CLT_CONNECTED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_CLOSED:
		switch (old_state) {
		case RTRS_CLT_CLOSING:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	case RTRS_CLT_DEAD:
		switch (old_state) {
		case RTRS_CLT_CLOSED:
			changed = true;
			fallthrough;
		default:
			break;
		}
		break;
	default:
		break;
	}
	if (changed) {
		sess->state = new_state;
		wake_up_locked(&sess->state_wq);
	}

	return changed;
}

static bool rtrs_clt_change_state_from_to(struct rtrs_clt_sess *sess,
					  enum rtrs_clt_state old_state,
					  enum rtrs_clt_state new_state)
{
	bool changed = false;

	spin_lock_irq(&sess->state_wq.lock);
	if (sess->state == old_state)
		changed = rtrs_clt_change_state(sess, new_state);
	spin_unlock_irq(&sess->state_wq.lock);

	return changed;
}

static void rtrs_rdma_error_recovery(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	if (rtrs_clt_change_state_from_to(sess,
					  RTRS_CLT_CONNECTED,
					  RTRS_CLT_RECONNECTING)) {
		struct rtrs_clt *clt = sess->clt;
		unsigned int delay_ms;

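		/*
		 * Normal scenario: we were connected, so schedule a
		 * reconnect after the configured delay plus a small
		 * random offset to spread out retries.
		 */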
		delay_ms = clt->reconnect_delay_sec * 1000;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
				   msecs_to_jiffies(delay_ms +
						    prandom_u32() % RTRS_RECONNECT_SEED));
	} else {
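		/*
		 * Error can happen just while establishing a new connection,
		 * so notify the waiter with an error state; the waiter is
		 * responsible for cleaning up the rest and reconnecting.
		 */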
		rtrs_clt_change_state_from_to(sess,
					      RTRS_CLT_CONNECTING,
					      RTRS_CLT_CONNECTING_ERR);
	}
}

static void rtrs_clt_fast_reg_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(con->c.sess, "Failed IB_WR_REG_MR: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
}

static struct ib_cqe fast_reg_cqe = {
	.done = rtrs_clt_fast_reg_done
};

static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait);

static void rtrs_clt_inv_rkey_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_io_req *req =
		container_of(wc->wr_cqe, typeof(*req), inv_cqe);
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(con->c.sess, "Failed IB_WR_LOCAL_INV: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_rdma_error_recovery(con);
	}
	req->need_inv = false;
	if (req->need_inv_comp)
		complete(&req->inv_comp);
	else
		/* Complete the request from the INV callback */
		complete_rdma_req(req, req->inv_errno, true, false);
}

static int rtrs_inv_rkey(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct ib_send_wr wr = {
		.opcode		    = IB_WR_LOCAL_INV,
		.wr_cqe		    = &req->inv_cqe,
		.send_flags	    = IB_SEND_SIGNALED,
		.ex.invalidate_rkey = req->mr->rkey,
	};
	req->inv_cqe.done = rtrs_clt_inv_rkey_done;

	return ib_post_send(con->c.qp, &wr, NULL);
}

static void complete_rdma_req(struct rtrs_clt_io_req *req, int errno,
			      bool notify, bool can_wait)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_clt_sess *sess;
	int err;

	if (WARN_ON(!req->in_use))
		return;
	if (WARN_ON(!req->con))
		return;
	sess = to_clt_sess(con->c.sess);

	if (req->sg_cnt) {
		if (req->dir == DMA_FROM_DEVICE && req->need_inv) {
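			/*
			 * We are here to invalidate read requests
			 * ourselves.  In the normal scenario the server
			 * sends an INV for every read request, so ending
			 * up here means one of two things:
			 *
			 *    1.  this is failover, when errno != 0
			 *        and can_wait == 1,
			 *
			 *    2.  something went badly wrong and the
			 *        server forgot to send the INV, in
			 *        which case we do it ourselves.
			 */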
			if (can_wait) {
				req->need_inv_comp = true;
			} else {
				/* This should be an IO path, so always notify */
				WARN_ON(!notify);
				/* Save errno for the INV callback */
				req->inv_errno = errno;
			}

			refcount_inc(&req->ref);
			err = rtrs_inv_rkey(req);
			if (err) {
				rtrs_err(con->c.sess, "Send INV WR key=%#x: %d\n",
					 req->mr->rkey, err);
			} else if (can_wait) {
				wait_for_completion(&req->inv_comp);
			} else {
				/*
				 * Something went wrong, so the request will be
				 * completed from the INV callback.
				 */
				WARN_ON_ONCE(1);

				return;
			}
			if (!refcount_dec_and_test(&req->ref))
				return;
		}
		ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
				req->sg_cnt, req->dir);
	}
	if (!refcount_dec_and_test(&req->ref))
		return;
	if (req->mp_policy == MP_POLICY_MIN_INFLIGHT)
		atomic_dec(&sess->stats->inflight);

	req->in_use = false;
	req->con = NULL;

	if (errno) {
		rtrs_err_rl(con->c.sess, "IO request failed: error=%d path=%s [%s:%u] notify=%d\n",
			    errno, kobject_name(&sess->kobj), sess->hca_name,
			    sess->hca_port, notify);
	}

	if (notify)
		req->conf(req->priv, errno);
}

static int rtrs_post_send_rdma(struct rtrs_clt_con *con,
			       struct rtrs_clt_io_req *req,
			       struct rtrs_rbuf *rbuf, u32 off,
			       u32 imm, struct ib_send_wr *wr)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	enum ib_send_flags flags;
	struct ib_sge sge;

	if (!req->sg_size) {
		rtrs_wrn(con->c.sess,
			 "Doing RDMA Write failed, no data supplied\n");
		return -EINVAL;
	}

	/* user data and user message in the first list element */
	sge.addr   = req->iu->dma_addr;
	sge.length = req->sg_size;
	sge.lkey   = sess->s.dev->ib_pd->local_dma_lkey;

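	/*
	 * From time to time we have to post signalled sends,
	 * or the send queue will fill up and only a QP reset can help.
	 */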
	flags = atomic_inc_return(&con->c.wr_cnt) % sess->s.signal_interval ?
			0 : IB_SEND_SIGNALED;

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
				      req->sg_size, DMA_TO_DEVICE);

	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, &sge, 1,
					   rbuf->rkey, rbuf->addr + off,
					   imm, flags, wr, NULL);
}

static void process_io_rsp(struct rtrs_clt_sess *sess, u32 msg_id,
			   s16 errno, bool w_inval)
{
	struct rtrs_clt_io_req *req;

	if (WARN_ON(msg_id >= sess->queue_depth))
		return;

	req = &sess->reqs[msg_id];
	/* Drop need_inv if server responded with a send with invalidation */
	req->need_inv &= !w_inval;
	complete_rdma_req(req, errno, true, false);
}

static void rtrs_clt_recv_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_iu *iu;
	int err;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	WARN_ON((sess->flags & RTRS_MSG_NEW_RKEY_F) == 0);
	iu = container_of(wc->wr_cqe, struct rtrs_iu,
			  cqe);
	err = rtrs_iu_post_recv(&con->c, iu);
	if (err) {
		rtrs_err(con->c.sess, "post iu failed %d\n", err);
		rtrs_rdma_error_recovery(con);
	}
}

static void rtrs_clt_rkey_rsp_done(struct rtrs_clt_con *con, struct ib_wc *wc)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_msg_rkey_rsp *msg;
	u32 imm_type, imm_payload;
	bool w_inval = false;
	struct rtrs_iu *iu;
	u32 buf_id;
	int err;

	WARN_ON((sess->flags & RTRS_MSG_NEW_RKEY_F) == 0);

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);

	if (wc->byte_len < sizeof(*msg)) {
		rtrs_err(con->c.sess, "rkey response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (le16_to_cpu(msg->type) != RTRS_MSG_RKEY_RSP) {
		rtrs_err(sess->clt, "rkey response is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto out;
	}
	buf_id = le16_to_cpu(msg->buf_id);
	if (WARN_ON(buf_id >= sess->queue_depth))
		goto out;

	rtrs_from_imm(be32_to_cpu(wc->ex.imm_data), &imm_type, &imm_payload);
	if (imm_type == RTRS_IO_RSP_IMM ||
	    imm_type == RTRS_IO_RSP_W_INV_IMM) {
		u32 msg_id;

		w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
		rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);

		if (WARN_ON(buf_id != msg_id))
			goto out;
		sess->rbufs[buf_id].rkey = le32_to_cpu(msg->rkey);
		process_io_rsp(sess, msg_id, err, w_inval);
	}
	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, iu->dma_addr,
				      iu->size, DMA_FROM_DEVICE);
	return rtrs_clt_recv_done(con, wc);
out:
	rtrs_rdma_error_recovery(con);
}

static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc);

static struct ib_cqe io_comp_cqe = {
	.done = rtrs_clt_rdma_done
};

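/*
 * Post x2 empty WRs: the first is for this RDMA with IMM, the second
 * is for the RECV with INV, which happened earlier.
 */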
static int rtrs_post_recv_empty_x2(struct rtrs_con *con, struct ib_cqe *cqe)
{
	struct ib_recv_wr wr_arr[2], *wr;
	int i;

	memset(wr_arr, 0, sizeof(wr_arr));
	for (i = 0; i < ARRAY_SIZE(wr_arr); i++) {
		wr = &wr_arr[i];
		wr->wr_cqe = cqe;
		if (i)
			/* Chain to the previous one */
			wr->next = &wr_arr[i - 1];
	}

	return ib_post_recv(con->qp, wr, NULL);
}

static void rtrs_clt_rdma_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	u32 imm_type, imm_payload;
	bool w_inval = false;
	int err;

	if (wc->status != IB_WC_SUCCESS) {
		if (wc->status != IB_WC_WR_FLUSH_ERR) {
			rtrs_err(sess->clt, "RDMA failed: %s\n",
				 ib_wc_status_msg(wc->status));
			rtrs_rdma_error_recovery(con);
		}
		return;
	}
	rtrs_clt_update_wc_stats(con);

	switch (wc->opcode) {
	case IB_WC_RECV_RDMA_WITH_IMM:
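		/*
		 * post_recv() RDMA write completions of IO reqs (read/write)
		 * and hb
		 */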
		if (WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done))
			return;
		rtrs_from_imm(be32_to_cpu(wc->ex.imm_data),
			      &imm_type, &imm_payload);
		if (imm_type == RTRS_IO_RSP_IMM ||
		    imm_type == RTRS_IO_RSP_W_INV_IMM) {
			u32 msg_id;

			w_inval = (imm_type == RTRS_IO_RSP_W_INV_IMM);
			rtrs_from_io_rsp_imm(imm_payload, &msg_id, &err);

			process_io_rsp(sess, msg_id, err, w_inval);
		} else if (imm_type == RTRS_HB_MSG_IMM) {
			WARN_ON(con->c.cid);
			rtrs_send_hb_ack(&sess->s);
			if (sess->flags & RTRS_MSG_NEW_RKEY_F)
				return rtrs_clt_recv_done(con, wc);
		} else if (imm_type == RTRS_HB_ACK_IMM) {
			WARN_ON(con->c.cid);
			sess->s.hb_missed_cnt = 0;
			sess->s.hb_cur_latency =
				ktime_sub(ktime_get(), sess->s.hb_last_sent);
			if (sess->flags & RTRS_MSG_NEW_RKEY_F)
				return rtrs_clt_recv_done(con, wc);
		} else {
			rtrs_wrn(con->c.sess, "Unknown IMM type %u\n",
				 imm_type);
		}
		if (w_inval)
			/*
			 * Post x2 empty WRs: first is for this RDMA with IMM,
			 * second is for RECV with INV, which happened earlier.
			 */
			err = rtrs_post_recv_empty_x2(&con->c, &io_comp_cqe);
		else
			err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		if (err) {
			rtrs_err(con->c.sess, "rtrs_post_recv_empty(): %d\n",
				 err);
			rtrs_rdma_error_recovery(con);
		}
		break;
	case IB_WC_RECV:
		/*
		 * Key invalidations from server side
		 */
		WARN_ON(!(wc->wc_flags & IB_WC_WITH_INVALIDATE ||
			  wc->wc_flags & IB_WC_WITH_IMM));
		WARN_ON(wc->wr_cqe->done != rtrs_clt_rdma_done);
		if (sess->flags & RTRS_MSG_NEW_RKEY_F) {
			if (wc->wc_flags & IB_WC_WITH_INVALIDATE)
				return rtrs_clt_recv_done(con, wc);

			return rtrs_clt_rkey_rsp_done(con, wc);
		}
		break;
	case IB_WC_RDMA_WRITE:
		/*
		 * post_send() RDMA write completions of IO reqs (read/write)
		 * and hb.
		 */
		break;

	default:
		rtrs_wrn(sess->clt, "Unexpected WC type: %d\n", wc->opcode);
		return;
	}
}

static int post_recv_io(struct rtrs_clt_con *con, size_t q_size)
{
	int err, i;
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	for (i = 0; i < q_size; i++) {
		if (sess->flags & RTRS_MSG_NEW_RKEY_F) {
			struct rtrs_iu *iu = &con->rsp_ius[i];

			err = rtrs_iu_post_recv(&con->c, iu);
		} else {
			err = rtrs_post_recv_empty(&con->c, &io_comp_cqe);
		}
		if (err)
			return err;
	}

	return 0;
}

static int post_recv_sess(struct rtrs_clt_sess *sess)
{
	size_t q_size = 0;
	int err, cid;

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (cid == 0)
			q_size = SERVICE_CON_QUEUE_DEPTH;
		else
			q_size = sess->queue_depth;

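		/*
		 * x2 for RDMA read responses + FR key invalidations,
		 * RDMA writes do not require any FR registrations.
		 */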
		q_size *= 2;

		err = post_recv_io(to_clt_con(sess->s.con[cid]), q_size);
		if (err) {
			rtrs_err(sess->clt, "post_recv_io(), err: %d\n", err);
			return err;
		}
	}

	return 0;
}

struct path_it {
	int i;
	struct list_head skip_list;
	struct rtrs_clt *clt;
	struct rtrs_clt_sess *(*next_path)(struct path_it *it);
};
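/*
 * list_next_or_null_rr_rcu - get next list element in round-robin fashion.
 * @head:	the head for the list.
 * @ptr:	the list head to take the next element from.
 * @type:	the type of the struct this is embedded in.
 * @memb:	the name of the list_head within the struct.
 *
 * Next element is returned in round-robin fashion, i.e. head will be
 * skipped, but if the list is observed as empty, NULL will be returned.
 *
 * This primitive may safely run concurrently with the _rcu list-mutation
 * primitives such as list_add_rcu() as long as it's guarded by
 * rcu_read_lock().
 */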
#define list_next_or_null_rr_rcu(head, ptr, type, memb) \
({ \
	list_next_or_null_rcu(head, ptr, type, memb) ?: \
		list_next_or_null_rcu(head, READ_ONCE((ptr)->next), \
				      type, memb); \
})

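/**
 * get_next_path_rr() - Returns path in round-robin fashion.
 * @it:	the path pointer
 *
 * Related to @MP_POLICY_RR
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */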
static struct rtrs_clt_sess *get_next_path_rr(struct path_it *it)
{
	struct rtrs_clt_sess __rcu **ppcpu_path;
	struct rtrs_clt_sess *path;
	struct rtrs_clt *clt;

	clt = it->clt;

	/*
	 * Here we use two RCU objects: @paths_list and @pcpu_path
	 * pointer.  See rtrs_clt_remove_path_from_arr() for details
	 * of how that is handled.
	 */
	ppcpu_path = this_cpu_ptr(clt->pcpu_path);
	path = rcu_dereference(*ppcpu_path);
	if (!path)
		path = list_first_or_null_rcu(&clt->paths_list,
					      typeof(*path), s.entry);
	else
		path = list_next_or_null_rr_rcu(&clt->paths_list,
						&path->s.entry,
						typeof(*path),
						s.entry);
	rcu_assign_pointer(*ppcpu_path, path);

	return path;
}

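/**
 * get_next_path_min_inflight() - Returns path with minimal inflight count.
 * @it:	the path pointer
 *
 * Related to @MP_POLICY_MIN_INFLIGHT
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */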
static struct rtrs_clt_sess *get_next_path_min_inflight(struct path_it *it)
{
	struct rtrs_clt_sess *min_path = NULL;
	struct rtrs_clt *clt = it->clt;
	struct rtrs_clt_sess *sess;
	int min_inflight = INT_MAX;
	int inflight;

	list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
		if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)
			continue;

		if (!list_empty(raw_cpu_ptr(sess->mp_skip_entry)))
			continue;

		inflight = atomic_read(&sess->stats->inflight);

		if (inflight < min_inflight) {
			min_inflight = inflight;
			min_path = sess;
		}
	}

	/*
	 * add the path to the skip list, so that next time we can get
	 * a different one
	 */
	if (min_path)
		list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);

	return min_path;
}
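/**
 * get_next_path_min_latency() - Returns path with minimal latency.
 * @it:	the path pointer
 *
 * Related to @MP_POLICY_MIN_LATENCY
 *
 * There is a skip-list so that a path which has already been tried but
 * failed is skipped; the minimum latency path is tried first and the
 * second minimum latency path next, regardless of whether a path has
 * been tried before.
 *
 * Locks:
 *    rcu_read_lock() must be held.
 */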
static struct rtrs_clt_sess *get_next_path_min_latency(struct path_it *it)
{
	struct rtrs_clt_sess *min_path = NULL;
	struct rtrs_clt *clt = it->clt;
	struct rtrs_clt_sess *sess;
	ktime_t min_latency = INT_MAX;
	ktime_t latency;

	list_for_each_entry_rcu(sess, &clt->paths_list, s.entry) {
		if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)
			continue;

		if (!list_empty(raw_cpu_ptr(sess->mp_skip_entry)))
			continue;

		latency = sess->s.hb_cur_latency;

		if (latency < min_latency) {
			min_latency = latency;
			min_path = sess;
		}
	}

	/*
	 * add the path to the skip list, so that next time we can get
	 * a different one
	 */
	if (min_path)
		list_add(raw_cpu_ptr(min_path->mp_skip_entry), &it->skip_list);

	return min_path;
}

static inline void path_it_init(struct path_it *it, struct rtrs_clt *clt)
{
	INIT_LIST_HEAD(&it->skip_list);
	it->clt = clt;
	it->i = 0;

	if (clt->mp_policy == MP_POLICY_RR)
		it->next_path = get_next_path_rr;
	else if (clt->mp_policy == MP_POLICY_MIN_INFLIGHT)
		it->next_path = get_next_path_min_inflight;
	else
		it->next_path = get_next_path_min_latency;
}

static inline void path_it_deinit(struct path_it *it)
{
	struct list_head *skip, *tmp;

	/*
	 * The skip_list is used only for the MIN_INFLIGHT and MIN_LATENCY
	 * policies.  Paths are removed from it here so that the next IO
	 * can add them (->mp_skip_entry) to a skip_list again.
	 */
	list_for_each_safe(skip, tmp, &it->skip_list)
		list_del_init(skip);
}
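/**
 * rtrs_clt_init_req() - Initialize an rtrs_clt_io_req holding information
 * about an inflight IO and the IO to be sent
 * @req: an io request holding information about IO.
 * @sess: client session
 * @conf: confirmation callback function to notify upper layer.
 * @permit: permit for allocation of RDMA remote buffer
 * @priv: private pointer
 * @vec: kernel vector containing control messages
 * @usr_len: length of the user message
 * @sg: scatter list for IO data
 * @sg_cnt: number of scatter list entries
 * @data_len: length of the IO data
 * @dir: direction of the IO.
 */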
static void rtrs_clt_init_req(struct rtrs_clt_io_req *req,
			      struct rtrs_clt_sess *sess,
			      void (*conf)(void *priv, int errno),
			      struct rtrs_permit *permit, void *priv,
			      const struct kvec *vec, size_t usr_len,
			      struct scatterlist *sg, size_t sg_cnt,
			      size_t data_len, int dir)
{
	struct iov_iter iter;
	size_t len;

	req->permit = permit;
	req->in_use = true;
	req->usr_len = usr_len;
	req->data_len = data_len;
	req->sglist = sg;
	req->sg_cnt = sg_cnt;
	req->priv = priv;
	req->dir = dir;
	req->con = rtrs_permit_to_clt_con(sess, permit);
	req->conf = conf;
	req->need_inv = false;
	req->need_inv_comp = false;
	req->inv_errno = 0;
	refcount_set(&req->ref, 1);
	req->mp_policy = sess->clt->mp_policy;

	iov_iter_kvec(&iter, READ, vec, 1, usr_len);
	len = _copy_from_iter(req->iu->buf, usr_len, &iter);
	WARN_ON(len != usr_len);

	reinit_completion(&req->inv_comp);
}

static struct rtrs_clt_io_req *
rtrs_clt_get_req(struct rtrs_clt_sess *sess,
		 void (*conf)(void *priv, int errno),
		 struct rtrs_permit *permit, void *priv,
		 const struct kvec *vec, size_t usr_len,
		 struct scatterlist *sg, size_t sg_cnt,
		 size_t data_len, int dir)
{
	struct rtrs_clt_io_req *req;

	req = &sess->reqs[permit->mem_id];
	rtrs_clt_init_req(req, sess, conf, permit, priv, vec, usr_len,
			  sg, sg_cnt, data_len, dir);
	return req;
}

static struct rtrs_clt_io_req *
rtrs_clt_get_copy_req(struct rtrs_clt_sess *alive_sess,
		      struct rtrs_clt_io_req *fail_req)
{
	struct rtrs_clt_io_req *req;
	struct kvec vec = {
		.iov_base = fail_req->iu->buf,
		.iov_len  = fail_req->usr_len
	};

	req = &alive_sess->reqs[fail_req->permit->mem_id];
	rtrs_clt_init_req(req, alive_sess, fail_req->conf, fail_req->permit,
			  fail_req->priv, &vec, fail_req->usr_len,
			  fail_req->sglist, fail_req->sg_cnt,
			  fail_req->data_len, fail_req->dir);
	return req;
}

static int rtrs_post_rdma_write_sg(struct rtrs_clt_con *con,
				   struct rtrs_clt_io_req *req,
				   struct rtrs_rbuf *rbuf, bool fr_en,
				   u32 size, u32 imm, struct ib_send_wr *wr,
				   struct ib_send_wr *tail)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct ib_sge *sge = req->sge;
	enum ib_send_flags flags;
	struct scatterlist *sg;
	size_t num_sge;
	int i;
	struct ib_send_wr *ptail = NULL;

	if (fr_en) {
		i = 0;
		sge[i].addr   = req->mr->iova;
		sge[i].length = req->mr->length;
		sge[i].lkey   = req->mr->lkey;
		i++;
		num_sge = 2;
		ptail = tail;
	} else {
		for_each_sg(req->sglist, sg, req->sg_cnt, i) {
			sge[i].addr   = sg_dma_address(sg);
			sge[i].length = sg_dma_len(sg);
			sge[i].lkey   = sess->s.dev->ib_pd->local_dma_lkey;
		}
		num_sge = 1 + req->sg_cnt;
	}
	sge[i].addr   = req->iu->dma_addr;
	sge[i].length = size;
	sge[i].lkey   = sess->s.dev->ib_pd->local_dma_lkey;

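	/*
	 * From time to time we have to post signalled sends,
	 * or the send queue will fill up and only a QP reset can help.
	 */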
	flags = atomic_inc_return(&con->c.wr_cnt) % sess->s.signal_interval ?
			0 : IB_SEND_SIGNALED;

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, req->iu->dma_addr,
				      size, DMA_TO_DEVICE);

	return rtrs_iu_post_rdma_write_imm(&con->c, req->iu, sge, num_sge,
					   rbuf->rkey, rbuf->addr, imm,
					   flags, wr, ptail);
}

static int rtrs_map_sg_fr(struct rtrs_clt_io_req *req, size_t count)
{
	int nr;

	/* Align the MR to a 4K page size to match the block virt boundary */
	nr = ib_map_mr_sg(req->mr, req->sglist, count, NULL, SZ_4K);
	if (nr < 0)
		return nr;
	if (nr < req->sg_cnt)
		return -EINVAL;
	ib_update_fast_reg_key(req->mr, ib_inc_rkey(req->mr->rkey));

	return nr;
}

static int rtrs_clt_write_req(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	struct rtrs_msg_rdma_write *msg;

	struct rtrs_rbuf *rbuf;
	int ret, count = 0;
	u32 imm, buf_id;
	struct ib_reg_wr rwr;
	struct ib_send_wr inv_wr;
	struct ib_send_wr *wr = NULL;
	bool fr_en = false;

	const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;

	if (tsize > sess->chunk_size) {
		rtrs_wrn(s, "Write request failed, size too big %zu > %d\n",
			 tsize, sess->chunk_size);
		return -EMSGSIZE;
	}
	if (req->sg_cnt) {
		count = ib_dma_map_sg(sess->s.dev->ib_dev, req->sglist,
				      req->sg_cnt, req->dir);
		if (!count) {
			rtrs_wrn(s, "Write request failed, map failed\n");
			return -EINVAL;
		}
	}

	/* put rtrs msg after sg and user message */
	msg = req->iu->buf + req->usr_len;
	msg->type = cpu_to_le16(RTRS_MSG_WRITE);
	msg->usr_len = cpu_to_le16(req->usr_len);

	/* rtrs message on server side will be after user data and message */
	imm = req->permit->mem_off + req->data_len + req->usr_len;
	imm = rtrs_to_io_req_imm(imm);
	buf_id = req->permit->mem_id;
	req->sg_size = tsize;
	rbuf = &sess->rbufs[buf_id];

	if (count) {
		ret = rtrs_map_sg_fr(req, count);
		if (ret < 0) {
			rtrs_err_rl(s,
				    "Write request failed, failed to map fast reg. data, err: %d\n",
				    ret);
			ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
					req->sg_cnt, req->dir);
			return ret;
		}
		inv_wr = (struct ib_send_wr) {
			.opcode		    = IB_WR_LOCAL_INV,
			.wr_cqe		    = &req->inv_cqe,
			.send_flags	    = IB_SEND_SIGNALED,
			.ex.invalidate_rkey = req->mr->rkey,
		};
		req->inv_cqe.done = rtrs_clt_inv_rkey_done;
		rwr = (struct ib_reg_wr) {
			.wr.opcode = IB_WR_REG_MR,
			.wr.wr_cqe = &fast_reg_cqe,
			.mr = req->mr,
			.key = req->mr->rkey,
			.access = (IB_ACCESS_LOCAL_WRITE),
		};
		wr = &rwr.wr;
		fr_en = true;
		refcount_inc(&req->ref);
	}

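	/*
	 * Update stats now, after the request is successfully sent it is
	 * not safe anymore to touch it.
	 */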
	rtrs_clt_update_all_stats(req, WRITE);

	ret = rtrs_post_rdma_write_sg(req->con, req, rbuf, fr_en,
				      req->usr_len + sizeof(*msg),
				      imm, wr, &inv_wr);
	if (ret) {
		rtrs_err_rl(s,
			    "Write request failed: error=%d path=%s [%s:%u]\n",
			    ret, kobject_name(&sess->kobj), sess->hca_name,
			    sess->hca_port);
		if (req->mp_policy == MP_POLICY_MIN_INFLIGHT)
			atomic_dec(&sess->stats->inflight);
		if (req->sg_cnt)
			ib_dma_unmap_sg(sess->s.dev->ib_dev, req->sglist,
					req->sg_cnt, req->dir);
	}

	return ret;
}

static int rtrs_clt_read_req(struct rtrs_clt_io_req *req)
{
	struct rtrs_clt_con *con = req->con;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	struct rtrs_msg_rdma_read *msg;
	struct rtrs_ib_dev *dev = sess->s.dev;

	struct ib_reg_wr rwr;
	struct ib_send_wr *wr = NULL;

	int ret, count = 0;
	u32 imm, buf_id;

	const size_t tsize = sizeof(*msg) + req->data_len + req->usr_len;

	if (tsize > sess->chunk_size) {
		rtrs_wrn(s,
			 "Read request failed, message size is %zu, bigger than CHUNK_SIZE %d\n",
			 tsize, sess->chunk_size);
		return -EMSGSIZE;
	}

	if (req->sg_cnt) {
		count = ib_dma_map_sg(dev->ib_dev, req->sglist, req->sg_cnt,
				      req->dir);
		if (!count) {
			rtrs_wrn(s,
				 "Read request failed, dma map failed\n");
			return -EINVAL;
		}
	}

	/* put our message after the user message */
	msg = req->iu->buf + req->usr_len;
	msg->type = cpu_to_le16(RTRS_MSG_READ);
	msg->usr_len = cpu_to_le16(req->usr_len);

	if (count) {
		ret = rtrs_map_sg_fr(req, count);
		if (ret < 0) {
			rtrs_err_rl(s,
				    "Read request failed, failed to map fast reg. data, err: %d\n",
				    ret);
			ib_dma_unmap_sg(dev->ib_dev, req->sglist, req->sg_cnt,
					req->dir);
			return ret;
		}
		rwr = (struct ib_reg_wr) {
			.wr.opcode = IB_WR_REG_MR,
			.wr.wr_cqe = &fast_reg_cqe,
			.mr = req->mr,
			.key = req->mr->rkey,
			.access = (IB_ACCESS_LOCAL_WRITE |
				   IB_ACCESS_REMOTE_WRITE),
		};
		wr = &rwr.wr;

		msg->sg_cnt = cpu_to_le16(1);
		msg->flags = cpu_to_le16(RTRS_MSG_NEED_INVAL_F);

		msg->desc[0].addr = cpu_to_le64(req->mr->iova);
		msg->desc[0].key = cpu_to_le32(req->mr->rkey);
		msg->desc[0].len = cpu_to_le32(req->mr->length);

		/* Further invalidation is required */
		req->need_inv = !!RTRS_MSG_NEED_INVAL_F;

	} else {
		msg->sg_cnt = 0;
		msg->flags = 0;
	}
	/*
	 * rtrs message will be after the space reserved for disk data and
	 * user message
	 */
	imm = req->permit->mem_off + req->data_len + req->usr_len;
	imm = rtrs_to_io_req_imm(imm);
	buf_id = req->permit->mem_id;

	req->sg_size  = sizeof(*msg);
	req->sg_size += le16_to_cpu(msg->sg_cnt) * sizeof(struct rtrs_sg_desc);
	req->sg_size += req->usr_len;

	/*
	 * Update stats now, after the request is successfully sent it is
	 * not safe anymore to touch it.
	 */
	rtrs_clt_update_all_stats(req, READ);

	ret = rtrs_post_send_rdma(req->con, req, &sess->rbufs[buf_id],
				  req->data_len, imm, wr);
	if (ret) {
		rtrs_err_rl(s,
			    "Read request failed: error=%d path=%s [%s:%u]\n",
			    ret, kobject_name(&sess->kobj), sess->hca_name,
			    sess->hca_port);
		if (req->mp_policy == MP_POLICY_MIN_INFLIGHT)
			atomic_dec(&sess->stats->inflight);
		req->need_inv = false;
		if (req->sg_cnt)
			ib_dma_unmap_sg(dev->ib_dev, req->sglist,
					req->sg_cnt, req->dir);
	}

	return ret;
}
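/**
 * rtrs_clt_failover_req() - Try to find an active path for a failed request
 * @clt: clt context
 * @fail_req: a failed io request.
 */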
static int rtrs_clt_failover_req(struct rtrs_clt *clt,
				 struct rtrs_clt_io_req *fail_req)
{
	struct rtrs_clt_sess *alive_sess;
	struct rtrs_clt_io_req *req;
	int err = -ECONNABORTED;
	struct path_it it;

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (alive_sess = it.next_path(&it)) && it.i < it.clt->paths_num;
	     it.i++) {
		if (READ_ONCE(alive_sess->state) != RTRS_CLT_CONNECTED)
			continue;
		req = rtrs_clt_get_copy_req(alive_sess, fail_req);
		if (req->dir == DMA_TO_DEVICE)
			err = rtrs_clt_write_req(req);
		else
			err = rtrs_clt_read_req(req);
		if (err) {
			req->in_use = false;
			continue;
		}
		/* Success path */
		rtrs_clt_inc_failover_cnt(alive_sess->stats);
		break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return err;
}

static void fail_all_outstanding_reqs(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;
	struct rtrs_clt_io_req *req;
	int i, err;

	if (!sess->reqs)
		return;
	for (i = 0; i < sess->queue_depth; ++i) {
		req = &sess->reqs[i];
		if (!req->in_use)
			continue;

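		/*
		 * Safely (without notification) complete the failed request.
		 * After completion this request is still usable and can
		 * be failed over to another path.
		 */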
		complete_rdma_req(req, -ECONNABORTED, false, true);

		err = rtrs_clt_failover_req(clt, req);
		if (err)
			/* Failover path is not good, notify anyway */
			req->conf(req->priv, err);
	}
}

static void free_sess_reqs(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_io_req *req;
	int i;

	if (!sess->reqs)
		return;
	for (i = 0; i < sess->queue_depth; ++i) {
		req = &sess->reqs[i];
		if (req->mr)
			ib_dereg_mr(req->mr);
		kfree(req->sge);
		rtrs_iu_free(req->iu, sess->s.dev->ib_dev, 1);
	}
	kfree(sess->reqs);
	sess->reqs = NULL;
}

static int alloc_sess_reqs(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_io_req *req;
	int i, err = -ENOMEM;

	sess->reqs = kcalloc(sess->queue_depth, sizeof(*sess->reqs),
			     GFP_KERNEL);
	if (!sess->reqs)
		return -ENOMEM;

	for (i = 0; i < sess->queue_depth; ++i) {
		req = &sess->reqs[i];
		req->iu = rtrs_iu_alloc(1, sess->max_hdr_size, GFP_KERNEL,
					sess->s.dev->ib_dev,
					DMA_TO_DEVICE,
					rtrs_clt_rdma_done);
		if (!req->iu)
			goto out;

		req->sge = kcalloc(2, sizeof(*req->sge), GFP_KERNEL);
		if (!req->sge)
			goto out;

		req->mr = ib_alloc_mr(sess->s.dev->ib_pd, IB_MR_TYPE_MEM_REG,
				      sess->max_pages_per_mr);
		if (IS_ERR(req->mr)) {
			err = PTR_ERR(req->mr);
			req->mr = NULL;
			pr_err("Failed to alloc sess->max_pages_per_mr %d\n",
			       sess->max_pages_per_mr);
			goto out;
		}

		init_completion(&req->inv_comp);
	}

	return 0;

out:
	free_sess_reqs(sess);

	return err;
}

static int alloc_permits(struct rtrs_clt *clt)
{
	unsigned int chunk_bits;
	int err, i;

	clt->permits_map = kcalloc(BITS_TO_LONGS(clt->queue_depth),
				   sizeof(long), GFP_KERNEL);
	if (!clt->permits_map) {
		err = -ENOMEM;
		goto out_err;
	}
	clt->permits = kcalloc(clt->queue_depth, permit_size(clt), GFP_KERNEL);
	if (!clt->permits) {
		err = -ENOMEM;
		goto err_map;
	}
	chunk_bits = ilog2(clt->queue_depth - 1) + 1;
	for (i = 0; i < clt->queue_depth; i++) {
		struct rtrs_permit *permit;

		permit = get_permit(clt, i);
		permit->mem_id = i;
		permit->mem_off = i << (MAX_IMM_PAYL_BITS - chunk_bits);
	}

	return 0;

err_map:
	kfree(clt->permits_map);
	clt->permits_map = NULL;
out_err:
	return err;
}

static void free_permits(struct rtrs_clt *clt)
{
	if (clt->permits_map) {
		size_t sz = clt->queue_depth;

		wait_event(clt->permits_wait,
			   find_first_bit(clt->permits_map, sz) >= sz);
	}
	kfree(clt->permits_map);
	clt->permits_map = NULL;
	kfree(clt->permits);
	clt->permits = NULL;
}

static void query_fast_reg_mode(struct rtrs_clt_sess *sess)
{
	struct ib_device *ib_dev;
	u64 max_pages_per_mr;
	int mr_page_shift;

	ib_dev = sess->s.dev->ib_dev;

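	/*
	 * Use the smallest page size supported by the HCA, down to a
	 * minimum of 4096 bytes. We're unlikely to build large sglists
	 * out of smaller entries.
	 */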
	mr_page_shift      = max(12, ffs(ib_dev->attrs.page_size_cap) - 1);
	max_pages_per_mr   = ib_dev->attrs.max_mr_size;
	do_div(max_pages_per_mr, (1ull << mr_page_shift));
	sess->max_pages_per_mr =
		min3(sess->max_pages_per_mr, (u32)max_pages_per_mr,
		     ib_dev->attrs.max_fast_reg_page_list_len);
	sess->clt->max_segments =
		min(sess->max_pages_per_mr, sess->clt->max_segments);
}

static bool rtrs_clt_change_state_get_old(struct rtrs_clt_sess *sess,
					  enum rtrs_clt_state new_state,
					  enum rtrs_clt_state *old_state)
{
	bool changed;

	spin_lock_irq(&sess->state_wq.lock);
	if (old_state)
		*old_state = sess->state;
	changed = rtrs_clt_change_state(sess, new_state);
	spin_unlock_irq(&sess->state_wq.lock);

	return changed;
}

static void rtrs_clt_hb_err_handler(struct rtrs_con *c)
{
	struct rtrs_clt_con *con = container_of(c, typeof(*con), c);

	rtrs_rdma_error_recovery(con);
}

static void rtrs_clt_init_hb(struct rtrs_clt_sess *sess)
{
	rtrs_init_hb(&sess->s, &io_comp_cqe,
		     RTRS_HB_INTERVAL_MS,
		     RTRS_HB_MISSED_MAX,
		     rtrs_clt_hb_err_handler,
		     rtrs_wq);
}

static void rtrs_clt_reconnect_work(struct work_struct *work);
static void rtrs_clt_close_work(struct work_struct *work);

static struct rtrs_clt_sess *alloc_sess(struct rtrs_clt *clt,
					const struct rtrs_addr *path,
					size_t con_num, u32 nr_poll_queues)
{
	struct rtrs_clt_sess *sess;
	int err = -ENOMEM;
	int cpu;
	size_t total_con;

	sess = kzalloc(sizeof(*sess), GFP_KERNEL);
	if (!sess)
		goto err;

	/*
	 * irqmode and poll
	 * +1: Extra connection for user messages
	 */
	total_con = con_num + nr_poll_queues + 1;
	sess->s.con = kcalloc(total_con, sizeof(*sess->s.con), GFP_KERNEL);
	if (!sess->s.con)
		goto err_free_sess;

	sess->s.con_num = total_con;
	sess->s.irq_con_num = con_num + 1;

	sess->stats = kzalloc(sizeof(*sess->stats), GFP_KERNEL);
	if (!sess->stats)
		goto err_free_con;

	mutex_init(&sess->init_mutex);
	uuid_gen(&sess->s.uuid);
	memcpy(&sess->s.dst_addr, path->dst,
	       rdma_addr_size((struct sockaddr *)path->dst));

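	/*
	 * rdma_resolve_addr() passes src_addr to cma_bind_addr, which
	 * checks the sa_family to be non-zero. If user passed src_addr=NULL
	 * the sess->src_addr will contain only zeros, which is then fine.
	 */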
	if (path->src)
		memcpy(&sess->s.src_addr, path->src,
		       rdma_addr_size((struct sockaddr *)path->src));
	strscpy(sess->s.sessname, clt->sessname, sizeof(sess->s.sessname));
	sess->clt = clt;
	sess->max_pages_per_mr = RTRS_MAX_SEGMENTS;
	init_waitqueue_head(&sess->state_wq);
	sess->state = RTRS_CLT_CONNECTING;
	atomic_set(&sess->connected_cnt, 0);
	INIT_WORK(&sess->close_work, rtrs_clt_close_work);
	INIT_DELAYED_WORK(&sess->reconnect_dwork, rtrs_clt_reconnect_work);
	rtrs_clt_init_hb(sess);

	sess->mp_skip_entry = alloc_percpu(typeof(*sess->mp_skip_entry));
	if (!sess->mp_skip_entry)
		goto err_free_stats;

	for_each_possible_cpu(cpu)
		INIT_LIST_HEAD(per_cpu_ptr(sess->mp_skip_entry, cpu));

	err = rtrs_clt_init_stats(sess->stats);
	if (err)
		goto err_free_percpu;

	return sess;

err_free_percpu:
	free_percpu(sess->mp_skip_entry);
err_free_stats:
	kfree(sess->stats);
err_free_con:
	kfree(sess->s.con);
err_free_sess:
	kfree(sess);
err:
	return ERR_PTR(err);
}

void free_sess(struct rtrs_clt_sess *sess)
{
	free_percpu(sess->mp_skip_entry);
	mutex_destroy(&sess->init_mutex);
	kfree(sess->s.con);
	kfree(sess->rbufs);
	kfree(sess);
}

static int create_con(struct rtrs_clt_sess *sess, unsigned int cid)
{
	struct rtrs_clt_con *con;

	con = kzalloc(sizeof(*con), GFP_KERNEL);
	if (!con)
		return -ENOMEM;

	/* Map first two connections to the first CPU */
	con->cpu  = (cid ? cid - 1 : 0) % nr_cpu_ids;
	con->c.cid = cid;
	con->c.sess = &sess->s;
	/* Align with srv, init as 1 */
	atomic_set(&con->c.wr_cnt, 1);
	mutex_init(&con->con_mutex);

	sess->s.con[cid] = &con->c;

	return 0;
}

static void destroy_con(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	sess->s.con[con->c.cid] = NULL;
	mutex_destroy(&con->con_mutex);
	kfree(con);
}

static int create_con_cq_qp(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	u32 max_send_wr, max_recv_wr, cq_num, max_send_sge, wr_limit;
	int err, cq_vector;
	struct rtrs_msg_rkey_rsp *rsp;

	lockdep_assert_held(&con->con_mutex);
	if (con->c.cid == 0) {
		max_send_sge = 1;
		/* We must be the first here */
		if (WARN_ON(sess->s.dev))
			return -EINVAL;

		/*
		 * The whole session uses device from user connection.
		 * Be careful not to close user connection before ib dev
		 * is gracefully put.
		 */
		sess->s.dev = rtrs_ib_dev_find_or_add(con->c.cm_id->device,
						      &dev_pd);
		if (!sess->s.dev) {
			rtrs_wrn(sess->clt,
				 "rtrs_ib_dev_find_get_or_add(): no memory\n");
			return -ENOMEM;
		}
		sess->s.dev_ref = 1;
		query_fast_reg_mode(sess);
		wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;

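		/*
		 * Service connection: budget send/recv pairs for the
		 * service messages, plus a couple of spare WRs for
		 * drain and heartbeat in case the qp goes into the
		 * error state.
		 */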
		max_send_wr =
			min_t(int, wr_limit, SERVICE_CON_QUEUE_DEPTH * 2 + 2);
		max_recv_wr = max_send_wr;
	} else {
		/*
		 * Here we assume that session members are correctly set.
		 * This is always true if user connection (cid == 0) is
		 * established first.
		 */
		if (WARN_ON(!sess->s.dev))
			return -EINVAL;
		if (WARN_ON(!sess->queue_depth))
			return -EINVAL;

		wr_limit = sess->s.dev->ib_dev->attrs.max_qp_wr;
		/* Shared between connections */
		sess->s.dev_ref++;
		max_send_wr = min_t(int, wr_limit,
			      /* QD * (REQ + RSP + FR REGS or INVS) + drain */
			      sess->queue_depth * 3 + 1);
		max_recv_wr = min_t(int, wr_limit,
			      sess->queue_depth * 3 + 1);
		max_send_sge = 2;
	}
	atomic_set(&con->c.sq_wr_avail, max_send_wr);
	cq_num = max_send_wr + max_recv_wr;
	/* alloc iu to recv new rkey reply when server reports flags set */
	if (sess->flags & RTRS_MSG_NEW_RKEY_F || con->c.cid == 0) {
		con->rsp_ius = rtrs_iu_alloc(cq_num, sizeof(*rsp),
					     GFP_KERNEL, sess->s.dev->ib_dev,
					     DMA_FROM_DEVICE,
					     rtrs_clt_rdma_done);
		if (!con->rsp_ius)
			return -ENOMEM;
		con->queue_num = cq_num;
	}
	cq_num = max_send_wr + max_recv_wr;
	cq_vector = con->cpu % sess->s.dev->ib_dev->num_comp_vectors;
	if (con->c.cid >= sess->s.irq_con_num)
		err = rtrs_cq_qp_create(&sess->s, &con->c, max_send_sge,
					cq_vector, cq_num, max_send_wr,
					max_recv_wr, IB_POLL_DIRECT);
	else
		err = rtrs_cq_qp_create(&sess->s, &con->c, max_send_sge,
					cq_vector, cq_num, max_send_wr,
					max_recv_wr, IB_POLL_SOFTIRQ);
	/*
	 * In case of error we do not bother to clean previous allocations,
	 * since destroy_con_cq_qp() must be called.
	 */
	return err;
}

static void destroy_con_cq_qp(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	/*
	 * Be careful here: destroy_con_cq_qp() can be called even if
	 * create_con_cq_qp() failed, see comments there.
	 */
	lockdep_assert_held(&con->con_mutex);
	rtrs_cq_qp_destroy(&con->c);
	if (con->rsp_ius) {
		rtrs_iu_free(con->rsp_ius, sess->s.dev->ib_dev, con->queue_num);
		con->rsp_ius = NULL;
		con->queue_num = 0;
	}
	if (sess->s.dev_ref && !--sess->s.dev_ref) {
		rtrs_ib_dev_put(sess->s.dev);
		sess->s.dev = NULL;
	}
}

static void stop_cm(struct rtrs_clt_con *con)
{
	rdma_disconnect(con->c.cm_id);
	if (con->c.qp)
		ib_drain_qp(con->c.qp);
}

static void destroy_cm(struct rtrs_clt_con *con)
{
	rdma_destroy_id(con->c.cm_id);
	con->c.cm_id = NULL;
}

static int rtrs_rdma_addr_resolved(struct rtrs_clt_con *con)
{
	struct rtrs_sess *s = con->c.sess;
	int err;

	mutex_lock(&con->con_mutex);
	err = create_con_cq_qp(con);
	mutex_unlock(&con->con_mutex);
	if (err) {
		rtrs_err(s, "create_con_cq_qp(), err: %d\n", err);
		return err;
	}
	err = rdma_resolve_route(con->c.cm_id, RTRS_CONNECT_TIMEOUT_MS);
	if (err)
		rtrs_err(s, "Resolving route failed, err: %d\n", err);

	return err;
}

static int rtrs_rdma_route_resolved(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_clt *clt = sess->clt;
	struct rtrs_msg_conn_req msg;
	struct rdma_conn_param param;

	int err;

	param = (struct rdma_conn_param) {
		.retry_count = 7,
		.rnr_retry_count = 7,
		.private_data = &msg,
		.private_data_len = sizeof(msg),
	};

	msg = (struct rtrs_msg_conn_req) {
		.magic = cpu_to_le16(RTRS_MAGIC),
		.version = cpu_to_le16(RTRS_PROTO_VER),
		.cid = cpu_to_le16(con->c.cid),
		.cid_num = cpu_to_le16(sess->s.con_num),
		.recon_cnt = cpu_to_le16(sess->s.recon_cnt),
	};
	msg.first_conn = sess->for_new_clt ? FIRST_CONN : 0;
	uuid_copy(&msg.sess_uuid, &sess->s.uuid);
	uuid_copy(&msg.paths_uuid, &clt->paths_uuid);

	err = rdma_connect_locked(con->c.cm_id, &param);
	if (err)
		rtrs_err(clt, "rdma_connect_locked(): %d\n", err);

	return err;
}

static int rtrs_rdma_conn_established(struct rtrs_clt_con *con,
				      struct rdma_cm_event *ev)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_clt *clt = sess->clt;
	const struct rtrs_msg_conn_rsp *msg;
	u16 version, queue_depth;
	int errno;
	u8 len;

	msg = ev->param.conn.private_data;
	len = ev->param.conn.private_data_len;
	if (len < sizeof(*msg)) {
		rtrs_err(clt, "Invalid RTRS connection response\n");
		return -ECONNRESET;
	}
	if (le16_to_cpu(msg->magic) != RTRS_MAGIC) {
		rtrs_err(clt, "Invalid RTRS magic\n");
		return -ECONNRESET;
	}
	version = le16_to_cpu(msg->version);
	if (version >> 8 != RTRS_PROTO_VER_MAJOR) {
		rtrs_err(clt, "Unsupported major RTRS version: %d, expected %d\n",
			 version >> 8, RTRS_PROTO_VER_MAJOR);
		return -ECONNRESET;
	}
	errno = le16_to_cpu(msg->errno);
	if (errno) {
		rtrs_err(clt, "Invalid RTRS message: errno %d\n",
			 errno);
		return -ECONNRESET;
	}
	if (con->c.cid == 0) {
		queue_depth = le16_to_cpu(msg->queue_depth);

		if (sess->queue_depth > 0 && queue_depth != sess->queue_depth) {
			rtrs_err(clt, "Error: queue depth changed\n");

			/*
			 * Stop any more reconnection attempts
			 */
			sess->reconnect_attempts = -1;
			rtrs_err(clt,
				"Disabling auto-reconnect. Trigger a manual reconnect after issue is resolved\n");
			return -ECONNRESET;
		}

		if (!sess->rbufs) {
			sess->rbufs = kcalloc(queue_depth, sizeof(*sess->rbufs),
					      GFP_KERNEL);
			if (!sess->rbufs)
				return -ENOMEM;
		}
		sess->queue_depth = queue_depth;
		sess->s.signal_interval = min_not_zero(queue_depth,
						(unsigned short) SERVICE_CON_QUEUE_DEPTH);
		sess->max_hdr_size = le32_to_cpu(msg->max_hdr_size);
		sess->max_io_size = le32_to_cpu(msg->max_io_size);
		sess->flags = le32_to_cpu(msg->flags);
		sess->chunk_size = sess->max_io_size + sess->max_hdr_size;

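		/*
		 * Global IO size is always a minimum.
		 * If while reconnecting the server sends a value a bit
		 * higher, the client does not care and uses the cached
		 * minimum.
		 *
		 * Since several paths can be re-establishing connections
		 * in parallel, use the lock.
		 */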
		mutex_lock(&clt->paths_mutex);
		clt->queue_depth = sess->queue_depth;
		clt->max_io_size = min_not_zero(sess->max_io_size,
						clt->max_io_size);
		mutex_unlock(&clt->paths_mutex);

		/*
		 * Cache the hca_port and hca_name for sysfs
		 */
		sess->hca_port = con->c.cm_id->port_num;
		scnprintf(sess->hca_name, sizeof(sess->hca_name),
			  sess->s.dev->ib_dev->name);
		sess->s.src_addr = con->c.cm_id->route.addr.src_addr;
		/* set for_new_clt, to allow future reconnect on any path */
		sess->for_new_clt = 1;
	}

	return 0;
}

static inline void flag_success_on_conn(struct rtrs_clt_con *con)
{
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);

	atomic_inc(&sess->connected_cnt);
	con->cm_err = 1;
}

static int rtrs_rdma_conn_rejected(struct rtrs_clt_con *con,
				   struct rdma_cm_event *ev)
{
	struct rtrs_sess *s = con->c.sess;
	const struct rtrs_msg_conn_rsp *msg;
	const char *rej_msg;
	int status, errno;
	u8 data_len;

	status = ev->status;
	rej_msg = rdma_reject_msg(con->c.cm_id, status);
	msg = rdma_consumer_reject_data(con->c.cm_id, ev, &data_len);

	if (msg && data_len >= sizeof(*msg)) {
		errno = (int16_t)le16_to_cpu(msg->errno);
		if (errno == -EBUSY)
			rtrs_err(s,
				 "Previous session still exists on the server, please reconnect later\n");
		else
			rtrs_err(s,
				 "Connect rejected: status %d (%s), rtrs errno %d\n",
				 status, rej_msg, errno);
	} else {
		rtrs_err(s,
			 "Connect rejected but with malformed message: status %d (%s)\n",
			 status, rej_msg);
	}

	return -ECONNRESET;
}

void rtrs_clt_close_conns(struct rtrs_clt_sess *sess, bool wait)
{
	if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_CLOSING, NULL))
		queue_work(rtrs_wq, &sess->close_work);
	if (wait)
		flush_work(&sess->close_work);
}

static inline void flag_error_on_conn(struct rtrs_clt_con *con, int cm_err)
{
	if (con->cm_err == 1) {
		struct rtrs_clt_sess *sess;

		sess = to_clt_sess(con->c.sess);
		if (atomic_dec_and_test(&sess->connected_cnt))
			wake_up(&sess->state_wq);
	}
	con->cm_err = cm_err;
}

static int rtrs_clt_rdma_cm_handler(struct rdma_cm_id *cm_id,
				    struct rdma_cm_event *ev)
{
	struct rtrs_clt_con *con = cm_id->context;
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	int cm_err = 0;

	switch (ev->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
		cm_err = rtrs_rdma_addr_resolved(con);
		break;
	case RDMA_CM_EVENT_ROUTE_RESOLVED:
		cm_err = rtrs_rdma_route_resolved(con);
		break;
	case RDMA_CM_EVENT_ESTABLISHED:
		cm_err = rtrs_rdma_conn_established(con, ev);
		if (!cm_err) {
			/*
			 * Report success and wake up. Here we abuse state_wq,
			 * i.e. wake up without a state change, but set
			 * cm_err > 0 so waiters know we are done.
			 */
			flag_success_on_conn(con);
			wake_up(&sess->state_wq);
			return 0;
		}
		break;
	case RDMA_CM_EVENT_REJECTED:
		cm_err = rtrs_rdma_conn_rejected(con, ev);
		break;
	case RDMA_CM_EVENT_DISCONNECTED:
		/* No message for disconnecting */
		cm_err = -ECONNRESET;
		break;
	case RDMA_CM_EVENT_CONNECT_ERROR:
	case RDMA_CM_EVENT_UNREACHABLE:
	case RDMA_CM_EVENT_ADDR_CHANGE:
	case RDMA_CM_EVENT_TIMEWAIT_EXIT:
		rtrs_wrn(s, "CM error (CM event: %s, err: %d)\n",
			 rdma_event_msg(ev->event), ev->status);
		cm_err = -ECONNRESET;
		break;
	case RDMA_CM_EVENT_ADDR_ERROR:
	case RDMA_CM_EVENT_ROUTE_ERROR:
		rtrs_wrn(s, "CM error (CM event: %s, err: %d)\n",
			 rdma_event_msg(ev->event), ev->status);
		cm_err = -EHOSTUNREACH;
		break;
	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		/*
		 * Device removal is a special case.  Queue close and return 0.
		 */
		rtrs_clt_close_conns(sess, false);
		return 0;
	default:
		rtrs_err(s, "Unexpected RDMA CM error (CM event: %s, err: %d)\n",
			 rdma_event_msg(ev->event), ev->status);
		cm_err = -ECONNRESET;
		break;
	}

	if (cm_err) {
		/*
		 * A cm error triggers error recovery: the connection is
		 * flagged and, if it was connected, a reconnect is queued.
		 */
		flag_error_on_conn(con, cm_err);
		rtrs_rdma_error_recovery(con);
	}

	return 0;
}

static int create_cm(struct rtrs_clt_con *con)
{
	struct rtrs_sess *s = con->c.sess;
	struct rtrs_clt_sess *sess = to_clt_sess(s);
	struct rdma_cm_id *cm_id;
	int err;

	cm_id = rdma_create_id(&init_net, rtrs_clt_rdma_cm_handler, con,
			       sess->s.dst_addr.ss_family == AF_IB ?
			       RDMA_PS_IB : RDMA_PS_TCP, IB_QPT_RC);
	if (IS_ERR(cm_id)) {
		err = PTR_ERR(cm_id);
		rtrs_err(s, "Failed to create CM ID, err: %d\n", err);

		return err;
	}
	con->c.cm_id = cm_id;
	con->cm_err = 0;
	/* allow the port to be reused */
	err = rdma_set_reuseaddr(cm_id, 1);
	if (err != 0) {
		rtrs_err(s, "Set address reuse failed, err: %d\n", err);
		goto destroy_cm;
	}
	err = rdma_resolve_addr(cm_id, (struct sockaddr *)&sess->s.src_addr,
				(struct sockaddr *)&sess->s.dst_addr,
				RTRS_CONNECT_TIMEOUT_MS);
	if (err) {
		rtrs_err(s, "Failed to resolve address, err: %d\n", err);
		goto destroy_cm;
	}

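	/*
	 * Combine connection status and session events. This is needed
	 * for waiting on two possible cases: cm_err has something
	 * meaningful, or the session state was really changed to error
	 * by device removal.
	 */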
	err = wait_event_interruptible_timeout(
			sess->state_wq,
			con->cm_err || sess->state != RTRS_CLT_CONNECTING,
			msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));
	if (err == 0 || err == -ERESTARTSYS) {
		if (err == 0)
			err = -ETIMEDOUT;
		/* Timed out or interrupted */
		goto errr;
	}
	if (con->cm_err < 0) {
		err = con->cm_err;
		goto errr;
	}
	if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTING) {
		/* Device removal */
		err = -ECONNABORTED;
		goto errr;
	}

	return 0;

errr:
	stop_cm(con);
	mutex_lock(&con->con_mutex);
	destroy_con_cq_qp(con);
	mutex_unlock(&con->con_mutex);
destroy_cm:
	destroy_cm(con);

	return err;
}

static void rtrs_clt_sess_up(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;
	int up;

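	/*
	 * We can fire RECONNECTED event only when all paths were
	 * connected on rtrs_clt_open(), then each was disconnected
	 * and the first one connected again.  That's why this code
	 * is a bit involved.
	 */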
	mutex_lock(&clt->paths_ev_mutex);
	up = ++clt->paths_up;
	/*
	 * Here it is safe to access paths num directly since the up counter
	 * can be greater than MAX_PATHS_NUM only while all paths are being
	 * established for the first time.
	 */
	if (up > MAX_PATHS_NUM && up == MAX_PATHS_NUM + clt->paths_num)
		clt->paths_up = clt->paths_num;
	else if (up == 1)
		clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_RECONNECTED);
	mutex_unlock(&clt->paths_ev_mutex);

	/* Mark session as established */
	sess->established = true;
	sess->reconnect_attempts = 0;
	sess->stats->reconnects.successful_cnt++;
}

static void rtrs_clt_sess_down(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;

	if (!sess->established)
		return;

	sess->established = false;
	mutex_lock(&clt->paths_ev_mutex);
	WARN_ON(!clt->paths_up);
	if (--clt->paths_up == 0)
		clt->link_ev(clt->priv, RTRS_CLT_LINK_EV_DISCONNECTED);
	mutex_unlock(&clt->paths_ev_mutex);
}

static void rtrs_clt_stop_and_destroy_conns(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_con *con;
	unsigned int cid;

	WARN_ON(READ_ONCE(sess->state) == RTRS_CLT_CONNECTED);

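	/*
	 * Possible race with rtrs_clt_open(), when DEVICE_REMOVAL comes
	 * exactly in between.  Start destroying after it finishes.
	 */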
	mutex_lock(&sess->init_mutex);
	mutex_unlock(&sess->init_mutex);

	/*
	 * All IO paths must observe !CONNECTED state before we
	 * free everything.
	 */
	synchronize_rcu();

	rtrs_stop_hb(&sess->s);

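	/*
	 * The order is utterly crucial: firstly disconnect and complete all
	 * rdma requests with error (thus set in_use=false for requests),
	 * then fail outstanding requests checking in_use for each, and
	 * eventually notify upper layer about session disconnection.
	 */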
	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (!sess->s.con[cid])
			break;
		con = to_clt_con(sess->s.con[cid]);
		stop_cm(con);
	}
	fail_all_outstanding_reqs(sess);
	free_sess_reqs(sess);
	rtrs_clt_sess_down(sess);

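	/*
	 * Wait for graceful shutdown, namely when peer side invokes
	 * rdma_disconnect(). 'connected_cnt' is decremented only on
	 * CM events, thus if the other side avoids conn destroying
	 * we simply fall through once the timeout fires.
	 */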
	wait_event_timeout(sess->state_wq, !atomic_read(&sess->connected_cnt),
			   msecs_to_jiffies(RTRS_CONNECT_TIMEOUT_MS));

	for (cid = 0; cid < sess->s.con_num; cid++) {
		if (!sess->s.con[cid])
			break;
		con = to_clt_con(sess->s.con[cid]);
		mutex_lock(&con->con_mutex);
		destroy_con_cq_qp(con);
		mutex_unlock(&con->con_mutex);
		destroy_cm(con);
		destroy_con(con);
	}
}

static inline bool xchg_sessions(struct rtrs_clt_sess __rcu **rcu_ppcpu_path,
				 struct rtrs_clt_sess *sess,
				 struct rtrs_clt_sess *next)
{
	struct rtrs_clt_sess **ppcpu_path;

	/* Call cmpxchg() without sparse warnings */
	ppcpu_path = (typeof(ppcpu_path))rcu_ppcpu_path;
	return sess == cmpxchg(ppcpu_path, sess, next);
}

static void rtrs_clt_remove_path_from_arr(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;
	struct rtrs_clt_sess *next;
	bool wait_for_grace = false;
	int cpu;

	mutex_lock(&clt->paths_mutex);
	list_del_rcu(&sess->s.entry);

	/* Make sure everybody observes path removal. */
	synchronize_rcu();

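	/*
	 * At this point nobody sees @sess in the list, but we still have
	 * dangling pointers @pcpu_path which _can_ point to @sess.  Since
	 * nobody can observe @sess in the list, we guarantee that the IO
	 * path will not assign @sess to @pcpu_path again, i.e. @pcpu_path
	 * can be equal to @sess, but can never become @sess again.
	 *
	 * Decrement paths number only after the grace period, because
	 * a caller of do_each_path() must firstly observe the list without
	 * the path and only then the decremented paths number.  Otherwise
	 * a path can be observed as !CONNECTED while the do_each_path()
	 * loop already ended because it loaded the decremented paths_num,
	 * and the IO would falsely fail.
	 */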
	clt->paths_num--;

	/*
	 * Get @next path, which is going to replace @sess in the per-cpu
	 * round-robin pointers.  If @sess was the last element, @next is
	 * NULL.
	 */
	rcu_read_lock();
	next = list_next_or_null_rr_rcu(&clt->paths_list, &sess->s.entry,
					typeof(*next), s.entry);
	rcu_read_unlock();

	/*
	 * @pcpu paths can still point to the path which is going to be
	 * removed, so change the pointer manually.
	 */
	for_each_possible_cpu(cpu) {
		struct rtrs_clt_sess __rcu **ppcpu_path;

		ppcpu_path = per_cpu_ptr(clt->pcpu_path, cpu);
		if (rcu_dereference_protected(*ppcpu_path,
			lockdep_is_held(&clt->paths_mutex)) != sess)
			/*
			 * synchronize_rcu() was called just after deleting
			 * the entry from the list, thus the IO code path
			 * cannot change the pointer back to the one which
			 * is going to be removed, so we are safe here.
			 */
			continue;

		/*
		 * We race with the IO code path, which also changes the
		 * pointer, thus we have to be careful not to overwrite it.
		 */
		if (xchg_sessions(ppcpu_path, sess, next))
			/*
			 * @ppcpu_path was successfully replaced by @next,
			 * which means that someone could also pick up
			 * @sess and be dereferencing it right now, so
			 * waiting for a grace period is required.
			 */
			wait_for_grace = true;
	}
	if (wait_for_grace)
		synchronize_rcu();

	mutex_unlock(&clt->paths_mutex);
}

static void rtrs_clt_add_path_to_arr(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt *clt = sess->clt;

	mutex_lock(&clt->paths_mutex);
	clt->paths_num++;

	list_add_tail_rcu(&sess->s.entry, &clt->paths_list);
	mutex_unlock(&clt->paths_mutex);
}

static void rtrs_clt_close_work(struct work_struct *work)
{
	struct rtrs_clt_sess *sess;

	sess = container_of(work, struct rtrs_clt_sess, close_work);

	cancel_delayed_work_sync(&sess->reconnect_dwork);
	rtrs_clt_stop_and_destroy_conns(sess);
	rtrs_clt_change_state_get_old(sess, RTRS_CLT_CLOSED, NULL);
}

static int init_conns(struct rtrs_clt_sess *sess)
{
	unsigned int cid;
	int err;

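	/*
	 * On every new session connections increase the reconnect counter
	 * to avoid clashes with previous sessions not yet closed on the
	 * server side.
	 */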
	sess->s.recon_cnt++;

	/* Establish all RDMA connections */
	for (cid = 0; cid < sess->s.con_num; cid++) {
		err = create_con(sess, cid);
		if (err)
			goto destroy;

		err = create_cm(to_clt_con(sess->s.con[cid]));
		if (err) {
			destroy_con(to_clt_con(sess->s.con[cid]));
			goto destroy;
		}
	}
	err = alloc_sess_reqs(sess);
	if (err)
		goto destroy;

	rtrs_start_hb(&sess->s);

	return 0;

destroy:
	while (cid--) {
		struct rtrs_clt_con *con = to_clt_con(sess->s.con[cid]);

		stop_cm(con);

		mutex_lock(&con->con_mutex);
		destroy_con_cq_qp(con);
		mutex_unlock(&con->con_mutex);
		destroy_cm(con);
		destroy_con(con);
	}
	/*
	 * If we've never taken the async path and got an error, say,
	 * doing rdma_resolve_addr(), switch to CONNECTING_ERR state
	 * manually to keep reconnecting.
	 */
	rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);

	return err;
}

static void rtrs_clt_info_req_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_iu *iu;

	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);

	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(sess->clt, "Sess info request send failed: %s\n",
			 ib_wc_status_msg(wc->status));
		rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);
		return;
	}

	rtrs_clt_update_wc_stats(con);
}

static int process_info_rsp(struct rtrs_clt_sess *sess,
			    const struct rtrs_msg_info_rsp *msg)
{
	unsigned int sg_cnt, total_len;
	int i, sgi;

	sg_cnt = le16_to_cpu(msg->sg_cnt);
	if (!sg_cnt || (sess->queue_depth % sg_cnt)) {
		rtrs_err(sess->clt, "Incorrect sg_cnt %d, queue depth is not a multiple of it\n",
			 sg_cnt);
		return -EINVAL;
	}

	/*
	 * Check if the IB immediate data size is big enough to hold both
	 * the buffer number and the offset inside the memory chunk.
	 */
	if ((ilog2(sg_cnt - 1) + 1) + (ilog2(sess->chunk_size - 1) + 1) >
	    MAX_IMM_PAYL_BITS) {
		rtrs_err(sess->clt,
			 "RDMA immediate size (%db) not enough to encode %d buffers of size %dB\n",
			 MAX_IMM_PAYL_BITS, sg_cnt, sess->chunk_size);
		return -EINVAL;
	}
	total_len = 0;
	for (sgi = 0, i = 0; sgi < sg_cnt && i < sess->queue_depth; sgi++) {
		const struct rtrs_sg_desc *desc = &msg->desc[sgi];
		u32 len, rkey;
		u64 addr;

		addr = le64_to_cpu(desc->addr);
		rkey = le32_to_cpu(desc->key);
		len = le32_to_cpu(desc->len);

		total_len += len;

		if (!len || (len % sess->chunk_size)) {
			rtrs_err(sess->clt, "Incorrect [%d].len %d\n", sgi,
				 len);
			return -EINVAL;
		}
		for ( ; len && i < sess->queue_depth; i++) {
			sess->rbufs[i].addr = addr;
			sess->rbufs[i].rkey = rkey;

			len -= sess->chunk_size;
			addr += sess->chunk_size;
		}
	}

	/* Sanity check */
	if (sgi != sg_cnt || i != sess->queue_depth) {
		rtrs_err(sess->clt, "Incorrect sg vector, not fully mapped\n");
		return -EINVAL;
	}
	if (total_len != sess->chunk_size * sess->queue_depth) {
		rtrs_err(sess->clt, "Incorrect total_len %d\n", total_len);
		return -EINVAL;
	}

	return 0;
}
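
/*
 * Worked example for process_info_rsp() (hypothetical numbers, not taken
 * from any particular server): with queue_depth = 64, chunk_size = 128 KiB
 * and sg_cnt = 4, each descriptor must cover 64 / 4 = 16 chunks, i.e.
 * desc->len must be 16 * 128 KiB = 2 MiB, and each descriptor is fanned
 * out into 16 consecutive rbufs entries:
 *
 *	rbufs[0].addr  = desc[0].addr
 *	rbufs[1].addr  = desc[0].addr + 128K
 *	...
 *	rbufs[16].addr = desc[1].addr
 *
 * total_len then sums to 4 * 2 MiB = 8 MiB = chunk_size * queue_depth,
 * which is exactly what the final sanity check verifies.
 */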

static void rtrs_clt_info_rsp_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct rtrs_clt_con *con = to_clt_con(wc->qp->qp_context);
	struct rtrs_clt_sess *sess = to_clt_sess(con->c.sess);
	struct rtrs_msg_info_rsp *msg;
	enum rtrs_clt_state state;
	struct rtrs_iu *iu;
	size_t rx_sz;
	int err;

	state = RTRS_CLT_CONNECTING_ERR;

	WARN_ON(con->c.cid);
	iu = container_of(wc->wr_cqe, struct rtrs_iu, cqe);
	if (wc->status != IB_WC_SUCCESS) {
		rtrs_err(sess->clt, "Sess info response recv failed: %s\n",
			 ib_wc_status_msg(wc->status));
		goto out;
	}
	WARN_ON(wc->opcode != IB_WC_RECV);

	if (wc->byte_len < sizeof(*msg)) {
		rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	ib_dma_sync_single_for_cpu(sess->s.dev->ib_dev, iu->dma_addr,
				   iu->size, DMA_FROM_DEVICE);
	msg = iu->buf;
	if (le16_to_cpu(msg->type) != RTRS_MSG_INFO_RSP) {
		rtrs_err(sess->clt, "Sess info response is malformed: type %d\n",
			 le16_to_cpu(msg->type));
		goto out;
	}
	rx_sz = sizeof(*msg);
	rx_sz += sizeof(msg->desc[0]) * le16_to_cpu(msg->sg_cnt);
	if (wc->byte_len < rx_sz) {
		rtrs_err(sess->clt, "Sess info response is malformed: size %d\n",
			 wc->byte_len);
		goto out;
	}
	err = process_info_rsp(sess, msg);
	if (err)
		goto out;

	err = post_recv_sess(sess);
	if (err)
		goto out;

	state = RTRS_CLT_CONNECTED;

out:
	rtrs_clt_update_wc_stats(con);
	rtrs_iu_free(iu, sess->s.dev->ib_dev, 1);
	rtrs_clt_change_state_get_old(sess, state, NULL);
}

static int rtrs_send_sess_info(struct rtrs_clt_sess *sess)
{
	struct rtrs_clt_con *usr_con = to_clt_con(sess->s.con[0]);
	struct rtrs_msg_info_req *msg;
	struct rtrs_iu *tx_iu, *rx_iu;
	size_t rx_sz;
	int err;

	rx_sz = sizeof(struct rtrs_msg_info_rsp);
	rx_sz += sizeof(struct rtrs_sg_desc) * sess->queue_depth;

	tx_iu = rtrs_iu_alloc(1, sizeof(struct rtrs_msg_info_req), GFP_KERNEL,
			      sess->s.dev->ib_dev, DMA_TO_DEVICE,
			      rtrs_clt_info_req_done);
	rx_iu = rtrs_iu_alloc(1, rx_sz, GFP_KERNEL, sess->s.dev->ib_dev,
			      DMA_FROM_DEVICE, rtrs_clt_info_rsp_done);
	if (!tx_iu || !rx_iu) {
		err = -ENOMEM;
		goto out;
	}
	/* Prepare for getting info response */
	err = rtrs_iu_post_recv(&usr_con->c, rx_iu);
	if (err) {
		rtrs_err(sess->clt, "rtrs_iu_post_recv(), err: %d\n", err);
		goto out;
	}
	rx_iu = NULL;

	msg = tx_iu->buf;
	msg->type = cpu_to_le16(RTRS_MSG_INFO_REQ);
	memcpy(msg->sessname, sess->s.sessname, sizeof(msg->sessname));

	ib_dma_sync_single_for_device(sess->s.dev->ib_dev, tx_iu->dma_addr,
				      tx_iu->size, DMA_TO_DEVICE);

	/* Send info request */
	err = rtrs_iu_post_send(&usr_con->c, tx_iu, sizeof(*msg), NULL);
	if (err) {
		rtrs_err(sess->clt, "rtrs_iu_post_send(), err: %d\n", err);
		goto out;
	}
	tx_iu = NULL;

	/* Wait for state change */
	wait_event_interruptible_timeout(sess->state_wq,
					 sess->state != RTRS_CLT_CONNECTING,
					 msecs_to_jiffies(
						 RTRS_CONNECT_TIMEOUT_MS));
	if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED) {
		if (READ_ONCE(sess->state) == RTRS_CLT_CONNECTING_ERR)
			err = -ECONNRESET;
		else
			err = -ETIMEDOUT;
	}

out:
	if (tx_iu)
		rtrs_iu_free(tx_iu, sess->s.dev->ib_dev, 1);
	if (rx_iu)
		rtrs_iu_free(rx_iu, sess->s.dev->ib_dev, 1);
	if (err)
		/* If we've never taken the async path (e.g. on alloc failure) */
		rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING_ERR, NULL);

	return err;
}

/**
 * init_sess() - establishes all session connections and does the handshake
 * @sess: client session.
 *
 * In case of error the full close or reconnect procedure should be taken,
 * because the async reconnect or close works may already have been started.
 */
static int init_sess(struct rtrs_clt_sess *sess)
{
	int err;
	char str[NAME_MAX];
	struct rtrs_addr path = {
		.src = &sess->s.src_addr,
		.dst = &sess->s.dst_addr,
	};

	rtrs_addr_to_str(&path, str, sizeof(str));

	mutex_lock(&sess->init_mutex);
	err = init_conns(sess);
	if (err) {
		rtrs_err(sess->clt,
			 "init_conns() failed: err=%d path=%s [%s:%u]\n", err,
			 str, sess->hca_name, sess->hca_port);
		goto out;
	}
	err = rtrs_send_sess_info(sess);
	if (err) {
		rtrs_err(sess->clt,
			 "rtrs_send_sess_info() failed: err=%d path=%s [%s:%u]\n",
			 err, str, sess->hca_name, sess->hca_port);
		goto out;
	}
	rtrs_clt_sess_up(sess);
out:
	mutex_unlock(&sess->init_mutex);

	return err;
}

static void rtrs_clt_reconnect_work(struct work_struct *work)
{
	struct rtrs_clt_sess *sess;
	struct rtrs_clt *clt;
	unsigned int delay_ms;
	int err;

	sess = container_of(to_delayed_work(work), struct rtrs_clt_sess,
			    reconnect_dwork);
	clt = sess->clt;

	if (READ_ONCE(sess->state) != RTRS_CLT_RECONNECTING)
		return;

	if (sess->reconnect_attempts >= clt->max_reconnect_attempts) {
		/* Close the session completely if max attempts is reached */
		rtrs_clt_close_conns(sess, false);
		return;
	}
	sess->reconnect_attempts++;

	/* Stop everything */
	rtrs_clt_stop_and_destroy_conns(sess);
	msleep(RTRS_RECONNECT_BACKOFF);
	if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_CONNECTING, NULL)) {
		err = init_sess(sess);
		if (err)
			goto reconnect_again;
	}

	return;

reconnect_again:
	if (rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING, NULL)) {
		sess->stats->reconnects.fail_cnt++;
		delay_ms = clt->reconnect_delay_sec * 1000;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork,
				   msecs_to_jiffies(delay_ms +
						    prandom_u32() %
						    RTRS_RECONNECT_SEED));
	}
}
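
/*
 * A short numeric sketch of the retry delay above (illustrative values,
 * not defaults from this file): with reconnect_delay_sec = 5 the base
 * delay is 5000 ms, and prandom_u32() % RTRS_RECONNECT_SEED adds up to
 * RTRS_RECONNECT_SEED - 1 ms of jitter, so the work is requeued after
 * 5000..5007 ms.  The jitter keeps many paths that failed at the same
 * moment from retrying in lockstep.
 */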

static void rtrs_clt_dev_release(struct device *dev)
{
	struct rtrs_clt *clt = container_of(dev, struct rtrs_clt, dev);

	kfree(clt);
}

static struct rtrs_clt *alloc_clt(const char *sessname, size_t paths_num,
				  u16 port, size_t pdu_sz, void *priv,
				  void (*link_ev)(void *priv,
						  enum rtrs_clt_link_ev ev),
				  unsigned int reconnect_delay_sec,
				  unsigned int max_reconnect_attempts)
{
	struct rtrs_clt *clt;
	int err;

	if (!paths_num || paths_num > MAX_PATHS_NUM)
		return ERR_PTR(-EINVAL);

	if (strlen(sessname) >= sizeof(clt->sessname))
		return ERR_PTR(-EINVAL);

	clt = kzalloc(sizeof(*clt), GFP_KERNEL);
	if (!clt)
		return ERR_PTR(-ENOMEM);

	clt->pcpu_path = alloc_percpu(typeof(*clt->pcpu_path));
	if (!clt->pcpu_path) {
		kfree(clt);
		return ERR_PTR(-ENOMEM);
	}

	uuid_gen(&clt->paths_uuid);
	INIT_LIST_HEAD_RCU(&clt->paths_list);
	clt->paths_num = paths_num;
	clt->paths_up = MAX_PATHS_NUM;
	clt->port = port;
	clt->pdu_sz = pdu_sz;
	clt->max_segments = RTRS_MAX_SEGMENTS;
	clt->reconnect_delay_sec = reconnect_delay_sec;
	clt->max_reconnect_attempts = max_reconnect_attempts;
	clt->priv = priv;
	clt->link_ev = link_ev;
	clt->mp_policy = MP_POLICY_MIN_INFLIGHT;
	strscpy(clt->sessname, sessname, sizeof(clt->sessname));
	init_waitqueue_head(&clt->permits_wait);
	mutex_init(&clt->paths_ev_mutex);
	mutex_init(&clt->paths_mutex);

	clt->dev.class = rtrs_clt_dev_class;
	clt->dev.release = rtrs_clt_dev_release;
	err = dev_set_name(&clt->dev, "%s", sessname);
	if (err)
		goto err;
	/*
	 * Suppress user space notification until
	 * sysfs files are created
	 */
	dev_set_uevent_suppress(&clt->dev, true);
	err = device_register(&clt->dev);
	if (err) {
		/*
		 * Once registration fails, put_device() invokes the release
		 * callback which frees @clt, so free the percpu data first
		 * and do not fall through to kfree() (double free otherwise).
		 */
		free_percpu(clt->pcpu_path);
		put_device(&clt->dev);
		return ERR_PTR(err);
	}

	clt->kobj_paths = kobject_create_and_add("paths", &clt->dev.kobj);
	if (!clt->kobj_paths) {
		err = -ENOMEM;
		goto err_dev;
	}
	err = rtrs_clt_create_sysfs_root_files(clt);
	if (err) {
		kobject_del(clt->kobj_paths);
		kobject_put(clt->kobj_paths);
		goto err_dev;
	}
	dev_set_uevent_suppress(&clt->dev, false);
	kobject_uevent(&clt->dev.kobj, KOBJ_ADD);

	return clt;
err_dev:
	/* device_unregister() drops the last reference and frees @clt */
	free_percpu(clt->pcpu_path);
	device_unregister(&clt->dev);
	return ERR_PTR(err);
err:
	free_percpu(clt->pcpu_path);
	kfree(clt);
	return ERR_PTR(err);
}

static void free_clt(struct rtrs_clt *clt)
{
	free_permits(clt);
	free_percpu(clt->pcpu_path);
	mutex_destroy(&clt->paths_ev_mutex);
	mutex_destroy(&clt->paths_mutex);
	/* release callback will free clt in last put */
	device_unregister(&clt->dev);
}

/**
 * rtrs_clt_open() - Open a session to an RTRS server
 * @ops: holds the link event callback and the private pointer.
 * @sessname: name of the session
 * @paths: Paths to be established defined by their src and dst addresses
 * @paths_num: Number of elements in the @paths array
 * @port: port to be used by the RTRS session
 * @pdu_sz: Size of extra payload which can be accessed after permit allocation.
 * @reconnect_delay_sec: time between reconnect tries
 * @max_reconnect_attempts: Number of times to reconnect on error before giving
 *			    up, 0 for disabled, -1 for forever
 * @nr_poll_queues: number of polling mode connections using IB_POLL_DIRECT flag
 *
 * Starts session establishment with the rtrs_server. The function can block
 * up to ~2000ms before it returns.
 *
 * Return a valid pointer on success otherwise PTR_ERR.
 */
struct rtrs_clt *rtrs_clt_open(struct rtrs_clt_ops *ops,
			       const char *sessname,
			       const struct rtrs_addr *paths,
			       size_t paths_num, u16 port,
			       size_t pdu_sz, u8 reconnect_delay_sec,
			       s16 max_reconnect_attempts, u32 nr_poll_queues)
{
	struct rtrs_clt_sess *sess, *tmp;
	struct rtrs_clt *clt;
	int err, i;

	clt = alloc_clt(sessname, paths_num, port, pdu_sz, ops->priv,
			ops->link_ev,
			reconnect_delay_sec,
			max_reconnect_attempts);
	if (IS_ERR(clt)) {
		err = PTR_ERR(clt);
		goto out;
	}
	for (i = 0; i < paths_num; i++) {
		struct rtrs_clt_sess *sess;

		sess = alloc_sess(clt, &paths[i], nr_cpu_ids,
				  nr_poll_queues);
		if (IS_ERR(sess)) {
			err = PTR_ERR(sess);
			goto close_all_sess;
		}
		if (!i)
			sess->for_new_clt = 1;
		list_add_tail_rcu(&sess->s.entry, &clt->paths_list);

		err = init_sess(sess);
		if (err) {
			list_del_rcu(&sess->s.entry);
			rtrs_clt_close_conns(sess, true);
			free_percpu(sess->stats->pcpu_stats);
			kfree(sess->stats);
			free_sess(sess);
			goto close_all_sess;
		}

		err = rtrs_clt_create_sess_files(sess);
		if (err) {
			list_del_rcu(&sess->s.entry);
			rtrs_clt_close_conns(sess, true);
			free_percpu(sess->stats->pcpu_stats);
			kfree(sess->stats);
			free_sess(sess);
			goto close_all_sess;
		}
	}
	err = alloc_permits(clt);
	if (err)
		goto close_all_sess;

	return clt;

close_all_sess:
	list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_destroy_sess_files(sess, NULL);
		rtrs_clt_close_conns(sess, true);
		kobject_put(&sess->kobj);
	}
	rtrs_clt_destroy_sysfs_root(clt);
	free_clt(clt);

out:
	return ERR_PTR(err);
}
EXPORT_SYMBOL(rtrs_clt_open);
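
/*
 * Minimal caller sketch for rtrs_clt_open()/rtrs_clt_close(), assuming a
 * hypothetical ULP with its own my_link_ev() callback; the my_* names and
 * the numeric values are illustrative, not part of this file:
 *
 *	struct rtrs_clt_ops ops = {
 *		.priv	 = my_dev,
 *		.link_ev = my_link_ev,
 *	};
 *	struct rtrs_clt *clt;
 *
 *	clt = rtrs_clt_open(&ops, "my_session", paths, paths_num, 1234,
 *			    sizeof(struct my_iu), 5, 3, 0);
 *	if (IS_ERR(clt))
 *		return PTR_ERR(clt);
 *	...
 *	rtrs_clt_close(clt);
 */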

/**
 * rtrs_clt_close() - Close a session
 * @clt: Session handle. Session is freed upon return.
 */
void rtrs_clt_close(struct rtrs_clt *clt)
{
	struct rtrs_clt_sess *sess, *tmp;

	/* Firstly forbid sysfs access */
	rtrs_clt_destroy_sysfs_root(clt);

	/* Now it is safe to iterate over all paths without locks */
	list_for_each_entry_safe(sess, tmp, &clt->paths_list, s.entry) {
		rtrs_clt_close_conns(sess, true);
		rtrs_clt_destroy_sess_files(sess, NULL);
		kobject_put(&sess->kobj);
	}
	free_clt(clt);
}
EXPORT_SYMBOL(rtrs_clt_close);

int rtrs_clt_reconnect_from_sysfs(struct rtrs_clt_sess *sess)
{
	enum rtrs_clt_state old_state;
	int err = -EBUSY;
	bool changed;

	changed = rtrs_clt_change_state_get_old(sess, RTRS_CLT_RECONNECTING,
						&old_state);
	if (changed) {
		sess->reconnect_attempts = 0;
		queue_delayed_work(rtrs_wq, &sess->reconnect_dwork, 0);
	}
	if (changed || old_state == RTRS_CLT_RECONNECTING) {
		/*
		 * flush_delayed_work() queues pending work for immediate
		 * execution, so do the flush if we have queued the work
		 * just above or the work was already in the pending state.
		 */
		flush_delayed_work(&sess->reconnect_dwork);
		err = (READ_ONCE(sess->state) ==
		       RTRS_CLT_CONNECTED ? 0 : -ENOTCONN);
	}

	return err;
}

int rtrs_clt_remove_path_from_sysfs(struct rtrs_clt_sess *sess,
				    const struct attribute *sysfs_self)
{
	enum rtrs_clt_state old_state;
	bool changed;

	/*
	 * Continue stopping the path until its state was changed to DEAD or
	 * was observed as DEAD:
	 * 1. State was changed to DEAD - we were fast and nobody invoked
	 *    rtrs_clt_reconnect(), which can again start reconnecting.
	 * 2. State was observed as DEAD - someone is removing the path in
	 *    parallel.
	 */
	do {
		rtrs_clt_close_conns(sess, true);
		changed = rtrs_clt_change_state_get_old(sess,
							RTRS_CLT_DEAD,
							&old_state);
	} while (!changed && old_state != RTRS_CLT_DEAD);

	if (changed) {
		rtrs_clt_remove_path_from_arr(sess);
		rtrs_clt_destroy_sess_files(sess, sysfs_self);
		kobject_put(&sess->kobj);
	}

	return 0;
}

void rtrs_clt_set_max_reconnect_attempts(struct rtrs_clt *clt, int value)
{
	clt->max_reconnect_attempts = (unsigned int)value;
}

int rtrs_clt_get_max_reconnect_attempts(const struct rtrs_clt *clt)
{
	return (int)clt->max_reconnect_attempts;
}

/**
 * rtrs_clt_request() - Request data transfer to/from server via RDMA.
 *
 * @dir:	READ/WRITE
 * @ops:	callback function to be called as confirmation, and the pointer.
 * @clt:	Session
 * @permit:	Preallocated permit
 * @vec:	Message that is sent to server together with the request.
 *		Sum of len of all @vec elements limited to <= IO_MSG_SIZE.
 *		Since the msg is copied internally it can be allocated on stack.
 * @nr:		Number of elements in @vec.
 * @data_len:	length of data sent to/from server
 * @sg:		Pages to be sent/received to/from server.
 * @sg_cnt:	Number of elements in the @sg
 *
 * Return:
 * 0:		Success
 * <0:		Error
 *
 * On dir=READ rtrs client will request a data transfer from Server to client.
 * The data that the server will respond with will be stored in @sg when
 * the user receives an %RTRS_CLT_RDMA_EV_RDMA_REQUEST_WRITE_COMPL event.
 * On dir=WRITE rtrs client will rdma write data in sg to server side.
 */
int rtrs_clt_request(int dir, struct rtrs_clt_req_ops *ops,
		     struct rtrs_clt *clt, struct rtrs_permit *permit,
		     const struct kvec *vec, size_t nr, size_t data_len,
		     struct scatterlist *sg, unsigned int sg_cnt)
{
	struct rtrs_clt_io_req *req;
	struct rtrs_clt_sess *sess;

	enum dma_data_direction dma_dir;
	int err = -ECONNABORTED, i;
	size_t usr_len, hdr_len;
	struct path_it it;

	/* Get kvec length */
	for (i = 0, usr_len = 0; i < nr; i++)
		usr_len += vec[i].iov_len;

	if (dir == READ) {
		hdr_len = sizeof(struct rtrs_msg_rdma_read) +
			  sg_cnt * sizeof(struct rtrs_sg_desc);
		dma_dir = DMA_FROM_DEVICE;
	} else {
		hdr_len = sizeof(struct rtrs_msg_rdma_write);
		dma_dir = DMA_TO_DEVICE;
	}

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
		if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)
			continue;

		if (usr_len + hdr_len > sess->max_hdr_size) {
			rtrs_wrn_rl(sess->clt,
				    "%s request failed, user message size is %zu and header length %zu, but max size is %u\n",
				    dir == READ ? "Read" : "Write",
				    usr_len, hdr_len, sess->max_hdr_size);
			err = -EMSGSIZE;
			break;
		}
		req = rtrs_clt_get_req(sess, ops->conf_fn, permit, ops->priv,
				       vec, usr_len, sg, sg_cnt, data_len,
				       dma_dir);
		if (dir == READ)
			err = rtrs_clt_read_req(req);
		else
			err = rtrs_clt_write_req(req);
		if (err) {
			req->in_use = false;
			continue;
		}
		/* Success path */
		break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return err;
}
EXPORT_SYMBOL(rtrs_clt_request);
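
/*
 * Caller sketch for rtrs_clt_request(), assuming a hypothetical ULP; the
 * my_* names, the payload layout and the completion logic are illustrative
 * only.  A permit obtained via rtrs_clt_get_permit() carries the request
 * and is returned in the confirmation callback:
 *
 *	static void my_conf(void *priv, int errno)
 *	{
 *		struct my_io *io = priv;
 *
 *		rtrs_clt_put_permit(io->clt, io->permit);
 *		my_complete(io, errno);
 *	}
 *
 *	struct rtrs_clt_req_ops req_ops = {
 *		.priv	 = io,
 *		.conf_fn = my_conf,
 *	};
 *	struct kvec vec = {
 *		.iov_base = &io->msg,
 *		.iov_len  = sizeof(io->msg),
 *	};
 *
 *	err = rtrs_clt_request(WRITE, &req_ops, clt, io->permit, &vec, 1,
 *			       data_len, io->sg, io->sg_cnt);
 */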

int rtrs_clt_rdma_cq_direct(struct rtrs_clt *clt, unsigned int index)
{
	/* If no path, return -1 for block layer not to try again */
	int cnt = -1;
	struct rtrs_con *con;
	struct rtrs_clt_sess *sess;
	struct path_it it;

	rcu_read_lock();
	for (path_it_init(&it, clt);
	     (sess = it.next_path(&it)) && it.i < it.clt->paths_num; it.i++) {
		if (READ_ONCE(sess->state) != RTRS_CLT_CONNECTED)
			continue;

		/* +1 skips the user connection at cid 0 */
		con = sess->s.con[index + 1];
		cnt = ib_process_cq_direct(con->cq, -1);
		if (cnt)
			break;
	}
	path_it_deinit(&it);
	rcu_read_unlock();

	return cnt;
}
EXPORT_SYMBOL(rtrs_clt_rdma_cq_direct);
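
/*
 * Polling sketch (hypothetical caller): a block driver that requested
 * polling queues at rtrs_clt_open() time would typically call this from
 * its poll hook, using the hardware queue number as @index:
 *
 *	static int my_poll(struct blk_mq_hw_ctx *hctx)
 *	{
 *		struct my_queue *q = hctx->driver_data;
 *
 *		return rtrs_clt_rdma_cq_direct(q->clt, hctx->queue_num);
 *	}
 */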

/**
 * rtrs_clt_query() - queries RTRS session attributes
 * @clt: session pointer
 * @attr: query results for session attributes.
 *
 * Returns:
 *    0 on success
 *    -ECOMM	no connection to the server
 */
int rtrs_clt_query(struct rtrs_clt *clt, struct rtrs_attrs *attr)
{
	if (!rtrs_clt_is_connected(clt))
		return -ECOMM;

	attr->queue_depth = clt->queue_depth;
	attr->max_segments = clt->max_segments;
	/* Cap max_io_size to min of remote buffer size and the fr pages */
	attr->max_io_size = min_t(int, clt->max_io_size,
				  clt->max_segments * SZ_4K);

	return 0;
}
EXPORT_SYMBOL(rtrs_clt_query);
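
/*
 * Typical use (illustrative): after rtrs_clt_open() succeeds, a ULP can
 * size its request queue from the returned attributes:
 *
 *	struct rtrs_attrs attr;
 *
 *	err = rtrs_clt_query(clt, &attr);
 *	if (err)
 *		return err;
 *	max_io_size = attr.max_io_size;
 *	queue_depth = attr.queue_depth;
 */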

int rtrs_clt_create_path_from_sysfs(struct rtrs_clt *clt,
				    struct rtrs_addr *addr)
{
	struct rtrs_clt_sess *sess;
	int err;

	sess = alloc_sess(clt, addr, nr_cpu_ids, 0);
	if (IS_ERR(sess))
		return PTR_ERR(sess);

	mutex_lock(&clt->paths_mutex);
	if (clt->paths_num == 0) {
		/*
		 * When all the paths are removed for a session,
		 * the addition of the first path is like a new session for
		 * the storage server
		 */
		sess->for_new_clt = 1;
	}
	mutex_unlock(&clt->paths_mutex);

	/*
	 * It is totally safe to add path in CONNECTING state: coming
	 * IO will never grab it.  Also it is very important to add
	 * path before init, since init fires LINK_CONNECTED event.
	 */
	rtrs_clt_add_path_to_arr(sess);

	err = init_sess(sess);
	if (err)
		goto close_sess;

	err = rtrs_clt_create_sess_files(sess);
	if (err)
		goto close_sess;

	return 0;

close_sess:
	rtrs_clt_remove_path_from_arr(sess);
	rtrs_clt_close_conns(sess, true);
	free_percpu(sess->stats->pcpu_stats);
	kfree(sess->stats);
	free_sess(sess);

	return err;
}

static int rtrs_clt_ib_dev_init(struct rtrs_ib_dev *dev)
{
	if (!(dev->ib_dev->attrs.device_cap_flags &
	      IB_DEVICE_MEM_MGT_EXTENSIONS)) {
		pr_err("Memory registrations not supported.\n");
		return -ENOTSUPP;
	}

	return 0;
}

static const struct rtrs_rdma_dev_pd_ops dev_pd_ops = {
	.init = rtrs_clt_ib_dev_init
};

static int __init rtrs_client_init(void)
{
	rtrs_rdma_dev_pd_init(0, &dev_pd);

	rtrs_clt_dev_class = class_create(THIS_MODULE, "rtrs-client");
	if (IS_ERR(rtrs_clt_dev_class)) {
		pr_err("Failed to create rtrs-client dev class\n");
		return PTR_ERR(rtrs_clt_dev_class);
	}
	rtrs_wq = alloc_workqueue("rtrs_client_wq", 0, 0);
	if (!rtrs_wq) {
		class_destroy(rtrs_clt_dev_class);
		return -ENOMEM;
	}

	return 0;
}

static void __exit rtrs_client_exit(void)
{
	destroy_workqueue(rtrs_wq);
	class_destroy(rtrs_clt_dev_class);
	rtrs_rdma_dev_pd_deinit(&dev_pd);
}

module_init(rtrs_client_init);
module_exit(rtrs_client_exit);