1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33#include <linux/kernel.h>
34#include <linux/random.h>
35#include <linux/export.h>
36
37#include "rds.h"
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/* Single ordered workqueue on which all RDS connect/send/recv/shutdown
 * work items in this file are queued; exported for use by RDS transports.
 */
struct workqueue_struct *rds_wq;
EXPORT_SYMBOL_GPL(rds_wq);
73
74void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
75{
76 if (!rds_conn_path_transition(cp, curr, RDS_CONN_UP)) {
77 printk(KERN_WARNING "%s: Cannot transition to state UP, "
78 "current state is %d\n",
79 __func__,
80 atomic_read(&cp->cp_state));
81 rds_conn_path_drop(cp);
82 return;
83 }
84
85 rdsdebug("conn %p for %pI4 to %pI4 complete\n",
86 cp->cp_conn, &cp->cp_conn->c_laddr, &cp->cp_conn->c_faddr);
87
88 cp->cp_reconnect_jiffies = 0;
89 set_bit(0, &cp->cp_conn->c_map_queued);
90 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
91 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
92}
93EXPORT_SYMBOL_GPL(rds_connect_path_complete);
94
/*
 * Legacy single-path completion entry point: completes path 0 of @conn,
 * which must currently be in RDS_CONN_CONNECTING.
 */
void rds_connect_complete(struct rds_connection *conn)
{
	rds_connect_path_complete(&conn->c_path[0], RDS_CONN_CONNECTING);
}
EXPORT_SYMBOL_GPL(rds_connect_complete);
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
/*
 * Schedule a reconnect attempt for @cp with randomized exponential backoff.
 *
 * The first attempt after a successful connection is immediate; subsequent
 * attempts are delayed by a random value below the current backoff, which
 * doubles each time up to rds_sysctl_reconnect_max_jiffies.
 */
void rds_queue_reconnect(struct rds_conn_path *cp)
{
	unsigned long rand;
	struct rds_connection *conn = cp->cp_conn;

	rdsdebug("conn %p for %pI4 to %pI4 reconnect jiffies %lu\n",
		 conn, &conn->c_laddr, &conn->c_faddr,
		 cp->cp_reconnect_jiffies);

	/* For TCP, only the canonical side initiates the reconnect;
	 * presumably this avoids both peers dialing each other at once
	 * (connection duels) — the non-canonical side just waits.
	 */
	if (conn->c_trans->t_type == RDS_TRANS_TCP &&
	    !IS_CANONICAL(conn->c_laddr, conn->c_faddr))
		return;

	set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
	if (cp->cp_reconnect_jiffies == 0) {
		/* First retry: connect right away, and arm the minimum
		 * backoff for any later attempt.
		 */
		cp->cp_reconnect_jiffies = rds_sysctl_reconnect_min_jiffies;
		queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
		return;
	}

	/* Jitter the delay uniformly in [0, cp_reconnect_jiffies). */
	get_random_bytes(&rand, sizeof(rand));
	rdsdebug("%lu delay %lu ceil conn %p for %pI4 -> %pI4\n",
		 rand % cp->cp_reconnect_jiffies, cp->cp_reconnect_jiffies,
		 conn, &conn->c_laddr, &conn->c_faddr);
	queue_delayed_work(rds_wq, &cp->cp_conn_w,
			   rand % cp->cp_reconnect_jiffies);

	/* Exponential backoff, capped by sysctl. */
	cp->cp_reconnect_jiffies = min(cp->cp_reconnect_jiffies * 2,
				       rds_sysctl_reconnect_max_jiffies);
}
150
/*
 * Workqueue handler that attempts to bring a connection path up.
 *
 * Transitions the path DOWN -> CONNECTING and asks the transport to
 * connect.  If the transport fails, the path is returned to DOWN and a
 * backed-off reconnect is queued; if the state changed underneath us,
 * the path is flagged with an error instead.
 */
void rds_connect_worker(struct work_struct *work)
{
	struct rds_conn_path *cp = container_of(work,
						struct rds_conn_path,
						cp_conn_w.work);
	struct rds_connection *conn = cp->cp_conn;
	int ret;

	/* For multipath (index > 0), only the canonical side initiates
	 * connections — NOTE(review): the peer presumably dials the
	 * extra paths; confirm against the transport code.
	 */
	if (cp->cp_index > 0 &&
	    !IS_CANONICAL(cp->cp_conn->c_laddr, cp->cp_conn->c_faddr))
		return;
	clear_bit(RDS_RECONNECT_PENDING, &cp->cp_flags);
	ret = rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_CONNECTING);
	if (ret) {
		ret = conn->c_trans->conn_path_connect(cp);
		rdsdebug("conn %p for %pI4 to %pI4 dispatched, ret %d\n",
			 conn, &conn->c_laddr, &conn->c_faddr, ret);

		if (ret) {
			/* Back to DOWN and retry later; if someone else
			 * changed the state meanwhile, report an error.
			 */
			if (rds_conn_path_transition(cp,
						     RDS_CONN_CONNECTING,
						     RDS_CONN_DOWN))
				rds_queue_reconnect(cp);
			else
				rds_conn_path_error(cp, "connect failed\n");
		}
	}
}
179
180void rds_send_worker(struct work_struct *work)
181{
182 struct rds_conn_path *cp = container_of(work,
183 struct rds_conn_path,
184 cp_send_w.work);
185 int ret;
186
187 if (rds_conn_path_state(cp) == RDS_CONN_UP) {
188 clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
189 ret = rds_send_xmit(cp);
190 cond_resched();
191 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
192 switch (ret) {
193 case -EAGAIN:
194 rds_stats_inc(s_send_immediate_retry);
195 queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
196 break;
197 case -ENOMEM:
198 rds_stats_inc(s_send_delayed_retry);
199 queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
200 default:
201 break;
202 }
203 }
204}
205
206void rds_recv_worker(struct work_struct *work)
207{
208 struct rds_conn_path *cp = container_of(work,
209 struct rds_conn_path,
210 cp_recv_w.work);
211 int ret;
212
213 if (rds_conn_path_state(cp) == RDS_CONN_UP) {
214 ret = cp->cp_conn->c_trans->recv_path(cp);
215 rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
216 switch (ret) {
217 case -EAGAIN:
218 rds_stats_inc(s_recv_immediate_retry);
219 queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
220 break;
221 case -ENOMEM:
222 rds_stats_inc(s_recv_delayed_retry);
223 queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
224 default:
225 break;
226 }
227 }
228}
229
/*
 * Workqueue trampoline for path teardown: recovers the rds_conn_path
 * from its cp_down_w work_struct (a plain work item, not delayed work)
 * and runs the shutdown sequence.
 */
void rds_shutdown_worker(struct work_struct *work)
{
	struct rds_conn_path *cp = container_of(work,
						struct rds_conn_path,
						cp_down_w);

	rds_conn_shutdown(cp);
}
238
/* Tear down the RDS workqueue; flushes and frees any pending work. */
void rds_threads_exit(void)
{
	destroy_workqueue(rds_wq);
}
243
244int rds_threads_init(void)
245{
246 rds_wq = create_singlethread_workqueue("krdsd");
247 if (!rds_wq)
248 return -ENOMEM;
249
250 return 0;
251}
252