1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33#include <linux/kernel.h>
34#include <linux/random.h>
35#include <linux/export.h>
36
37#include "rds.h"
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/*
 * Workqueue on which the delayed work items in this file (connect, send and
 * receive workers) are queued.  It is created by rds_threads_init() and
 * torn down by rds_threads_exit().  Exported (GPL-only) for use by other
 * modules.
 */
struct workqueue_struct *rds_wq;
EXPORT_SYMBOL_GPL(rds_wq);
73
74void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
75{
76 if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
77 printk(KERN_WARNING "%s: Cannot transition to state UP, "
78 "current state is %d\n",
79 __func__,
80 atomic_read(&cp->cp_state));
81 rds_conn_path_drop(cp, false);
82 return;
83 }
84
85 rdsdebug("conn %p for %pI6c to %pI6c complete\n",
86 cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr);
87
88 cp->cp_reconnect_jiffies = 0;
89 set_bit(0, &cp->cp_conn->c_map_queued);
90 rcu_read_lock();
91 if (!rds_destroy_pending(cp->cp_conn)) {
92 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
93 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
94 }
95 rcu_read_unlock();
96 cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
97}
98EXPORT_SYMBOL_GPL(rds_connect_path_complete);
99
100void rds_connect_complete(struct rds_connection *conn)
101{
102 rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
103}
104EXPORT_SYMBOL_GPL(rds_connect_complete);
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124void rds_queue_reconnect(struct rds_conn_path *cp)
125{
126 unsigned long rand;
127 struct rds_connection *conn = cp->cp_conn;
128
129 rdsdebug("conn %p for %pI6c to %pI6c reconnect jiffies %lu\n",
130 conn, &conn->c_laddr, &conn->c_faddr,
131 cp->cp_reconnect_jiffies);
132
133
134 if (conn->c_trans->t_type == RDS_TRANS_TCP &&
135 rds_addr_cmp(&conn->c_laddr, &conn->c_faddr) >= 0)
136 return;
137
138 set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
139 if (cp->cp_reconnect_jiffies == 0) {
140 cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
141 rcu_read_lock();
142 if (!rds_destroy_pending(cp->cp_conn))
143 queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
144 rcu_read_unlock();
145 return;
146 }
147
148 get_random_bytes(&rand, sizeof(rand));
149 rdsdebug("%lu delay %lu ceil conn %p for %pI6c -> %pI6c\n",
150 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
151 conn, &conn->c_laddr, &conn->c_faddr);
152 rcu_read_lock();
153 if (!rds_destroy_pending(cp->cp_conn))
154 queue_delayed_work(rds_wq, &cp->cp_conn_w,
155 rand % cp->cp_reconnect_jiffies);
156 rcu_read_unlock();
157
158 cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
159 rds_sysctl_reconnect_max_jiffies);
160}
161
162void rds_connect_worker(struct work_struct *work)
163{
164 struct rds_conn_path *cp = container_of(work,
165 struct rds_conn_path,
166 cp_conn_w.work);
167 struct rds_connection *conn = cp->cp_conn;
168 int ret;
169
170 if (cp->cp_index > 0 &&
171 rds_addr_cmp(&cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr) >= 0)
172 return;
173 clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
174 ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
175 if (ret) {
176 ret = conn->c_trans->conn_path_connect(cp);
177 rdsdebug("conn %p for %pI6c to %pI6c dispatched, ret %d\n",
178 conn, &conn->c_laddr, &conn->c_faddr, ret);
179
180 if (ret) {
181 if (rds_conn_path_transition(cp,
182 RDS_CONN_CONNECTING,
183 RDS_CONN_DOWN))
184 rds_queue_reconnect(cp);
185 else
186 rds_conn_path_error(cp, "connect failed\n");
187 }
188 }
189}
190
191void rds_send_worker(struct work_struct *work)
192{
193 struct rds_conn_path *cp = container_of(work,
194 struct rds_conn_path,
195 cp_send_w.work);
196 int ret;
197
198 if (rds_conn_path_state(cp) == RDS_CONN_UP) {
199 clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
200 ret = rds_send_xmit(cp);
201 cond_resched();
202 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
203 switch (ret) {
204 case -EAGAIN:
205 rds_stats_inc(s_send_immediate_retry);
206 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
207 break;
208 case -ENOMEM:
209 rds_stats_inc(s_send_delayed_retry);
210 queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
211 break;
212 default:
213 break;
214 }
215 }
216}
217
218void rds_recv_worker(struct work_struct *work)
219{
220 struct rds_conn_path *cp = container_of(work,
221 struct rds_conn_path,
222 cp_recv_w.work);
223 int ret;
224
225 if (rds_conn_path_state(cp) == RDS_CONN_UP) {
226 ret = cp->cp_conn->c_trans->recv_path(cp);
227 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
228 switch (ret) {
229 case -EAGAIN:
230 rds_stats_inc(s_recv_immediate_retry);
231 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
232 break;
233 case -ENOMEM:
234 rds_stats_inc(s_recv_delayed_retry);
235 queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
236 break;
237 default:
238 break;
239 }
240 }
241}
242
243void rds_shutdown_worker(struct work_struct *work)
244{
245 struct rds_conn_path *cp = container_of(work,
246 struct rds_conn_path,
247 cp_down_w);
248
249 rds_conn_shutdown(cp);
250}
251
/*
 * rds_threads_exit - destroy the rds_wq workqueue.
 *
 * Counterpart of rds_threads_init().
 */
void rds_threads_exit(void)
{
	destroy_workqueue(rds_wq);
}
256
257int rds_threads_init(void)
258{
259 rds_wq = create_singlethread_workqueue("krdsd");
260 if (!rds_wq)
261 return -ENOMEM;
262
263 return 0;
264}
265
266
267
268
269int rds_addr_cmp(const struct in6_addr *addr1,
270 const struct in6_addr *addr2)
271{
272#if defined(CONFIG_HAVE_EFFICIENT_UNALIGNED_ACCESS) && BITS_PER_LONG == 64
273 const __be64 *a1, *a2;
274 u64 x, y;
275
276 a1 = (__be64 *)addr1;
277 a2 = (__be64 *)addr2;
278
279 if (*a1 != *a2) {
280 if (be64_to_cpu(*a1) < be64_to_cpu(*a2))
281 return -1;
282 else
283 return 1;
284 } else {
285 x = be64_to_cpu(*++a1);
286 y = be64_to_cpu(*++a2);
287 if (x < y)
288 return -1;
289 else if (x > y)
290 return 1;
291 else
292 return 0;
293 }
294#else
295 u32 a, b;
296 int i;
297
298 for (i = 0; i < 4; i++) {
299 if (addr1->s6_addr32[i] != addr2->s6_addr32[i]) {
300 a = ntohl(addr1->s6_addr32[i]);
301 b = ntohl(addr2->s6_addr32[i]);
302 if (a < b)
303 return -1;
304 else if (a > b)
305 return 1;
306 }
307 }
308 return 0;
309#endif
310}
311EXPORT_SYMBOL_GPL(rds_addr_cmp);
312