1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33#include <linux/kernel.h>
34#include <linux/random.h>
35#include <linux/export.h>
36
37#include "rds.h"
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/* Single ordered workqueue on which all RDS connection management
 * (connect, send, recv, shutdown workers below) is serialized.
 */
struct workqueue_struct *rds_wq;
EXPORT_SYMBOL_GPL(rds_wq);
73
/*
 * Mark a connection path as fully up: transition it from @curr to
 * RDS_CONN_UP, reset the reconnect backoff, and kick the send/recv
 * workers so queued traffic starts flowing immediately.
 *
 * If the path is no longer in @curr (someone raced us and changed
 * state), the path is dropped instead of forced UP.
 */
void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
{
	if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
		printk(KERN_WARNING "%s: Cannot transition to state UP, "
				"current state is %d\n",
				__func__,
				atomic_read(&cp->cp_state));
		rds_conn_path_drop(cp);
		return;
	}

	rdsdebug("conn %p for %pI4 to %pI4 complete\n",
	  cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr);

	/* Successful connect: clear the exponential-backoff state and
	 * request a congestion-map send (bit 0 of c_map_queued) before
	 * waking both data workers.
	 */
	cp->cp_reconnect_jiffies = 0;
	set_bit(0, &cp->cp_conn->c_map_queued);
	queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
	queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
}
EXPORT_SYMBOL_GPL(rds_connect_path_complete);
94
/* Legacy single-path entry point: complete path 0 of @conn, which is
 * expected to be in RDS_CONN_CONNECTING.
 */
void rds_connect_complete(struct rds_connection *conn)
{
	rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
}
EXPORT_SYMBOL_GPL(rds_connect_complete);
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
 * Schedule a reconnect attempt for @cp with exponential backoff.
 *
 * The first attempt after a drop goes out immediately and seeds the
 * backoff with rds_sysctl_reconnect_min_jiffies; each subsequent call
 * queues the connect worker after a random delay below the current
 * backoff, then doubles the backoff up to
 * rds_sysctl_reconnect_max_jiffies.  The randomization avoids both
 * peers retrying in lockstep.
 */
void rds_queue_reconnect(struct rds_conn_path *cp)
{
	unsigned long rand;
	struct rds_connection *conn = cp->cp_conn;

	rdsdebug("conn %p for %pI4 to %pI4 reconnect jiffies %lu\n",
	  conn, &conn->c_laddr, &conn->c_faddr,
	  cp->cp_reconnect_jiffies);


	/* For TCP transports, only the peer with the numerically smaller
	 * address initiates reconnects, so the two sides don't duel with
	 * simultaneous SYNs.
	 */
	if (conn->c_trans->t_type == RDS_TRANS_TCP &&
	    conn->c_laddr > conn->c_faddr)
		return;

	set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
	if (cp->cp_reconnect_jiffies == 0) {
		/* first retry: fire now, arm the minimum backoff */
		cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
		queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
		return;
	}

	get_random_bytes(&rand, sizeof(rand));
	rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
		 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
		 conn, &conn->c_laddr, &conn->c_faddr);
	queue_delayed_work(rds_wq, &cp->cp_conn_w,
			   rand % cp->cp_reconnect_jiffies);

	/* double the ceiling for next time, capped by sysctl */
	cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
					rds_sysctl_reconnect_max_jiffies);
}
150
151void rds_connect_worker(struct work_struct *work)
152{
153 struct rds_conn_path *cp = container_of(work,
154 struct rds_conn_path,
155 cp_conn_w.work);
156 struct rds_connection *conn = cp->cp_conn;
157 int ret;
158
159 if (cp->cp_index > 1 && cp->cp_conn->c_laddr > cp->cp_conn->c_faddr)
160 return;
161 clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
162 ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
163 if (ret) {
164 ret = conn->c_trans->conn_path_connect(cp);
165 rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n",
166 conn, &conn->c_laddr, &conn->c_faddr, ret);
167
168 if (ret) {
169 if (rds_conn_path_transition(cp,
170 RDS_CONN_CONNECTING,
171 RDS_CONN_DOWN))
172 rds_queue_reconnect(cp);
173 else
174 rds_conn_path_error(cp, "connect failed\n");
175 }
176 }
177}
178
/*
 * Workqueue handler for transmitting queued messages on a path.
 *
 * Only runs while the path is UP.  -EAGAIN from rds_send_xmit()
 * requeues the worker immediately; -ENOMEM backs off by 2 jiffies.
 */
void rds_send_worker(struct work_struct *work)
{
	struct rds_conn_path *cp = container_of(work,
						struct rds_conn_path,
						cp_send_w.work);
	int ret;

	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
		clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
		ret = rds_send_xmit(cp);
		cond_resched();
		rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
		switch (ret) {
		case -EAGAIN:
			rds_stats_inc(s_send_immediate_retry);
			queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
			break;
		case -ENOMEM:
			rds_stats_inc(s_send_delayed_retry);
			queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
			/* fall through */
		default:
			break;
		}
	}
}
204
/*
 * Workqueue handler for receiving on a path via the transport's
 * recv_path().  Mirrors rds_send_worker's retry policy: -EAGAIN
 * requeues immediately, -ENOMEM backs off by 2 jiffies.
 */
void rds_recv_worker(struct work_struct *work)
{
	struct rds_conn_path *cp = container_of(work,
						struct rds_conn_path,
						cp_recv_w.work);
	int ret;

	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
		ret = cp->cp_conn->c_trans->recv_path(cp);
		rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
		switch (ret) {
		case -EAGAIN:
			rds_stats_inc(s_recv_immediate_retry);
			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
			break;
		case -ENOMEM:
			rds_stats_inc(s_recv_delayed_retry);
			queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
			/* fall through */
		default:
			break;
		}
	}
}
228
/* Workqueue handler that tears down a connection path.  Note cp_down_w
 * is a plain work_struct (no .work member), unlike the delayed workers.
 */
void rds_shutdown_worker(struct work_struct *work)
{
	struct rds_conn_path *cp = container_of(work,
						struct rds_conn_path,
						cp_down_w);

	rds_conn_shutdown(cp);
}
237
/* Module teardown: flush and destroy the shared RDS workqueue. */
void rds_threads_exit(void)
{
	destroy_workqueue(rds_wq);
}
242
243int rds_threads_init(void)
244{
245 rds_wq = create_singlethread_workqueue("krdsd");
246 if (!rds_wq)
247 return -ENOMEM;
248
249 return 0;
250}
251