1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33#include <linux/kernel.h>
34#include <linux/slab.h>
35#include <net/sock.h>
36#include <linux/in.h>
37#include <linux/export.h>
38#include <linux/time.h>
39#include <linux/rds.h>
40
41#include "rds.h"
42
43void rds_inc_init(struct rds_incoming *inc, struct rds_connection *conn,
44 __be32 saddr)
45{
46 atomic_set(&inc->i_refcount, 1);
47 INIT_LIST_HEAD(&inc->i_item);
48 inc->i_conn = conn;
49 inc->i_saddr = saddr;
50 inc->i_rdma_cookie = 0;
51 inc->i_rx_tstamp.tv_sec = 0;
52 inc->i_rx_tstamp.tv_usec = 0;
53}
54EXPORT_SYMBOL_GPL(rds_inc_init);
55
56void rds_inc_path_init(struct rds_incoming *inc, struct rds_conn_path *cp,
57 __be32 saddr)
58{
59 atomic_set(&inc->i_refcount, 1);
60 INIT_LIST_HEAD(&inc->i_item);
61 inc->i_conn = cp->cp_conn;
62 inc->i_conn_path = cp;
63 inc->i_saddr = saddr;
64 inc->i_rdma_cookie = 0;
65 inc->i_rx_tstamp.tv_sec = 0;
66 inc->i_rx_tstamp.tv_usec = 0;
67}
68EXPORT_SYMBOL_GPL(rds_inc_path_init);
69
70static void rds_inc_addref(struct rds_incoming *inc)
71{
72 rdsdebug("addref inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
73 atomic_inc(&inc->i_refcount);
74}
75
76void rds_inc_put(struct rds_incoming *inc)
77{
78 rdsdebug("put inc %p ref %d\n", inc, atomic_read(&inc->i_refcount));
79 if (atomic_dec_and_test(&inc->i_refcount)) {
80 BUG_ON(!list_empty(&inc->i_item));
81
82 inc->i_conn->c_trans->inc_free(inc);
83 }
84}
85EXPORT_SYMBOL_GPL(rds_inc_put);
86
87static void rds_recv_rcvbuf_delta(struct rds_sock *rs, struct sock *sk,
88 struct rds_cong_map *map,
89 int delta, __be16 port)
90{
91 int now_congested;
92
93 if (delta == 0)
94 return;
95
96 rs->rs_rcv_bytes += delta;
97 now_congested = rs->rs_rcv_bytes > rds_sk_rcvbuf(rs);
98
99 rdsdebug("rs %p (%pI4:%u) recv bytes %d buf %d "
100 "now_cong %d delta %d\n",
101 rs, &rs->rs_bound_addr,
102 ntohs(rs->rs_bound_port), rs->rs_rcv_bytes,
103 rds_sk_rcvbuf(rs), now_congested, delta);
104
105
106 if (!rs->rs_congested && now_congested) {
107 rs->rs_congested = 1;
108 rds_cong_set_bit(map, port);
109 rds_cong_queue_updates(map);
110 }
111
112
113
114 else if (rs->rs_congested && (rs->rs_rcv_bytes < (rds_sk_rcvbuf(rs)/2))) {
115 rs->rs_congested = 0;
116 rds_cong_clear_bit(map, port);
117 rds_cong_queue_updates(map);
118 }
119
120
121}
122
123
124
125
126static void rds_recv_incoming_exthdrs(struct rds_incoming *inc, struct rds_sock *rs)
127{
128 struct rds_header *hdr = &inc->i_hdr;
129 unsigned int pos = 0, type, len;
130 union {
131 struct rds_ext_header_version version;
132 struct rds_ext_header_rdma rdma;
133 struct rds_ext_header_rdma_dest rdma_dest;
134 } buffer;
135
136 while (1) {
137 len = sizeof(buffer);
138 type = rds_message_next_extension(hdr, &pos, &buffer, &len);
139 if (type == RDS_EXTHDR_NONE)
140 break;
141
142 switch (type) {
143 case RDS_EXTHDR_RDMA:
144 rds_rdma_unuse(rs, be32_to_cpu(buffer.rdma.h_rdma_rkey), 0);
145 break;
146
147 case RDS_EXTHDR_RDMA_DEST:
148
149
150 inc->i_rdma_cookie = rds_rdma_make_cookie(
151 be32_to_cpu(buffer.rdma_dest.h_rdma_rkey),
152 be32_to_cpu(buffer.rdma_dest.h_rdma_offset));
153
154 break;
155 }
156 }
157}
158
159static void rds_recv_hs_exthdrs(struct rds_header *hdr,
160 struct rds_connection *conn)
161{
162 unsigned int pos = 0, type, len;
163 union {
164 struct rds_ext_header_version version;
165 u16 rds_npaths;
166 } buffer;
167
168 while (1) {
169 len = sizeof(buffer);
170 type = rds_message_next_extension(hdr, &pos, &buffer, &len);
171 if (type == RDS_EXTHDR_NONE)
172 break;
173
174 switch (type) {
175 case RDS_EXTHDR_NPATHS:
176 conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
177 buffer.rds_npaths);
178 break;
179 default:
180 pr_warn_ratelimited("ignoring unknown exthdr type "
181 "0x%x\n", type);
182 }
183 }
184
185 conn->c_npaths = max_t(int, conn->c_npaths, 1);
186}
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207static void rds_start_mprds(struct rds_connection *conn)
208{
209 int i;
210 struct rds_conn_path *cp;
211
212 if (conn->c_npaths > 1 && conn->c_laddr < conn->c_faddr) {
213 for (i = 1; i < conn->c_npaths; i++) {
214 cp = &conn->c_path[i];
215 rds_conn_path_connect_if_down(cp);
216 }
217 }
218}
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236void rds_recv_incoming(struct rds_connection *conn, __be32 saddr, __be32 daddr,
237 struct rds_incoming *inc, gfp_t gfp)
238{
239 struct rds_sock *rs = NULL;
240 struct sock *sk;
241 unsigned long flags;
242 struct rds_conn_path *cp;
243
244 inc->i_conn = conn;
245 inc->i_rx_jiffies = jiffies;
246 if (conn->c_trans->t_mp_capable)
247 cp = inc->i_conn_path;
248 else
249 cp = &conn->c_path[0];
250
251 rdsdebug("conn %p next %llu inc %p seq %llu len %u sport %u dport %u "
252 "flags 0x%x rx_jiffies %lu\n", conn,
253 (unsigned long long)cp->cp_next_rx_seq,
254 inc,
255 (unsigned long long)be64_to_cpu(inc->i_hdr.h_sequence),
256 be32_to_cpu(inc->i_hdr.h_len),
257 be16_to_cpu(inc->i_hdr.h_sport),
258 be16_to_cpu(inc->i_hdr.h_dport),
259 inc->i_hdr.h_flags,
260 inc->i_rx_jiffies);
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282 if (be64_to_cpu(inc->i_hdr.h_sequence) < cp->cp_next_rx_seq &&
283 (inc->i_hdr.h_flags & RDS_FLAG_RETRANSMITTED)) {
284 rds_stats_inc(s_recv_drop_old_seq);
285 goto out;
286 }
287 cp->cp_next_rx_seq = be64_to_cpu(inc->i_hdr.h_sequence) + 1;
288
289 if (rds_sysctl_ping_enable && inc->i_hdr.h_dport == 0) {
290 if (inc->i_hdr.h_sport == 0) {
291 rdsdebug("ignore ping with 0 sport from 0x%x\n", saddr);
292 goto out;
293 }
294 rds_stats_inc(s_recv_ping);
295 rds_send_pong(cp, inc->i_hdr.h_sport);
296
297 if (RDS_HS_PROBE(inc->i_hdr.h_sport, inc->i_hdr.h_dport)) {
298 rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
299 rds_start_mprds(cp->cp_conn);
300 }
301 goto out;
302 }
303
304 if (inc->i_hdr.h_dport == RDS_FLAG_PROBE_PORT &&
305 inc->i_hdr.h_sport == 0) {
306 rds_recv_hs_exthdrs(&inc->i_hdr, cp->cp_conn);
307
308 rds_start_mprds(cp->cp_conn);
309 wake_up(&cp->cp_conn->c_hs_waitq);
310 goto out;
311 }
312
313 rs = rds_find_bound(daddr, inc->i_hdr.h_dport);
314 if (!rs) {
315 rds_stats_inc(s_recv_drop_no_sock);
316 goto out;
317 }
318
319
320 rds_recv_incoming_exthdrs(inc, rs);
321
322
323 sk = rds_rs_to_sk(rs);
324
325
326 write_lock_irqsave(&rs->rs_recv_lock, flags);
327 if (!sock_flag(sk, SOCK_DEAD)) {
328 rdsdebug("adding inc %p to rs %p's recv queue\n", inc, rs);
329 rds_stats_inc(s_recv_queued);
330 rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
331 be32_to_cpu(inc->i_hdr.h_len),
332 inc->i_hdr.h_dport);
333 if (sock_flag(sk, SOCK_RCVTSTAMP))
334 do_gettimeofday(&inc->i_rx_tstamp);
335 rds_inc_addref(inc);
336 list_add_tail(&inc->i_item, &rs->rs_recv_queue);
337 __rds_wake_sk_sleep(sk);
338 } else {
339 rds_stats_inc(s_recv_drop_dead_sock);
340 }
341 write_unlock_irqrestore(&rs->rs_recv_lock, flags);
342
343out:
344 if (rs)
345 rds_sock_put(rs);
346}
347EXPORT_SYMBOL_GPL(rds_recv_incoming);
348
349
350
351
352
353static int rds_next_incoming(struct rds_sock *rs, struct rds_incoming **inc)
354{
355 unsigned long flags;
356
357 if (!*inc) {
358 read_lock_irqsave(&rs->rs_recv_lock, flags);
359 if (!list_empty(&rs->rs_recv_queue)) {
360 *inc = list_entry(rs->rs_recv_queue.next,
361 struct rds_incoming,
362 i_item);
363 rds_inc_addref(*inc);
364 }
365 read_unlock_irqrestore(&rs->rs_recv_lock, flags);
366 }
367
368 return *inc != NULL;
369}
370
371static int rds_still_queued(struct rds_sock *rs, struct rds_incoming *inc,
372 int drop)
373{
374 struct sock *sk = rds_rs_to_sk(rs);
375 int ret = 0;
376 unsigned long flags;
377
378 write_lock_irqsave(&rs->rs_recv_lock, flags);
379 if (!list_empty(&inc->i_item)) {
380 ret = 1;
381 if (drop) {
382
383 rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
384 -be32_to_cpu(inc->i_hdr.h_len),
385 inc->i_hdr.h_dport);
386 list_del_init(&inc->i_item);
387 rds_inc_put(inc);
388 }
389 }
390 write_unlock_irqrestore(&rs->rs_recv_lock, flags);
391
392 rdsdebug("inc %p rs %p still %d dropped %d\n", inc, rs, ret, drop);
393 return ret;
394}
395
396
397
398
399
400int rds_notify_queue_get(struct rds_sock *rs, struct msghdr *msghdr)
401{
402 struct rds_notifier *notifier;
403 struct rds_rdma_notify cmsg = { 0 };
404 unsigned int count = 0, max_messages = ~0U;
405 unsigned long flags;
406 LIST_HEAD(copy);
407 int err = 0;
408
409
410
411
412
413
414
415
416
417 if (msghdr) {
418 max_messages = msghdr->msg_controllen / CMSG_SPACE(sizeof(cmsg));
419 if (!max_messages)
420 max_messages = 1;
421 }
422
423 spin_lock_irqsave(&rs->rs_lock, flags);
424 while (!list_empty(&rs->rs_notify_queue) && count < max_messages) {
425 notifier = list_entry(rs->rs_notify_queue.next,
426 struct rds_notifier, n_list);
427 list_move(¬ifier->n_list, ©);
428 count++;
429 }
430 spin_unlock_irqrestore(&rs->rs_lock, flags);
431
432 if (!count)
433 return 0;
434
435 while (!list_empty(©)) {
436 notifier = list_entry(copy.next, struct rds_notifier, n_list);
437
438 if (msghdr) {
439 cmsg.user_token = notifier->n_user_token;
440 cmsg.status = notifier->n_status;
441
442 err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_RDMA_STATUS,
443 sizeof(cmsg), &cmsg);
444 if (err)
445 break;
446 }
447
448 list_del_init(¬ifier->n_list);
449 kfree(notifier);
450 }
451
452
453
454
455 if (!list_empty(©)) {
456 spin_lock_irqsave(&rs->rs_lock, flags);
457 list_splice(©, &rs->rs_notify_queue);
458 spin_unlock_irqrestore(&rs->rs_lock, flags);
459 }
460
461 return err;
462}
463
464
465
466
467static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
468{
469 uint64_t notify = rs->rs_cong_notify;
470 unsigned long flags;
471 int err;
472
473 err = put_cmsg(msghdr, SOL_RDS, RDS_CMSG_CONG_UPDATE,
474 sizeof(notify), ¬ify);
475 if (err)
476 return err;
477
478 spin_lock_irqsave(&rs->rs_lock, flags);
479 rs->rs_cong_notify &= ~notify;
480 spin_unlock_irqrestore(&rs->rs_lock, flags);
481
482 return 0;
483}
484
485
486
487
488static int rds_cmsg_recv(struct rds_incoming *inc, struct msghdr *msg,
489 struct rds_sock *rs)
490{
491 int ret = 0;
492
493 if (inc->i_rdma_cookie) {
494 ret = put_cmsg(msg, SOL_RDS, RDS_CMSG_RDMA_DEST,
495 sizeof(inc->i_rdma_cookie), &inc->i_rdma_cookie);
496 if (ret)
497 return ret;
498 }
499
500 if ((inc->i_rx_tstamp.tv_sec != 0) &&
501 sock_flag(rds_rs_to_sk(rs), SOCK_RCVTSTAMP)) {
502 ret = put_cmsg(msg, SOL_SOCKET, SCM_TIMESTAMP,
503 sizeof(struct timeval),
504 &inc->i_rx_tstamp);
505 if (ret)
506 return ret;
507 }
508
509 return 0;
510}
511
512int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
513 int msg_flags)
514{
515 struct sock *sk = sock->sk;
516 struct rds_sock *rs = rds_sk_to_rs(sk);
517 long timeo;
518 int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
519 DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
520 struct rds_incoming *inc = NULL;
521
522
523 timeo = sock_rcvtimeo(sk, nonblock);
524
525 rdsdebug("size %zu flags 0x%x timeo %ld\n", size, msg_flags, timeo);
526
527 if (msg_flags & MSG_OOB)
528 goto out;
529
530 while (1) {
531 struct iov_iter save;
532
533 if (!list_empty(&rs->rs_notify_queue)) {
534 ret = rds_notify_queue_get(rs, msg);
535 break;
536 }
537
538 if (rs->rs_cong_notify) {
539 ret = rds_notify_cong(rs, msg);
540 break;
541 }
542
543 if (!rds_next_incoming(rs, &inc)) {
544 if (nonblock) {
545 ret = -EAGAIN;
546 break;
547 }
548
549 timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
550 (!list_empty(&rs->rs_notify_queue) ||
551 rs->rs_cong_notify ||
552 rds_next_incoming(rs, &inc)), timeo);
553 rdsdebug("recvmsg woke inc %p timeo %ld\n", inc,
554 timeo);
555 if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
556 continue;
557
558 ret = timeo;
559 if (ret == 0)
560 ret = -ETIMEDOUT;
561 break;
562 }
563
564 rdsdebug("copying inc %p from %pI4:%u to user\n", inc,
565 &inc->i_conn->c_faddr,
566 ntohs(inc->i_hdr.h_sport));
567 save = msg->msg_iter;
568 ret = inc->i_conn->c_trans->inc_copy_to_user(inc, &msg->msg_iter);
569 if (ret < 0)
570 break;
571
572
573
574
575
576
577 if (!rds_still_queued(rs, inc, !(msg_flags & MSG_PEEK))) {
578 rds_inc_put(inc);
579 inc = NULL;
580 rds_stats_inc(s_recv_deliver_raced);
581 msg->msg_iter = save;
582 continue;
583 }
584
585 if (ret < be32_to_cpu(inc->i_hdr.h_len)) {
586 if (msg_flags & MSG_TRUNC)
587 ret = be32_to_cpu(inc->i_hdr.h_len);
588 msg->msg_flags |= MSG_TRUNC;
589 }
590
591 if (rds_cmsg_recv(inc, msg, rs)) {
592 ret = -EFAULT;
593 goto out;
594 }
595
596 rds_stats_inc(s_recv_delivered);
597
598 if (sin) {
599 sin->sin_family = AF_INET;
600 sin->sin_port = inc->i_hdr.h_sport;
601 sin->sin_addr.s_addr = inc->i_saddr;
602 memset(sin->sin_zero, 0, sizeof(sin->sin_zero));
603 msg->msg_namelen = sizeof(*sin);
604 }
605 break;
606 }
607
608 if (inc)
609 rds_inc_put(inc);
610
611out:
612 return ret;
613}
614
615
616
617
618
619
620void rds_clear_recv_queue(struct rds_sock *rs)
621{
622 struct sock *sk = rds_rs_to_sk(rs);
623 struct rds_incoming *inc, *tmp;
624 unsigned long flags;
625
626 write_lock_irqsave(&rs->rs_recv_lock, flags);
627 list_for_each_entry_safe(inc, tmp, &rs->rs_recv_queue, i_item) {
628 rds_recv_rcvbuf_delta(rs, sk, inc->i_conn->c_lcong,
629 -be32_to_cpu(inc->i_hdr.h_len),
630 inc->i_hdr.h_dport);
631 list_del_init(&inc->i_item);
632 rds_inc_put(inc);
633 }
634 write_unlock_irqrestore(&rs->rs_recv_lock, flags);
635}
636
637
638
639
640
641void rds_inc_info_copy(struct rds_incoming *inc,
642 struct rds_info_iterator *iter,
643 __be32 saddr, __be32 daddr, int flip)
644{
645 struct rds_info_message minfo;
646
647 minfo.seq = be64_to_cpu(inc->i_hdr.h_sequence);
648 minfo.len = be32_to_cpu(inc->i_hdr.h_len);
649
650 if (flip) {
651 minfo.laddr = daddr;
652 minfo.faddr = saddr;
653 minfo.lport = inc->i_hdr.h_dport;
654 minfo.fport = inc->i_hdr.h_sport;
655 } else {
656 minfo.laddr = saddr;
657 minfo.faddr = daddr;
658 minfo.lport = inc->i_hdr.h_sport;
659 minfo.fport = inc->i_hdr.h_dport;
660 }
661
662 minfo.flags = 0;
663
664 rds_info_copy(iter, &minfo, sizeof(minfo));
665}
666