#include <linux/kernel.h>
#include <linux/moduleparam.h>
#include <linux/gfp.h>
#include <net/sock.h>
#include <linux/in.h>
#include <linux/list.h>
#include <linux/ratelimit.h>
#include <linux/export.h>
#include <linux/sizes.h>

#include "rds.h"

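/*
 * send_batch_count bounds how many messages a single rds_send_xmit() call
 * will pull off the send queue before it defers the remaining work back to
 * the workqueue (see the over_batch handling in rds_send_xmit()).
 */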
static int send_batch_count = SZ_1K;
module_param(send_batch_count, int, 0444);
MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");

static void rds_send_remove_from_sock(struct list_head *messages, int status);

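/*
 * Reset the send state for this connection path.  Any message that was in
 * flight is unmapped and dropped, the per-message transmit offsets are
 * cleared, and everything on the retransmit queue is marked for
 * retransmission and spliced back onto the send queue.  Callers must make
 * sure this does not race with rds_send_xmit() on the same path.
 */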
void rds_send_path_reset(struct rds_conn_path *cp)
{
        struct rds_message *rm, *tmp;
        unsigned long flags;

        if (cp->cp_xmit_rm) {
                rm = cp->cp_xmit_rm;
                cp->cp_xmit_rm = NULL;

                rds_message_unmapped(rm);
                rds_message_put(rm);
        }

        cp->cp_xmit_sg = 0;
        cp->cp_xmit_hdr_off = 0;
        cp->cp_xmit_data_off = 0;
        cp->cp_xmit_atomic_sent = 0;
        cp->cp_xmit_rdma_sent = 0;
        cp->cp_xmit_data_sent = 0;

        cp->cp_conn->c_map_queued = 0;

        cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
        cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;

        spin_lock_irqsave(&cp->cp_lock, flags);
        list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
                set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
                set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
        }
        list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
        spin_unlock_irqrestore(&cp->cp_lock, flags);
}
EXPORT_SYMBOL_GPL(rds_send_path_reset);

static int acquire_in_xmit(struct rds_conn_path *cp)
{
        return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
}

static void release_in_xmit(struct rds_conn_path *cp)
{
        clear_bit(RDS_IN_XMIT, &cp->cp_flags);
        smp_mb__after_atomic();

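        /*
         * clear_bit() does not imply a memory barrier, so the explicit
         * smp_mb__after_atomic() above orders the flag update before the
         * unlocked waitqueue_active() check that decides whether to wake
         * sleepers on cp_waitq.
         */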
        if (waitqueue_active(&cp->cp_waitq))
                wake_up_all(&cp->cp_waitq);
}

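/*
 * rds_send_xmit() walks the path's send queue and hands messages to the
 * transport.  Only one caller may be transmitting on a path at a time,
 * enforced by the RDS_IN_XMIT bit; everyone else returns and relies on the
 * worker to try again.  Per-message progress (header offset, scatterlist
 * index, data offset, and which rdma/atomic ops have gone out) is kept in
 * the path so a partial send can be resumed on the next call.
 */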
int rds_send_xmit(struct rds_conn_path *cp)
{
        struct rds_connection *conn = cp->cp_conn;
        struct rds_message *rm;
        unsigned long flags;
        unsigned int tmp;
        struct scatterlist *sg;
        int ret = 0;
        LIST_HEAD(to_be_dropped);
        int batch_count;
        unsigned long send_gen = 0;

restart:
        batch_count = 0;

        if (!acquire_in_xmit(cp)) {
                rds_stats_inc(s_send_lock_contention);
                ret = -ENOMEM;
                goto out;
        }

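        /*
         * cp_send_gen is bumped while RDS_IN_XMIT is held and re-checked
         * after the bit is released (see the bottom of this function) to
         * detect senders that raced with us while we were transmitting.
         */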
        send_gen = READ_ONCE(cp->cp_send_gen) + 1;
        WRITE_ONCE(cp->cp_send_gen, send_gen);

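        /*
         * The path state is only tested after RDS_IN_XMIT is acquired:
         * shutdown marks the path down first and then waits for RDS_IN_XMIT
         * to clear, so bailing out here keeps the transmit path off a
         * connection that is being torn down.
         */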
        if (!rds_conn_path_up(cp)) {
                release_in_xmit(cp);
                ret = 0;
                goto out;
        }

        if (conn->c_trans->xmit_path_prepare)
                conn->c_trans->xmit_path_prepare(cp);

        while (1) {

                rm = cp->cp_xmit_rm;

                if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
                        rm = rds_cong_update_alloc(conn);
                        if (IS_ERR(rm)) {
                                ret = PTR_ERR(rm);
                                break;
                        }
                        rm->data.op_active = 1;
                        rm->m_inc.i_conn_path = cp;
                        rm->m_inc.i_conn = cp->cp_conn;

                        cp->cp_xmit_rm = rm;
                }

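                /*
                 * If nothing is in flight, grab the next message off the
                 * send queue.  It is moved onto the retransmit list (with a
                 * reference held) at the same time, so an ack or a
                 * connection reset can always find it.
                 */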
                if (!rm) {
                        unsigned int len;

                        batch_count++;

                        if (batch_count >= send_batch_count)
                                goto over_batch;

                        spin_lock_irqsave(&cp->cp_lock, flags);

                        if (!list_empty(&cp->cp_send_queue)) {
                                rm = list_entry(cp->cp_send_queue.next,
                                                struct rds_message,
                                                m_conn_item);
                                rds_message_addref(rm);

                                list_move_tail(&rm->m_conn_item,
                                               &cp->cp_retrans);
                        }

                        spin_unlock_irqrestore(&cp->cp_lock, flags);

                        if (!rm)
                                break;

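                        /*
                         * Messages flagged RDS_MSG_FLUSH, and messages with
                         * RDMA ops that have already been retransmitted, are
                         * never sent again; they are collected on
                         * to_be_dropped and completed with RDS_RDMA_DROPPED
                         * after the loop.
                         */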
                        if (test_bit(RDS_MSG_FLUSH, &rm->m_flags) ||
                            (rm->rdma.op_active &&
                             test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))) {
                                spin_lock_irqsave(&cp->cp_lock, flags);
                                if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
                                        list_move(&rm->m_conn_item, &to_be_dropped);
                                spin_unlock_irqrestore(&cp->cp_lock, flags);
                                continue;
                        }

                        len = ntohl(rm->m_inc.i_hdr.h_len);
                        if (cp->cp_unacked_packets == 0 ||
                            cp->cp_unacked_bytes < len) {
                                set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

                                cp->cp_unacked_packets =
                                        rds_sysctl_max_unacked_packets;
                                cp->cp_unacked_bytes =
                                        rds_sysctl_max_unacked_bytes;
                                rds_stats_inc(s_send_ack_required);
                        } else {
                                cp->cp_unacked_bytes -= len;
                                cp->cp_unacked_packets--;
                        }

                        cp->cp_xmit_rm = rm;
                }

                if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
                        rm->m_final_op = &rm->rdma;

                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                        ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
                        if (ret) {
                                clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
                                wake_up_interruptible(&rm->m_flush_wait);
                                break;
                        }
                        cp->cp_xmit_rdma_sent = 1;

                }

                if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
                        rm->m_final_op = &rm->atomic;

                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                        ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
                        if (ret) {
                                clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
                                wake_up_interruptible(&rm->m_flush_wait);
                                break;
                        }
                        cp->cp_xmit_atomic_sent = 1;

                }

                if (rm->data.op_nents == 0) {
                        int ops_present;
                        int all_ops_are_silent = 1;

                        ops_present = (rm->atomic.op_active || rm->rdma.op_active);
                        if (rm->atomic.op_active && !rm->atomic.op_silent)
                                all_ops_are_silent = 0;
                        if (rm->rdma.op_active && !rm->rdma.op_silent)
                                all_ops_are_silent = 0;

                        if (ops_present && all_ops_are_silent
                            && !rm->m_rdma_cookie)
                                rm->data.op_active = 0;
                }

                if (rm->data.op_active && !cp->cp_xmit_data_sent) {
                        rm->m_final_op = &rm->data;

                        ret = conn->c_trans->xmit(conn, rm,
                                                  cp->cp_xmit_hdr_off,
                                                  cp->cp_xmit_sg,
                                                  cp->cp_xmit_data_off);
                        if (ret <= 0)
                                break;

                        if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
                                tmp = min_t(int, ret,
                                            sizeof(struct rds_header) -
                                            cp->cp_xmit_hdr_off);
                                cp->cp_xmit_hdr_off += tmp;
                                ret -= tmp;
                        }

                        sg = &rm->data.op_sg[cp->cp_xmit_sg];
                        while (ret) {
                                tmp = min_t(int, ret, sg->length -
                                                      cp->cp_xmit_data_off);
                                cp->cp_xmit_data_off += tmp;
                                ret -= tmp;
                                if (cp->cp_xmit_data_off == sg->length) {
                                        cp->cp_xmit_data_off = 0;
                                        sg++;
                                        cp->cp_xmit_sg++;
                                        BUG_ON(ret != 0 && cp->cp_xmit_sg ==
                                               rm->data.op_nents);
                                }
                        }

                        if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
                            (cp->cp_xmit_sg == rm->data.op_nents))
                                cp->cp_xmit_data_sent = 1;
                }

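                /*
                 * The message is done once its data op (if it has one) has
                 * been fully handed to the transport: clear the per-message
                 * transmit state and drop the reference taken when the
                 * message was moved to the retransmit list.
                 */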
                if (!rm->data.op_active || cp->cp_xmit_data_sent) {
                        cp->cp_xmit_rm = NULL;
                        cp->cp_xmit_sg = 0;
                        cp->cp_xmit_hdr_off = 0;
                        cp->cp_xmit_data_off = 0;
                        cp->cp_xmit_rdma_sent = 0;
                        cp->cp_xmit_atomic_sent = 0;
                        cp->cp_xmit_data_sent = 0;

                        rds_message_put(rm);
                }
        }

over_batch:
        if (conn->c_trans->xmit_path_complete)
                conn->c_trans->xmit_path_complete(cp);
        release_in_xmit(cp);

        if (!list_empty(&to_be_dropped)) {
                list_for_each_entry(rm, &to_be_dropped, m_conn_item)
                        rds_message_put(rm);
                rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
        }

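        /*
         * Other senders can queue a message after we last tested the send
         * queue but before we clear RDS_IN_XMIT.  In that case they will not
         * transmit it themselves (they see the bit set), so re-check the
         * queue here and either restart or kick the worker.  If cp_send_gen
         * moved on, someone else already entered the transmit path and will
         * pick the work up, so only count the race.
         */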
        if (ret == 0) {
                bool raced;

                smp_mb();
                raced = send_gen != READ_ONCE(cp->cp_send_gen);

                if ((test_bit(0, &conn->c_map_queued) ||
                     !list_empty(&cp->cp_send_queue)) && !raced) {
                        if (batch_count < send_batch_count)
                                goto restart;
                        queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
                } else if (raced) {
                        rds_stats_inc(s_send_lock_queue_raced);
                }
        }
out:
        return ret;
}
EXPORT_SYMBOL_GPL(rds_send_xmit);

static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{
        u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

        assert_spin_locked(&rs->rs_lock);

        BUG_ON(rs->rs_snd_bytes < len);
        rs->rs_snd_bytes -= len;

        if (rs->rs_snd_bytes == 0)
                rds_stats_inc(s_send_queue_empty);
}

static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
                                    is_acked_func is_acked)
{
        if (is_acked)
                return is_acked(rm, ack);
        return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
}

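/*
 * Transports call this when an RDMA op attached to a message has completed.
 * If the message is still on a socket and notification was requested, the
 * notifier is queued on the socket's notify queue with the completion
 * status and the socket is woken up.
 */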
void rds_rdma_send_complete(struct rds_message *rm, int status)
{
        struct rds_sock *rs = NULL;
        struct rm_rdma_op *ro;
        struct rds_notifier *notifier;
        unsigned long flags;
        unsigned int notify = 0;

        spin_lock_irqsave(&rm->m_rs_lock, flags);

        notify = rm->rdma.op_notify | rm->data.op_notify;
        ro = &rm->rdma;
        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
            ro->op_active && notify && ro->op_notifier) {
                notifier = ro->op_notifier;
                rs = rm->m_rs;
                sock_hold(rds_rs_to_sk(rs));

                notifier->n_status = status;
                spin_lock(&rs->rs_lock);
                list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
                spin_unlock(&rs->rs_lock);

                ro->op_notifier = NULL;
        }

        spin_unlock_irqrestore(&rm->m_rs_lock, flags);

        if (rs) {
                rds_wake_sk_sleep(rs);
                sock_put(rds_rs_to_sk(rs));
        }
}
EXPORT_SYMBOL_GPL(rds_rdma_send_complete);

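/*
 * Just like above, but for the notifier attached to an atomic op.
 */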
void rds_atomic_send_complete(struct rds_message *rm, int status)
{
        struct rds_sock *rs = NULL;
        struct rm_atomic_op *ao;
        struct rds_notifier *notifier;
        unsigned long flags;

        spin_lock_irqsave(&rm->m_rs_lock, flags);

        ao = &rm->atomic;
        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
            && ao->op_active && ao->op_notify && ao->op_notifier) {
                notifier = ao->op_notifier;
                rs = rm->m_rs;
                sock_hold(rds_rs_to_sk(rs));

                notifier->n_status = status;
                spin_lock(&rs->rs_lock);
                list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
                spin_unlock(&rs->rs_lock);

                ao->op_notifier = NULL;
        }

        spin_unlock_irqrestore(&rm->m_rs_lock, flags);

        if (rs) {
                rds_wake_sk_sleep(rs);
                sock_put(rds_rs_to_sk(rs));
        }
}
EXPORT_SYMBOL_GPL(rds_atomic_send_complete);

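/*
 * Same idea as rds_rdma_send_complete() and rds_atomic_send_complete(),
 * but with no locking: the caller already holds rs_lock and m_rs_lock, so
 * the notifiers can be moved to the socket directly.
 */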
static inline void
__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
{
        struct rm_rdma_op *ro;
        struct rm_atomic_op *ao;

        ro = &rm->rdma;
        if (ro->op_active && ro->op_notify && ro->op_notifier) {
                ro->op_notifier->n_status = status;
                list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
                ro->op_notifier = NULL;
        }

        ao = &rm->atomic;
        if (ao->op_active && ao->op_notify && ao->op_notifier) {
                ao->op_notifier->n_status = status;
                list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
                ao->op_notifier = NULL;
        }
}

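/*
 * Pull messages off a private list (already removed from their connection)
 * and finish them against their owning socket: take them off the socket's
 * send queue, credit the send buffer back, queue any requested RDMA
 * notifiers with the given status, and drop the references the list and the
 * socket were holding.
 */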
static void rds_send_remove_from_sock(struct list_head *messages, int status)
{
        unsigned long flags;
        struct rds_sock *rs = NULL;
        struct rds_message *rm;

        while (!list_empty(messages)) {
                int was_on_sock = 0;

                rm = list_entry(messages->next, struct rds_message,
                                m_conn_item);
                list_del_init(&rm->m_conn_item);

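                /*
                 * rm->m_rs is only stable while m_rs_lock is held;
                 * rds_send_drop_to() clears it under the same lock, so take
                 * it before looking at the socket.
                 */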
                spin_lock_irqsave(&rm->m_rs_lock, flags);
                if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
                        goto unlock_and_drop;

                if (rs != rm->m_rs) {
                        if (rs) {
                                rds_wake_sk_sleep(rs);
                                sock_put(rds_rs_to_sk(rs));
                        }
                        rs = rm->m_rs;
                        if (rs)
                                sock_hold(rds_rs_to_sk(rs));
                }
                if (!rs)
                        goto unlock_and_drop;
                spin_lock(&rs->rs_lock);

                if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
                        struct rm_rdma_op *ro = &rm->rdma;
                        struct rds_notifier *notifier;

                        list_del_init(&rm->m_sock_item);
                        rds_send_sndbuf_remove(rs, rm);

                        if (ro->op_active && ro->op_notifier &&
                            (ro->op_notify || (ro->op_recverr && status))) {
                                notifier = ro->op_notifier;
                                list_add_tail(&notifier->n_list,
                                              &rs->rs_notify_queue);
                                if (!notifier->n_status)
                                        notifier->n_status = status;
                                rm->rdma.op_notifier = NULL;
                        }
                        was_on_sock = 1;
                        rm->m_rs = NULL;
                }
                spin_unlock(&rs->rs_lock);

unlock_and_drop:
                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
                rds_message_put(rm);
                if (was_on_sock)
                        rds_message_put(rm);
        }

        if (rs) {
                rds_wake_sk_sleep(rs);
                sock_put(rds_rs_to_sk(rs));
        }
}

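/*
 * Transports call here when they've determined that the receiver has queued
 * messages up to, and including, the given sequence number (or that a
 * transport-private is_acked callback says so).  Acked messages are trimmed
 * off the retransmit queue and completed back to their sockets with
 * RDS_RDMA_SUCCESS.
 */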
void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
                              is_acked_func is_acked)
{
        struct rds_message *rm, *tmp;
        unsigned long flags;
        LIST_HEAD(list);

        spin_lock_irqsave(&cp->cp_lock, flags);

        list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
                if (!rds_send_is_acked(rm, ack, is_acked))
                        break;

                list_move(&rm->m_conn_item, &list);
                clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
        }

        if (!list_empty(&list))
                smp_mb__after_atomic();

        spin_unlock_irqrestore(&cp->cp_lock, flags);

        rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
}
EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);

void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
                         is_acked_func is_acked)
{
        WARN_ON(conn->c_trans->t_mp_capable);
        rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
}
EXPORT_SYMBOL_GPL(rds_send_drop_acked);

void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
{
        struct rds_message *rm, *tmp;
        struct rds_connection *conn;
        struct rds_conn_path *cp;
        unsigned long flags;
        LIST_HEAD(list);

        spin_lock_irqsave(&rs->rs_lock, flags);

        list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
                if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
                             dest->sin_port != rm->m_inc.i_hdr.h_dport))
                        continue;

                list_move(&rm->m_sock_item, &list);
                rds_send_sndbuf_remove(rs, rm);
                clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
        }

        smp_mb__after_atomic();

        spin_unlock_irqrestore(&rs->rs_lock, flags);

        if (list_empty(&list))
                return;

        list_for_each_entry(rm, &list, m_sock_item) {

                conn = rm->m_inc.i_conn;
                if (conn->c_trans->t_mp_capable)
                        cp = rm->m_inc.i_conn_path;
                else
                        cp = &conn->c_path[0];

                spin_lock_irqsave(&cp->cp_lock, flags);

                if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
                        spin_unlock_irqrestore(&cp->cp_lock, flags);
                        spin_lock_irqsave(&rm->m_rs_lock, flags);
                        rm->m_rs = NULL;
                        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
                        continue;
                }
                list_del_init(&rm->m_conn_item);
                spin_unlock_irqrestore(&cp->cp_lock, flags);

                spin_lock_irqsave(&rm->m_rs_lock, flags);

                spin_lock(&rs->rs_lock);
                __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
                spin_unlock(&rs->rs_lock);

                rm->m_rs = NULL;
                spin_unlock_irqrestore(&rm->m_rs_lock, flags);

                rds_message_put(rm);
        }

        rds_wake_sk_sleep(rs);

        while (!list_empty(&list)) {
                rm = list_entry(list.next, struct rds_message, m_sock_item);
                list_del_init(&rm->m_sock_item);
                rds_message_wait(rm);

                spin_lock_irqsave(&rm->m_rs_lock, flags);

                spin_lock(&rs->rs_lock);
                __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
                spin_unlock(&rs->rs_lock);

                rm->m_rs = NULL;
                spin_unlock_irqrestore(&rm->m_rs_lock, flags);

                rds_message_put(rm);
        }
}

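/*
 * Charge the message against the socket's send buffer and queue it on both
 * the socket and the connection path.  The caller's *queued flag makes this
 * a no-op once it has succeeded, so it can be used directly as the
 * condition of the wait_event loop in rds_sendmsg().
 */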
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
                             struct rds_conn_path *cp,
                             struct rds_message *rm, __be16 sport,
                             __be16 dport, int *queued)
{
        unsigned long flags;
        u32 len;

        if (*queued)
                goto out;

        len = be32_to_cpu(rm->m_inc.i_hdr.h_len);

        spin_lock_irqsave(&rs->rs_lock, flags);

        if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
                rs->rs_snd_bytes += len;

                if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
                        set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);

                list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
                set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
                rds_message_addref(rm);
                rm->m_rs = rs;

                rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
                rm->m_inc.i_conn = conn;
                rm->m_inc.i_conn_path = cp;
                rds_message_addref(rm);

                spin_lock(&cp->cp_lock);
                rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
                list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
                set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
                spin_unlock(&cp->cp_lock);

                rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
                         rm, len, rs, rs->rs_snd_bytes,
                         (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));

                *queued = 1;
        }

        spin_unlock_irqrestore(&rs->rs_lock, flags);
out:
        return *queued;
}

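/*
 * Work out, from the control messages and the data length, how much space
 * the rds_message needs so that it can be allocated in one go.  It also
 * rejects combinations of cmsgs that cannot be used together.
 */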
static int rds_rm_size(struct msghdr *msg, int data_len)
{
        struct cmsghdr *cmsg;
        int size = 0;
        int cmsg_groups = 0;
        int retval;

        for_each_cmsghdr(cmsg, msg) {
                if (!CMSG_OK(msg, cmsg))
                        return -EINVAL;

                if (cmsg->cmsg_level != SOL_RDS)
                        continue;

                switch (cmsg->cmsg_type) {
                case RDS_CMSG_RDMA_ARGS:
                        cmsg_groups |= 1;
                        retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
                        if (retval < 0)
                                return retval;
                        size += retval;

                        break;

                case RDS_CMSG_RDMA_DEST:
                case RDS_CMSG_RDMA_MAP:
                        cmsg_groups |= 2;

                        break;

                case RDS_CMSG_ATOMIC_CSWP:
                case RDS_CMSG_ATOMIC_FADD:
                case RDS_CMSG_MASKED_ATOMIC_CSWP:
                case RDS_CMSG_MASKED_ATOMIC_FADD:
                        cmsg_groups |= 1;
                        size += sizeof(struct scatterlist);
                        break;

                default:
                        return -EINVAL;
                }
        }

        size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);

        /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
        if (cmsg_groups == 3)
                return -EINVAL;

        return size;
}

static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
                         struct msghdr *msg, int *allocated_mr)
{
        struct cmsghdr *cmsg;
        int ret = 0;

        for_each_cmsghdr(cmsg, msg) {
                if (!CMSG_OK(msg, cmsg))
                        return -EINVAL;

                if (cmsg->cmsg_level != SOL_RDS)
                        continue;

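                /*
                 * RDS_CMSG_RDMA_DEST and RDS_CMSG_RDMA_MAP fill in the
                 * message's rdma cookie as a side effect; RDMA_MAP also
                 * allocates an MR, which is why *allocated_mr is reported
                 * back for cleanup on the error path.
                 */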
                switch (cmsg->cmsg_type) {
                case RDS_CMSG_RDMA_ARGS:
                        ret = rds_cmsg_rdma_args(rs, rm, cmsg);
                        break;

                case RDS_CMSG_RDMA_DEST:
                        ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
                        break;

                case RDS_CMSG_RDMA_MAP:
                        ret = rds_cmsg_rdma_map(rs, rm, cmsg);
                        if (!ret)
                                *allocated_mr = 1;
                        else if (ret == -ENODEV)
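                                /*
                                 * -ENODEV here most likely means the
                                 * underlying connection is not up yet;
                                 * report -EAGAIN so rds_sendmsg() kicks a
                                 * connection attempt and userspace retries.
                                 */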
                                ret = -EAGAIN;
                        break;
                case RDS_CMSG_ATOMIC_CSWP:
                case RDS_CMSG_ATOMIC_FADD:
                case RDS_CMSG_MASKED_ATOMIC_CSWP:
                case RDS_CMSG_MASKED_ATOMIC_FADD:
                        ret = rds_cmsg_atomic(rs, rm, cmsg);
                        break;

                default:
                        return -EINVAL;
                }

                if (ret)
                        break;
        }

        return ret;
}

static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
{
        int hash;

        if (conn->c_npaths == 0)
                hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
        else
                hash = RDS_MPATH_HASH(rs, conn->c_npaths);
        if (conn->c_npaths == 0 && hash != 0) {
                rds_send_ping(conn, 0);

                if (conn->c_npaths == 0) {
                        wait_event_interruptible(conn->c_hs_waitq,
                                                 (conn->c_npaths != 0));
                }
                if (conn->c_npaths == 1)
                        hash = 0;
        }
        return hash;
}

static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
{
        struct rds_rdma_args *args;
        struct cmsghdr *cmsg;

        for_each_cmsghdr(cmsg, msg) {
                if (!CMSG_OK(msg, cmsg))
                        return -EINVAL;

                if (cmsg->cmsg_level != SOL_RDS)
                        continue;

                if (cmsg->cmsg_type == RDS_CMSG_RDMA_ARGS) {
                        args = CMSG_DATA(cmsg);
                        *rdma_bytes += args->remote_vec.bytes;
                }
        }
        return 0;
}

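/*
 * rds_sendmsg() builds an rds_message from the user's iovec and control
 * messages, finds (or creates) the connection for the destination, queues
 * the message, and then makes one direct attempt at transmitting it before
 * falling back to the send worker.
 */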
int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
{
        struct sock *sk = sock->sk;
        struct rds_sock *rs = rds_sk_to_rs(sk);
        DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
        __be32 daddr;
        __be16 dport;
        struct rds_message *rm = NULL;
        struct rds_connection *conn;
        int ret = 0;
        int queued = 0, allocated_mr = 0;
        int nonblock = msg->msg_flags & MSG_DONTWAIT;
        long timeo = sock_sndtimeo(sk, nonblock);
        struct rds_conn_path *cpath;
        size_t total_payload_len = payload_len, rdma_payload_len = 0;

        if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) {
                ret = -EOPNOTSUPP;
                goto out;
        }

        if (msg->msg_namelen) {
                if (msg->msg_namelen < sizeof(*usin) || usin->sin_family != AF_INET) {
                        ret = -EINVAL;
                        goto out;
                }
                daddr = usin->sin_addr.s_addr;
                dport = usin->sin_port;
        } else {
                lock_sock(sk);
                daddr = rs->rs_conn_addr;
                dport = rs->rs_conn_port;
                release_sock(sk);
        }

        lock_sock(sk);
        if (daddr == 0 || rs->rs_bound_addr == 0) {
                release_sock(sk);
                ret = -ENOTCONN;
                goto out;
        }
        release_sock(sk);

        ret = rds_rdma_bytes(msg, &rdma_payload_len);
        if (ret)
                goto out;

        total_payload_len += rdma_payload_len;
        if (max_t(size_t, payload_len, rdma_payload_len) > RDS_MAX_MSG_SIZE) {
                ret = -EMSGSIZE;
                goto out;
        }

        if (payload_len > rds_sk_sndbuf(rs)) {
                ret = -EMSGSIZE;
                goto out;
        }

        ret = rds_rm_size(msg, payload_len);
        if (ret < 0)
                goto out;

        rm = rds_message_alloc(ret, GFP_KERNEL);
        if (!rm) {
                ret = -ENOMEM;
                goto out;
        }

        if (payload_len) {
                rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
                if (!rm->data.op_sg) {
                        ret = -ENOMEM;
                        goto out;
                }
                ret = rds_message_copy_from_user(rm, &msg->msg_iter);
                if (ret)
                        goto out;
        }
        rm->data.op_active = 1;

        rm->m_daddr = daddr;

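        /*
         * Use the connection already cached on the socket when the
         * destination matches; otherwise look one up (creating it if needed)
         * and cache it for later sends.
         */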
        if (rs->rs_conn && rs->rs_conn->c_faddr == daddr)
                conn = rs->rs_conn;
        else {
                conn = rds_conn_create_outgoing(sock_net(sock->sk),
                                                rs->rs_bound_addr, daddr,
                                                rs->rs_transport,
                                                sock->sk->sk_allocation);
                if (IS_ERR(conn)) {
                        ret = PTR_ERR(conn);
                        goto out;
                }
                rs->rs_conn = conn;
        }

        ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
        if (ret) {
                if (ret == -EAGAIN)
                        rds_conn_connect_if_down(conn);
                goto out;
        }

        if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
                printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
                                   &rm->rdma, conn->c_trans->xmit_rdma);
                ret = -EOPNOTSUPP;
                goto out;
        }

        if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
                printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
                                   &rm->atomic, conn->c_trans->xmit_atomic);
                ret = -EOPNOTSUPP;
                goto out;
        }

        if (conn->c_trans->t_mp_capable)
                cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)];
        else
                cpath = &conn->c_path[0];

        rds_conn_path_connect_if_down(cpath);

        ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
        if (ret) {
                rs->rs_seen_congestion = 1;
                goto out;
        }
        while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
                                  dport, &queued)) {
                rds_stats_inc(s_send_queue_full);

                if (nonblock) {
                        ret = -EAGAIN;
                        goto out;
                }

                timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
                                        rds_send_queue_rm(rs, conn, cpath, rm,
                                                          rs->rs_bound_port,
                                                          dport,
                                                          &queued),
                                        timeo);
                rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
                if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
                        continue;

                ret = timeo;
                if (ret == 0)
                        ret = -ETIMEDOUT;
                goto out;
        }

        rds_stats_inc(s_send_queued);

        ret = rds_send_xmit(cpath);
        if (ret == -ENOMEM || ret == -EAGAIN)
                queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);

        rds_message_put(rm);
        return payload_len;

out:
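        /*
         * If an MR was allocated while parsing the cmsgs (RDS_CMSG_RDMA_MAP)
         * but the send failed, release our use of it here so it can be torn
         * down.
         */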
        if (allocated_mr)
                rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);

        if (rm)
                rds_message_put(rm);
        return ret;
}

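/*
 * Build and queue a small probe message (used for pings, pongs and the
 * multipath handshake) directly on the connection path, bypassing the
 * socket send queue, and kick the send worker to push it out.
 */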
static int
rds_send_probe(struct rds_conn_path *cp, __be16 sport,
               __be16 dport, u8 h_flags)
{
        struct rds_message *rm;
        unsigned long flags;
        int ret = 0;

        rm = rds_message_alloc(0, GFP_ATOMIC);
        if (!rm) {
                ret = -ENOMEM;
                goto out;
        }

        rm->m_daddr = cp->cp_conn->c_faddr;
        rm->data.op_active = 1;

        rds_conn_path_connect_if_down(cp);

        ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
        if (ret)
                goto out;

        spin_lock_irqsave(&cp->cp_lock, flags);
        list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
        set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
        rds_message_addref(rm);
        rm->m_inc.i_conn = cp->cp_conn;
        rm->m_inc.i_conn_path = cp;

        rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
                                    cp->cp_next_tx_seq);
        rm->m_inc.i_hdr.h_flags |= h_flags;
        cp->cp_next_tx_seq++;

        if (RDS_HS_PROBE(be16_to_cpu(sport), be16_to_cpu(dport)) &&
            cp->cp_conn->c_trans->t_mp_capable) {
                u16 npaths = cpu_to_be16(RDS_MPATH_WORKERS);
                u32 my_gen_num = cpu_to_be32(cp->cp_conn->c_my_gen_num);

                rds_message_add_extension(&rm->m_inc.i_hdr,
                                          RDS_EXTHDR_NPATHS, &npaths,
                                          sizeof(npaths));
                rds_message_add_extension(&rm->m_inc.i_hdr,
                                          RDS_EXTHDR_GEN_NUM,
                                          &my_gen_num,
                                          sizeof(u32));
        }
        spin_unlock_irqrestore(&cp->cp_lock, flags);

        rds_stats_inc(s_send_queued);
        rds_stats_inc(s_send_pong);

        queue_delayed_work(rds_wq, &cp->cp_send_w, 1);

        rds_message_put(rm);
        return 0;

out:
        if (rm)
                rds_message_put(rm);
        return ret;
}

int
rds_send_pong(struct rds_conn_path *cp, __be16 dport)
{
        return rds_send_probe(cp, 0, dport, 0);
}

void
rds_send_ping(struct rds_connection *conn, int cp_index)
{
        unsigned long flags;
        struct rds_conn_path *cp = &conn->c_path[cp_index];

        spin_lock_irqsave(&cp->cp_lock, flags);
        if (conn->c_ping_triggered) {
                spin_unlock_irqrestore(&cp->cp_lock, flags);
                return;
        }
        conn->c_ping_triggered = 1;
        spin_unlock_irqrestore(&cp->cp_lock, flags);
        rds_send_probe(cp, cpu_to_be16(RDS_FLAG_PROBE_PORT), 0, 0);
}
EXPORT_SYMBOL_GPL(rds_send_ping);