1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33#include <linux/kernel.h>
34#include <linux/random.h>
35#include <linux/export.h>
36
37#include "rds.h"
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71struct workqueue_struct *rds_wq;
72EXPORT_SYMBOL_GPL(rds_wq);
73
74void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
75{
76 if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
77 printk(KERN_WARNING "%s: Cannot transition to state UP, "
78 "current state is %d\n",
79 __func__,
80 atomic_read(&cp->cp_state));
81 rds_conn_path_drop(cp, false);
82 return;
83 }
84
85 rdsdebug("conn %p for %pI6c to %pI6c complete\n",
86 cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr);
87
88 cp->cp_reconnect_jiffies = 0;
89 set_bit(0, &cp->cp_conn->c_map_queued);
90 rcu_read_lock();
91 if (!rds_destroy_pending(cp->cp_conn)) {
92 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
93 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
94 }
95 rcu_read_unlock();
96}
97EXPORT_SYMBOL_GPL(rds_connect_path_complete);
98
99void rds_connect_complete(struct rds_connection *conn)
100{
101 rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
102}
103EXPORT_SYMBOL_GPL(rds_connect_complete);
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123void rds_queue_reconnect(struct rds_conn_path *cp)
124{
125 unsigned long rand;
126 struct rds_connection *conn = cp->cp_conn;
127
128 rdsdebug("conn %p for %pI6c to %pI6c reconnect jiffies %lu\n",
129 conn, &conn->c_laddr, &conn->c_faddr,
130 cp->cp_reconnect_jiffies);
131
132
133 if (conn->c_trans->t_type == RDS_TRANS_TCP &&
134 rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) >= 0)
135 return;
136
137 set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
138 if (cp->cp_reconnect_jiffies == 0) {
139 cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
140 rcu_read_lock();
141 if (!rds_destroy_pending(cp->cp_conn))
142 queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
143 rcu_read_unlock();
144 return;
145 }
146
147 get_random_bytes(&rand, sizeof(rand));
148 rdsdebug("%lu delay %lu ceil conn %p for %pI6c -> %pI6c\n",
149 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
150 conn, &conn->c_laddr, &conn->c_faddr);
151 rcu_read_lock();
152 if (!rds_destroy_pending(cp->cp_conn))
153 queue_delayed_work(rds_wq, &cp->cp_conn_w,
154 rand % cp->cp_reconnect_jiffies);
155 rcu_read_unlock();
156
157 cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
158 rds_sysctl_reconnect_max_jiffies);
159}
160
161void rds_connect_worker(struct work_struct *work)
162{
163 struct rds_conn_path *cp = container_of(work,
164 struct rds_conn_path,
165 cp_conn_w.work);
166 struct rds_connection *conn = cp->cp_conn;
167 int ret;
168
169 if (cp->cp_index > 0 &&
170 rds_addr_cmp(&cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr) >= 0)
171 return;
172 clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
173 ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
174 if (ret) {
175 ret = conn->c_trans->conn_path_connect(cp);
176 rdsdebug("conn %p for %pI6c to %pI6c dispatched, ret %d\n",
177 conn, &conn->c_laddr, &conn->c_faddr, ret);
178
179 if (ret) {
180 if (rds_conn_path_transition(cp,
181 RDS_CONN_CONNECTING,
182 RDS_CONN_DOWN))
183 rds_queue_reconnect(cp);
184 else
185 rds_conn_path_error(cp, "connect failed\n");
186 }
187 }
188}
189
190void rds_send_worker(struct work_struct *work)
191{
192 struct rds_conn_path *cp = container_of(work,
193 struct rds_conn_path,
194 cp_send_w.work);
195 int ret;
196
197 if (rds_conn_path_state(cp) == RDS_CONN_UP) {
198 clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
199 ret = rds_send_xmit(cp);
200 cond_resched();
201 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
202 switch (ret) {
203 case -EAGAIN:
204 rds_stats_inc(s_send_immediate_retry);
205 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
206 break;
207 case -ENOMEM:
208 rds_stats_inc(s_send_delayed_retry);
209 queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
210 default:
211 break;
212 }
213 }
214}
215
216void rds_recv_worker(struct work_struct *work)
217{
218 struct rds_conn_path *cp = container_of(work,
219 struct rds_conn_path,
220 cp_recv_w.work);
221 int ret;
222
223 if (rds_conn_path_state(cp) == RDS_CONN_UP) {
224 ret = cp->cp_conn->c_trans->recv_path(cp);
225 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
226 switch (ret) {
227 case -EAGAIN:
228 rds_stats_inc(s_recv_immediate_retry);
229 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
230 break;
231 case -ENOMEM:
232 rds_stats_inc(s_recv_delayed_retry);
233 queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
234 default:
235 break;
236 }
237 }
238}
239
240void rds_shutdown_worker(struct work_struct *work)
241{
242 struct rds_conn_path *cp = container_of(work,
243 struct rds_conn_path,
244 cp_down_w);
245
246 rds_conn_shutdown(cp);
247}
248
249void rds_threads_exit(void)
250{
251 destroy_workqueue(rds_wq);
252}
253
254int rds_threads_init(void)
255{
256 rds_wq = create_singlethread_workqueue("krdsd");
257 if (!rds_wq)
258 return -ENOMEM;
259
260 return 0;
261}
262
263
264
265
266int rds_addr_cmp(const struct in6_addr *addr1,
267 const struct in6_addr *addr2)
268{
269#if defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS) && BITS_PER_LONG == 64
270 const __be64 *a1, *a2;
271 u64 x, y;
272
273 a1 = (__be64 *)addr1;
274 a2 = (__be64 *)addr2;
275
276 if (*a1 != *a2) {
277 if (be64_to_cpu(*a1) < be64_to_cpu(*a2))
278 return -1;
279 else
280 return 1;
281 } else {
282 x = be64_to_cpu(*++a1);
283 y = be64_to_cpu(*++a2);
284 if (x < y)
285 return -1;
286 else if (x > y)
287 return 1;
288 else
289 return 0;
290 }
291#else
292 u32 a, b;
293 int i;
294
295 for (i = 0; i < 4; i++) {
296 if (addr1->s6_addr32[i] != addr2->s6_addr32[i]) {
297 a = ntohl(addr1->s6_addr32[i]);
298 b = ntohl(addr2->s6_addr32[i]);
299 if (a < b)
300 return -1;
301 else if (a > b)
302 return 1;
303 }
304 }
305 return 0;
306#endif
307}
308EXPORT_SYMBOL_GPL(rds_addr_cmp);
309