/*
 * backchannel_rqst.c: SUNRPC backchannel request preallocation
 *
 * Manage the pool of preallocated rpc_rqst structures that service
 * callback ("backchannel") requests arriving on an NFSv4.1+ client's
 * existing transport connection.
 */
#include <linux/tcp.h>
#include <linux/slab.h>
#include <linux/sunrpc/xprt.h>
#include <linux/export.h>
#include <linux/sunrpc/bc_xprt.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
#define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

#define BC_MAX_SLOTS	64U

unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
{
	return BC_MAX_SLOTS;
}

/*
 * Helper routines that track the number of preallocation elements
 * on the transport.
 */
static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
{
	return xprt->bc_alloc_count < xprt->bc_alloc_max;
}

/*
 * Free the preallocated rpc_rqst structure and the memory
 * buffers hanging off of it.
 */
static void xprt_free_allocation(struct rpc_rqst *req)
{
	struct xdr_buf *xbufp;

	dprintk("RPC: free allocations for req= %p\n", req);
	WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
	xbufp = &req->rq_rcv_buf;
	free_page((unsigned long)xbufp->head[0].iov_base);
	xbufp = &req->rq_snd_buf;
	free_page((unsigned long)xbufp->head[0].iov_base);
	kfree(req);
}

static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
{
	struct page *page;

	page = alloc_page(gfp_flags);
	if (page == NULL)
		return -ENOMEM;
	xdr_buf_init(buf, page_address(page), PAGE_SIZE);
	return 0;
}

static
struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt, gfp_t gfp_flags)
{
	struct rpc_rqst *req;

	/* Pre-allocate one backchannel rpc_rqst */
	req = kzalloc(sizeof(*req), gfp_flags);
	if (req == NULL)
		return NULL;

	req->rq_xprt = xprt;
	INIT_LIST_HEAD(&req->rq_bc_list);

	/* Preallocate one XDR receive buffer */
	if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
		printk(KERN_ERR "Failed to create bc receive xbuf\n");
		goto out_free;
	}
	req->rq_rcv_buf.len = PAGE_SIZE;

	/* Preallocate one XDR send buffer */
	if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
		printk(KERN_ERR "Failed to create bc snd xbuf\n");
		goto out_free;
	}
	return req;
out_free:
	xprt_free_allocation(req);
	return NULL;
}

/*
 * Preallocate up to min_reqs structures and XDR buffers for use
 * by the backchannel.  This function can be called multiple times
 * when creating new sessions that use the same rpc_xprt.  The
 * preallocated buffers are added to the pool of resources used by
 * the rpc_xprt.  Any one of these resources may be used by an
 * incoming callback request.  It's up to the higher levels in the
 * stack to enforce that the maximum number of session slots is not
 * being exceeded.
 *
 * Each preallocated request carries one page of receive buffer and
 * one page of send buffer, so callers must ensure that callback
 * arguments and replies fit within PAGE_SIZE.
 *
 * Returns 0 on success, otherwise a negative errno.
 */
int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
{
	if (!xprt->ops->bc_setup)
		return 0;
	return xprt->ops->bc_setup(xprt, min_reqs);
}
EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
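
/*
 * Illustrative sketch of a typical caller: an NFSv4.1 client
 * preallocates backchannel resources when it creates a session.
 * The names below, other than xprt_setup_backchannel(), are
 * hypothetical:
 *
 *	err = xprt_setup_backchannel(clnt->cl_xprt, nr_cb_slots);
 *	if (err < 0)
 *		goto out_destroy_session;
 */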

int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
{
	struct rpc_rqst *req;
	struct list_head tmp_list;
	int i;

	dprintk("RPC: setup backchannel transport\n");

	if (min_reqs > BC_MAX_SLOTS)
		min_reqs = BC_MAX_SLOTS;

	/*
	 * We use a temporary list to keep track of the preallocated
	 * buffers.  Once we're done building the list we splice it
	 * into the backchannel preallocation list off of the rpc_xprt
	 * struct.  This helps minimize the amount of time the list
	 * lock is held on the rpc_xprt struct.  It also makes cleanup
	 * easier in case of memory allocation errors.
	 */
	INIT_LIST_HEAD(&tmp_list);
	for (i = 0; i < min_reqs; i++) {
		/* Pre-allocate one backchannel rpc_rqst */
		req = xprt_alloc_bc_req(xprt, GFP_KERNEL);
		if (req == NULL) {
			printk(KERN_ERR "Failed to create bc rpc_rqst\n");
			goto out_free;
		}

		/* Add the allocated buffer to the tmp list */
		dprintk("RPC: adding req= %p\n", req);
		list_add(&req->rq_bc_pa_list, &tmp_list);
	}

	/*
	 * Add the temporary list to the backchannel preallocation list
	 */
	spin_lock(&xprt->bc_pa_lock);
	list_splice(&tmp_list, &xprt->bc_pa_list);
	xprt->bc_alloc_count += min_reqs;
	xprt->bc_alloc_max += min_reqs;
	atomic_add(min_reqs, &xprt->bc_slot_count);
	spin_unlock(&xprt->bc_pa_lock);

	dprintk("RPC: setup backchannel transport done\n");
	return 0;

out_free:
	/*
	 * Memory allocation failed, free the temporary list
	 */
	while (!list_empty(&tmp_list)) {
		req = list_first_entry(&tmp_list,
				struct rpc_rqst,
				rq_bc_pa_list);
		list_del(&req->rq_bc_pa_list);
		xprt_free_allocation(req);
	}

	dprintk("RPC: setup backchannel transport failed\n");
	return -ENOMEM;
}

/**
 * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
 * @xprt:	the transport holding the preallocated structures
 * @max_reqs:	the maximum number of preallocated structures to destroy
 *
 * Since these structures may have been allocated by multiple calls
 * to xprt_setup_backchannel, we only destroy up to the maximum number
 * of reqs specified by the caller.
 */
void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
{
	if (xprt->ops->bc_destroy)
		xprt->ops->bc_destroy(xprt, max_reqs);
}
EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
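
/*
 * Illustrative sketch of the matching teardown: on session
 * destruction the caller releases what it preallocated, passing the
 * same count it requested at setup ("nr_cb_slots" is hypothetical):
 *
 *	xprt_destroy_backchannel(clnt->cl_xprt, nr_cb_slots);
 */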

void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
{
	struct rpc_rqst *req = NULL, *tmp = NULL;

	dprintk("RPC: destroy backchannel transport\n");

	if (max_reqs == 0)
		goto out;

	spin_lock_bh(&xprt->bc_pa_lock);
	xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
		dprintk("RPC: req=%p\n", req);
		list_del(&req->rq_bc_pa_list);
		xprt_free_allocation(req);
		xprt->bc_alloc_count--;
		atomic_dec(&xprt->bc_slot_count);
		if (--max_reqs == 0)
			break;
	}
	spin_unlock_bh(&xprt->bc_pa_lock);

out:
	dprintk("RPC: backchannel list empty= %s\n",
		list_empty(&xprt->bc_pa_list) ? "true" : "false");
}

static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
		struct rpc_rqst *new)
{
	struct rpc_rqst *req = NULL;

	dprintk("RPC: allocate a backchannel request\n");
	if (list_empty(&xprt->bc_pa_list)) {
		if (!new)
			goto not_found;
		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
			goto not_found;
		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
		xprt->bc_alloc_count++;
		atomic_inc(&xprt->bc_slot_count);
	}
	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
				rq_bc_pa_list);
	req->rq_reply_bytes_recvd = 0;
	memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
			sizeof(req->rq_private_buf));
	req->rq_xid = xid;
	req->rq_connect_cookie = xprt->connect_cookie;
	dprintk("RPC: backchannel req=%p\n", req);
not_found:
	return req;
}

/*
 * Return the preallocated rpc_rqst structure and XDR buffers
 * associated with this callback request.
 */
void xprt_free_bc_request(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	xprt->ops->bc_free_rqst(req);
}

void xprt_free_bc_rqst(struct rpc_rqst *req)
{
	struct rpc_xprt *xprt = req->rq_xprt;

	dprintk("RPC: free backchannel req=%p\n", req);

	req->rq_connect_cookie = xprt->connect_cookie - 1;
	smp_mb__before_atomic();
	clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
	smp_mb__after_atomic();

	/*
	 * Return it to the list of preallocations so that it
	 * may be reused by a new callback request.
	 */
	spin_lock_bh(&xprt->bc_pa_lock);
	if (xprt_need_to_requeue(xprt)) {
		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
		xprt->bc_alloc_count++;
		atomic_inc(&xprt->bc_slot_count);
		req = NULL;
	}
	spin_unlock_bh(&xprt->bc_pa_lock);
	if (req != NULL) {
		/*
		 * The last remaining session was destroyed while this
		 * entry was in use.  Free the entry and don't attempt
		 * to add back to the list because there is no need to
		 * have any more preallocated entries.
		 */
		dprintk("RPC: Last session removed req=%p\n", req);
		xprt_free_allocation(req);
	}
	xprt_put(xprt);
}

/*
 * One or more rpc_rqst structures have been preallocated during the
 * backchannel setup.  Buffer space for the send and private XDR buffers
 * has been preallocated as well.  Use xprt_free_bc_request() to return
 * a request to the pool.
 *
 * If no preallocated slot matches the XID or is free, try to allocate
 * a fresh rpc_rqst with GFP_KERNEL and offer it to the pool via
 * xprt_get_bc_request().
 *
 * Return an available rpc_rqst, otherwise NULL if none are available.
 */
struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
{
	struct rpc_rqst *req, *new = NULL;

	do {
		spin_lock(&xprt->bc_pa_lock);
		list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
			if (req->rq_connect_cookie != xprt->connect_cookie)
				continue;
			if (req->rq_xid == xid)
				goto found;
		}
		req = xprt_get_bc_request(xprt, xid, new);
found:
		spin_unlock(&xprt->bc_pa_lock);
		if (new) {
			if (req != new)
				xprt_free_allocation(new);
			break;
		} else if (req)
			break;
		new = xprt_alloc_bc_req(xprt, GFP_KERNEL);
	} while (new);
	return req;
}
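
/*
 * Illustrative sketch of the receive path: a transport matches an
 * incoming callback to a preallocated slot by XID, copies the call
 * into the slot's receive buffer, then queues it for the callback
 * service.  Everything here except the two xprt_*() calls is
 * hypothetical:
 *
 *	req = xprt_lookup_bc_request(xprt, xid);
 *	if (req == NULL)
 *		return;		(no slot free: the call is dropped)
 *	copied = copy_call_into(&req->rq_rcv_buf, data, len);
 *	xprt_complete_bc_request(req, copied);
 */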

/*
 * Add callback request to callback list.  The callback
 * service sleeps on the sv_cb_waitq waiting for new
 * requests.  Wake it up after enqueuing the request.
 */
void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
{
	struct rpc_xprt *xprt = req->rq_xprt;
	struct svc_serv *bc_serv = xprt->bc_serv;

	spin_lock(&xprt->bc_pa_lock);
	list_del(&req->rq_bc_pa_list);
	xprt->bc_alloc_count--;
	spin_unlock(&xprt->bc_pa_lock);

	req->rq_private_buf.len = copied;
	set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);

	dprintk("RPC: add callback request to list\n");
	xprt_get(xprt);
	spin_lock(&bc_serv->sv_cb_lock);
	list_add(&req->rq_bc_list, &bc_serv->sv_cb_list);
	wake_up(&bc_serv->sv_cb_waitq);
	spin_unlock(&bc_serv->sv_cb_lock);
}
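
/*
 * Illustrative sketch of the consumer side: the callback service
 * thread sleeps on sv_cb_waitq and, once woken, dequeues requests
 * under sv_cb_lock ("serv" and the processing step are hypothetical):
 *
 *	spin_lock_bh(&serv->sv_cb_lock);
 *	if (!list_empty(&serv->sv_cb_list)) {
 *		req = list_first_entry(&serv->sv_cb_list,
 *				struct rpc_rqst, rq_bc_list);
 *		list_del(&req->rq_bc_list);
 *		spin_unlock_bh(&serv->sv_cb_lock);
 *		(process the callback, then release the slot
 *		 with xprt_free_bc_request(req))
 *	} else
 *		spin_unlock_bh(&serv->sv_cb_lock);
 */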