1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40#include <linux/module.h>
41
42#include <linux/types.h>
43#include <linux/interrupt.h>
44#include <linux/workqueue.h>
45#include <linux/net.h>
46
47#include <linux/sunrpc/clnt.h>
48#include <linux/sunrpc/metrics.h>
49
50#include "sunrpc.h"
51
52
53
54
55
56#ifdef RPC_DEBUG
57# define RPCDBG_FACILITY RPCDBG_XPRT
58#endif
59
60
61
62
63static void xprt_request_init(struct rpc_task *, struct rpc_xprt *);
64static inline void do_xprt_reserve(struct rpc_task *);
65static void xprt_connect_status(struct rpc_task *task);
66static int __xprt_get_cong(struct rpc_xprt *, struct rpc_task *);
67
68static DEFINE_SPINLOCK(xprt_list_lock);
69static LIST_HEAD(xprt_list);
70
71
72
73
74
75
76
77
78
79
80
81
82#define RPC_CWNDSHIFT (8U)
83#define RPC_CWNDSCALE (1U << RPC_CWNDSHIFT)
84#define RPC_INITCWND RPC_CWNDSCALE
85#define RPC_MAXCWND(xprt) ((xprt)->max_reqs << RPC_CWNDSHIFT)
86
87#define RPCXPRT_CONGESTED(xprt) ((xprt)->cong >= (xprt)->cwnd)
88
89
90
91
92
93
94
95
96
97
98
99
100
101int xprt_register_transport(struct xprt_class *transport)
102{
103 struct xprt_class *t;
104 int result;
105
106 result = -EEXIST;
107 spin_lock(&xprt_list_lock);
108 list_for_each_entry(t, &xprt_list, list) {
109
110 if (t->ident == transport->ident)
111 goto out;
112 }
113
114 list_add_tail(&transport->list, &xprt_list);
115 printk(KERN_INFO "RPC: Registered %s transport module.\n",
116 transport->name);
117 result = 0;
118
119out:
120 spin_unlock(&xprt_list_lock);
121 return result;
122}
123EXPORT_SYMBOL_GPL(xprt_register_transport);
124
125
126
127
128
129
130
131
132
133int xprt_unregister_transport(struct xprt_class *transport)
134{
135 struct xprt_class *t;
136 int result;
137
138 result = 0;
139 spin_lock(&xprt_list_lock);
140 list_for_each_entry(t, &xprt_list, list) {
141 if (t == transport) {
142 printk(KERN_INFO
143 "RPC: Unregistered %s transport module.\n",
144 transport->name);
145 list_del_init(&transport->list);
146 goto out;
147 }
148 }
149 result = -ENOENT;
150
151out:
152 spin_unlock(&xprt_list_lock);
153 return result;
154}
155EXPORT_SYMBOL_GPL(xprt_unregister_transport);
156
157
158
159
160
161
162
163
164
165int xprt_load_transport(const char *transport_name)
166{
167 struct xprt_class *t;
168 char module_name[sizeof t->name + 5];
169 int result;
170
171 result = 0;
172 spin_lock(&xprt_list_lock);
173 list_for_each_entry(t, &xprt_list, list) {
174 if (strcmp(t->name, transport_name) == 0) {
175 spin_unlock(&xprt_list_lock);
176 goto out;
177 }
178 }
179 spin_unlock(&xprt_list_lock);
180 strcpy(module_name, "xprt");
181 strncat(module_name, transport_name, sizeof t->name);
182 result = request_module(module_name);
183out:
184 return result;
185}
186EXPORT_SYMBOL_GPL(xprt_load_transport);
187
188
189
190
191
192
193
194
195
196int xprt_reserve_xprt(struct rpc_task *task)
197{
198 struct rpc_rqst *req = task->tk_rqstp;
199 struct rpc_xprt *xprt = req->rq_xprt;
200
201 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
202 if (task == xprt->snd_task)
203 return 1;
204 if (task == NULL)
205 return 0;
206 goto out_sleep;
207 }
208 xprt->snd_task = task;
209 if (req) {
210 req->rq_bytes_sent = 0;
211 req->rq_ntrans++;
212 }
213 return 1;
214
215out_sleep:
216 dprintk("RPC: %5u failed to lock transport %p\n",
217 task->tk_pid, xprt);
218 task->tk_timeout = 0;
219 task->tk_status = -EAGAIN;
220 if (req && req->rq_ntrans)
221 rpc_sleep_on(&xprt->resend, task, NULL);
222 else
223 rpc_sleep_on(&xprt->sending, task, NULL);
224 return 0;
225}
226EXPORT_SYMBOL_GPL(xprt_reserve_xprt);
227
228static void xprt_clear_locked(struct rpc_xprt *xprt)
229{
230 xprt->snd_task = NULL;
231 if (!test_bit(XPRT_CLOSE_WAIT, &xprt->state) || xprt->shutdown) {
232 smp_mb__before_clear_bit();
233 clear_bit(XPRT_LOCKED, &xprt->state);
234 smp_mb__after_clear_bit();
235 } else
236 queue_work(rpciod_workqueue, &xprt->task_cleanup);
237}
238
239
240
241
242
243
244
245
246
247int xprt_reserve_xprt_cong(struct rpc_task *task)
248{
249 struct rpc_xprt *xprt = task->tk_xprt;
250 struct rpc_rqst *req = task->tk_rqstp;
251
252 if (test_and_set_bit(XPRT_LOCKED, &xprt->state)) {
253 if (task == xprt->snd_task)
254 return 1;
255 goto out_sleep;
256 }
257 if (__xprt_get_cong(xprt, task)) {
258 xprt->snd_task = task;
259 if (req) {
260 req->rq_bytes_sent = 0;
261 req->rq_ntrans++;
262 }
263 return 1;
264 }
265 xprt_clear_locked(xprt);
266out_sleep:
267 dprintk("RPC: %5u failed to lock transport %p\n", task->tk_pid, xprt);
268 task->tk_timeout = 0;
269 task->tk_status = -EAGAIN;
270 if (req && req->rq_ntrans)
271 rpc_sleep_on(&xprt->resend, task, NULL);
272 else
273 rpc_sleep_on(&xprt->sending, task, NULL);
274 return 0;
275}
276EXPORT_SYMBOL_GPL(xprt_reserve_xprt_cong);
277
278static inline int xprt_lock_write(struct rpc_xprt *xprt, struct rpc_task *task)
279{
280 int retval;
281
282 spin_lock_bh(&xprt->transport_lock);
283 retval = xprt->ops->reserve_xprt(task);
284 spin_unlock_bh(&xprt->transport_lock);
285 return retval;
286}
287
288static void __xprt_lock_write_next(struct rpc_xprt *xprt)
289{
290 struct rpc_task *task;
291 struct rpc_rqst *req;
292
293 if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
294 return;
295
296 task = rpc_wake_up_next(&xprt->resend);
297 if (!task) {
298 task = rpc_wake_up_next(&xprt->sending);
299 if (!task)
300 goto out_unlock;
301 }
302
303 req = task->tk_rqstp;
304 xprt->snd_task = task;
305 if (req) {
306 req->rq_bytes_sent = 0;
307 req->rq_ntrans++;
308 }
309 return;
310
311out_unlock:
312 xprt_clear_locked(xprt);
313}
314
315static void __xprt_lock_write_next_cong(struct rpc_xprt *xprt)
316{
317 struct rpc_task *task;
318
319 if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
320 return;
321 if (RPCXPRT_CONGESTED(xprt))
322 goto out_unlock;
323 task = rpc_wake_up_next(&xprt->resend);
324 if (!task) {
325 task = rpc_wake_up_next(&xprt->sending);
326 if (!task)
327 goto out_unlock;
328 }
329 if (__xprt_get_cong(xprt, task)) {
330 struct rpc_rqst *req = task->tk_rqstp;
331 xprt->snd_task = task;
332 if (req) {
333 req->rq_bytes_sent = 0;
334 req->rq_ntrans++;
335 }
336 return;
337 }
338out_unlock:
339 xprt_clear_locked(xprt);
340}
341
342
343
344
345
346
347
348
349void xprt_release_xprt(struct rpc_xprt *xprt, struct rpc_task *task)
350{
351 if (xprt->snd_task == task) {
352 xprt_clear_locked(xprt);
353 __xprt_lock_write_next(xprt);
354 }
355}
356EXPORT_SYMBOL_GPL(xprt_release_xprt);
357
358
359
360
361
362
363
364
365
366void xprt_release_xprt_cong(struct rpc_xprt *xprt, struct rpc_task *task)
367{
368 if (xprt->snd_task == task) {
369 xprt_clear_locked(xprt);
370 __xprt_lock_write_next_cong(xprt);
371 }
372}
373EXPORT_SYMBOL_GPL(xprt_release_xprt_cong);
374
375static inline void xprt_release_write(struct rpc_xprt *xprt, struct rpc_task *task)
376{
377 spin_lock_bh(&xprt->transport_lock);
378 xprt->ops->release_xprt(xprt, task);
379 spin_unlock_bh(&xprt->transport_lock);
380}
381
382
383
384
385
386static int
387__xprt_get_cong(struct rpc_xprt *xprt, struct rpc_task *task)
388{
389 struct rpc_rqst *req = task->tk_rqstp;
390
391 if (req->rq_cong)
392 return 1;
393 dprintk("RPC: %5u xprt_cwnd_limited cong = %lu cwnd = %lu\n",
394 task->tk_pid, xprt->cong, xprt->cwnd);
395 if (RPCXPRT_CONGESTED(xprt))
396 return 0;
397 req->rq_cong = 1;
398 xprt->cong += RPC_CWNDSCALE;
399 return 1;
400}
401
402
403
404
405
406static void
407__xprt_put_cong(struct rpc_xprt *xprt, struct rpc_rqst *req)
408{
409 if (!req->rq_cong)
410 return;
411 req->rq_cong = 0;
412 xprt->cong -= RPC_CWNDSCALE;
413 __xprt_lock_write_next_cong(xprt);
414}
415
416
417
418
419
420
421
422void xprt_release_rqst_cong(struct rpc_task *task)
423{
424 __xprt_put_cong(task->tk_xprt, task->tk_rqstp);
425}
426EXPORT_SYMBOL_GPL(xprt_release_rqst_cong);
427
428
429
430
431
432
433
434
435void xprt_adjust_cwnd(struct rpc_task *task, int result)
436{
437 struct rpc_rqst *req = task->tk_rqstp;
438 struct rpc_xprt *xprt = task->tk_xprt;
439 unsigned long cwnd = xprt->cwnd;
440
441 if (result >= 0 && cwnd <= xprt->cong) {
442
443
444 cwnd += (RPC_CWNDSCALE * RPC_CWNDSCALE + (cwnd >> 1)) / cwnd;
445 if (cwnd > RPC_MAXCWND(xprt))
446 cwnd = RPC_MAXCWND(xprt);
447 __xprt_lock_write_next_cong(xprt);
448 } else if (result == -ETIMEDOUT) {
449 cwnd >>= 1;
450 if (cwnd < RPC_CWNDSCALE)
451 cwnd = RPC_CWNDSCALE;
452 }
453 dprintk("RPC: cong %ld, cwnd was %ld, now %ld\n",
454 xprt->cong, xprt->cwnd, cwnd);
455 xprt->cwnd = cwnd;
456 __xprt_put_cong(xprt, req);
457}
458EXPORT_SYMBOL_GPL(xprt_adjust_cwnd);
459
460
461
462
463
464
465
466void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
467{
468 if (status < 0)
469 rpc_wake_up_status(&xprt->pending, status);
470 else
471 rpc_wake_up(&xprt->pending);
472}
473EXPORT_SYMBOL_GPL(xprt_wake_pending_tasks);
474
475
476
477
478
479
480void xprt_wait_for_buffer_space(struct rpc_task *task, rpc_action action)
481{
482 struct rpc_rqst *req = task->tk_rqstp;
483 struct rpc_xprt *xprt = req->rq_xprt;
484
485 task->tk_timeout = req->rq_timeout;
486 rpc_sleep_on(&xprt->pending, task, action);
487}
488EXPORT_SYMBOL_GPL(xprt_wait_for_buffer_space);
489
490
491
492
493
494
495
496void xprt_write_space(struct rpc_xprt *xprt)
497{
498 if (unlikely(xprt->shutdown))
499 return;
500
501 spin_lock_bh(&xprt->transport_lock);
502 if (xprt->snd_task) {
503 dprintk("RPC: write space: waking waiting task on "
504 "xprt %p\n", xprt);
505 rpc_wake_up_queued_task(&xprt->pending, xprt->snd_task);
506 }
507 spin_unlock_bh(&xprt->transport_lock);
508}
509EXPORT_SYMBOL_GPL(xprt_write_space);
510
511
512
513
514
515
516
517
518
519void xprt_set_retrans_timeout_def(struct rpc_task *task)
520{
521 task->tk_timeout = task->tk_rqstp->rq_timeout;
522}
523EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_def);
524
525
526
527
528
529
530
531void xprt_set_retrans_timeout_rtt(struct rpc_task *task)
532{
533 int timer = task->tk_msg.rpc_proc->p_timer;
534 struct rpc_clnt *clnt = task->tk_client;
535 struct rpc_rtt *rtt = clnt->cl_rtt;
536 struct rpc_rqst *req = task->tk_rqstp;
537 unsigned long max_timeout = clnt->cl_timeout->to_maxval;
538
539 task->tk_timeout = rpc_calc_rto(rtt, timer);
540 task->tk_timeout <<= rpc_ntimeo(rtt, timer) + req->rq_retries;
541 if (task->tk_timeout > max_timeout || task->tk_timeout == 0)
542 task->tk_timeout = max_timeout;
543}
544EXPORT_SYMBOL_GPL(xprt_set_retrans_timeout_rtt);
545
546static void xprt_reset_majortimeo(struct rpc_rqst *req)
547{
548 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
549
550 req->rq_majortimeo = req->rq_timeout;
551 if (to->to_exponential)
552 req->rq_majortimeo <<= to->to_retries;
553 else
554 req->rq_majortimeo += to->to_increment * to->to_retries;
555 if (req->rq_majortimeo > to->to_maxval || req->rq_majortimeo == 0)
556 req->rq_majortimeo = to->to_maxval;
557 req->rq_majortimeo += jiffies;
558}
559
560
561
562
563
564
565int xprt_adjust_timeout(struct rpc_rqst *req)
566{
567 struct rpc_xprt *xprt = req->rq_xprt;
568 const struct rpc_timeout *to = req->rq_task->tk_client->cl_timeout;
569 int status = 0;
570
571 if (time_before(jiffies, req->rq_majortimeo)) {
572 if (to->to_exponential)
573 req->rq_timeout <<= 1;
574 else
575 req->rq_timeout += to->to_increment;
576 if (to->to_maxval && req->rq_timeout >= to->to_maxval)
577 req->rq_timeout = to->to_maxval;
578 req->rq_retries++;
579 } else {
580 req->rq_timeout = to->to_initval;
581 req->rq_retries = 0;
582 xprt_reset_majortimeo(req);
583
584 spin_lock_bh(&xprt->transport_lock);
585 rpc_init_rtt(req->rq_task->tk_client->cl_rtt, to->to_initval);
586 spin_unlock_bh(&xprt->transport_lock);
587 status = -ETIMEDOUT;
588 }
589
590 if (req->rq_timeout == 0) {
591 printk(KERN_WARNING "xprt_adjust_timeout: rq_timeout = 0!\n");
592 req->rq_timeout = 5 * HZ;
593 }
594 return status;
595}
596
597static void xprt_autoclose(struct work_struct *work)
598{
599 struct rpc_xprt *xprt =
600 container_of(work, struct rpc_xprt, task_cleanup);
601
602 xprt->ops->close(xprt);
603 clear_bit(XPRT_CLOSE_WAIT, &xprt->state);
604 xprt_release_write(xprt, NULL);
605}
606
607
608
609
610
611
612void xprt_disconnect_done(struct rpc_xprt *xprt)
613{
614 dprintk("RPC: disconnected transport %p\n", xprt);
615 spin_lock_bh(&xprt->transport_lock);
616 xprt_clear_connected(xprt);
617 xprt_wake_pending_tasks(xprt, -EAGAIN);
618 spin_unlock_bh(&xprt->transport_lock);
619}
620EXPORT_SYMBOL_GPL(xprt_disconnect_done);
621
622
623
624
625
626
627void xprt_force_disconnect(struct rpc_xprt *xprt)
628{
629
630 spin_lock_bh(&xprt->transport_lock);
631 set_bit(XPRT_CLOSE_WAIT, &xprt->state);
632
633 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
634 queue_work(rpciod_workqueue, &xprt->task_cleanup);
635 xprt_wake_pending_tasks(xprt, -EAGAIN);
636 spin_unlock_bh(&xprt->transport_lock);
637}
638
639
640
641
642
643
644
645
646
647
648
649
650void xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie)
651{
652
653 spin_lock_bh(&xprt->transport_lock);
654 if (cookie != xprt->connect_cookie)
655 goto out;
656 if (test_bit(XPRT_CLOSING, &xprt->state) || !xprt_connected(xprt))
657 goto out;
658 set_bit(XPRT_CLOSE_WAIT, &xprt->state);
659
660 if (test_and_set_bit(XPRT_LOCKED, &xprt->state) == 0)
661 queue_work(rpciod_workqueue, &xprt->task_cleanup);
662 xprt_wake_pending_tasks(xprt, -EAGAIN);
663out:
664 spin_unlock_bh(&xprt->transport_lock);
665}
666
667static void
668xprt_init_autodisconnect(unsigned long data)
669{
670 struct rpc_xprt *xprt = (struct rpc_xprt *)data;
671
672 spin_lock(&xprt->transport_lock);
673 if (!list_empty(&xprt->recv) || xprt->shutdown)
674 goto out_abort;
675 if (test_and_set_bit(XPRT_LOCKED, &xprt->state))
676 goto out_abort;
677 spin_unlock(&xprt->transport_lock);
678 set_bit(XPRT_CONNECTION_CLOSE, &xprt->state);
679 queue_work(rpciod_workqueue, &xprt->task_cleanup);
680 return;
681out_abort:
682 spin_unlock(&xprt->transport_lock);
683}
684
685
686
687
688
689
690void xprt_connect(struct rpc_task *task)
691{
692 struct rpc_xprt *xprt = task->tk_xprt;
693
694 dprintk("RPC: %5u xprt_connect xprt %p %s connected\n", task->tk_pid,
695 xprt, (xprt_connected(xprt) ? "is" : "is not"));
696
697 if (!xprt_bound(xprt)) {
698 task->tk_status = -EAGAIN;
699 return;
700 }
701 if (!xprt_lock_write(xprt, task))
702 return;
703 if (xprt_connected(xprt))
704 xprt_release_write(xprt, task);
705 else {
706 if (task->tk_rqstp)
707 task->tk_rqstp->rq_bytes_sent = 0;
708
709 task->tk_timeout = xprt->connect_timeout;
710 rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
711 xprt->stat.connect_start = jiffies;
712 xprt->ops->connect(task);
713 }
714 return;
715}
716
717static void xprt_connect_status(struct rpc_task *task)
718{
719 struct rpc_xprt *xprt = task->tk_xprt;
720
721 if (task->tk_status == 0) {
722 xprt->stat.connect_count++;
723 xprt->stat.connect_time += (long)jiffies - xprt->stat.connect_start;
724 dprintk("RPC: %5u xprt_connect_status: connection established\n",
725 task->tk_pid);
726 return;
727 }
728
729 switch (task->tk_status) {
730 case -EAGAIN:
731 dprintk("RPC: %5u xprt_connect_status: retrying\n", task->tk_pid);
732 break;
733 case -ETIMEDOUT:
734 dprintk("RPC: %5u xprt_connect_status: connect attempt timed "
735 "out\n", task->tk_pid);
736 break;
737 default:
738 dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
739 "server %s\n", task->tk_pid, -task->tk_status,
740 task->tk_client->cl_server);
741 xprt_release_write(xprt, task);
742 task->tk_status = -EIO;
743 }
744}
745
746
747
748
749
750
751
752struct rpc_rqst *xprt_lookup_rqst(struct rpc_xprt *xprt, __be32 xid)
753{
754 struct list_head *pos;
755
756 list_for_each(pos, &xprt->recv) {
757 struct rpc_rqst *entry = list_entry(pos, struct rpc_rqst, rq_list);
758 if (entry->rq_xid == xid)
759 return entry;
760 }
761
762 dprintk("RPC: xprt_lookup_rqst did not find xid %08x\n",
763 ntohl(xid));
764 xprt->stat.bad_xids++;
765 return NULL;
766}
767EXPORT_SYMBOL_GPL(xprt_lookup_rqst);
768
769
770
771
772
773
774void xprt_update_rtt(struct rpc_task *task)
775{
776 struct rpc_rqst *req = task->tk_rqstp;
777 struct rpc_rtt *rtt = task->tk_client->cl_rtt;
778 unsigned timer = task->tk_msg.rpc_proc->p_timer;
779
780 if (timer) {
781 if (req->rq_ntrans == 1)
782 rpc_update_rtt(rtt, timer,
783 (long)jiffies - req->rq_xtime);
784 rpc_set_timeo(rtt, timer, req->rq_ntrans - 1);
785 }
786}
787EXPORT_SYMBOL_GPL(xprt_update_rtt);
788
789
790
791
792
793
794
795
796void xprt_complete_rqst(struct rpc_task *task, int copied)
797{
798 struct rpc_rqst *req = task->tk_rqstp;
799 struct rpc_xprt *xprt = req->rq_xprt;
800
801 dprintk("RPC: %5u xid %08x complete (%d bytes received)\n",
802 task->tk_pid, ntohl(req->rq_xid), copied);
803
804 xprt->stat.recvs++;
805 task->tk_rtt = (long)jiffies - req->rq_xtime;
806
807 list_del_init(&req->rq_list);
808 req->rq_private_buf.len = copied;
809
810
811 smp_wmb();
812 req->rq_reply_bytes_recvd = copied;
813 rpc_wake_up_queued_task(&xprt->pending, task);
814}
815EXPORT_SYMBOL_GPL(xprt_complete_rqst);
816
817static void xprt_timer(struct rpc_task *task)
818{
819 struct rpc_rqst *req = task->tk_rqstp;
820 struct rpc_xprt *xprt = req->rq_xprt;
821
822 if (task->tk_status != -ETIMEDOUT)
823 return;
824 dprintk("RPC: %5u xprt_timer\n", task->tk_pid);
825
826 spin_lock_bh(&xprt->transport_lock);
827 if (!req->rq_reply_bytes_recvd) {
828 if (xprt->ops->timer)
829 xprt->ops->timer(task);
830 } else
831 task->tk_status = 0;
832 spin_unlock_bh(&xprt->transport_lock);
833}
834
835static inline int xprt_has_timer(struct rpc_xprt *xprt)
836{
837 return xprt->idle_timeout != 0;
838}
839
840
841
842
843
844
845int xprt_prepare_transmit(struct rpc_task *task)
846{
847 struct rpc_rqst *req = task->tk_rqstp;
848 struct rpc_xprt *xprt = req->rq_xprt;
849 int err = 0;
850
851 dprintk("RPC: %5u xprt_prepare_transmit\n", task->tk_pid);
852
853 spin_lock_bh(&xprt->transport_lock);
854 if (req->rq_reply_bytes_recvd && !req->rq_bytes_sent) {
855 err = req->rq_reply_bytes_recvd;
856 goto out_unlock;
857 }
858 if (!xprt->ops->reserve_xprt(task))
859 err = -EAGAIN;
860out_unlock:
861 spin_unlock_bh(&xprt->transport_lock);
862 return err;
863}
864
865void xprt_end_transmit(struct rpc_task *task)
866{
867 xprt_release_write(task->tk_rqstp->rq_xprt, task);
868}
869
870
871
872
873
874
875
876void xprt_transmit(struct rpc_task *task)
877{
878 struct rpc_rqst *req = task->tk_rqstp;
879 struct rpc_xprt *xprt = req->rq_xprt;
880 int status;
881
882 dprintk("RPC: %5u xprt_transmit(%u)\n", task->tk_pid, req->rq_slen);
883
884 if (!req->rq_reply_bytes_recvd) {
885 if (list_empty(&req->rq_list) && rpc_reply_expected(task)) {
886
887
888
889 spin_lock_bh(&xprt->transport_lock);
890
891 memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
892 sizeof(req->rq_private_buf));
893
894 list_add_tail(&req->rq_list, &xprt->recv);
895 spin_unlock_bh(&xprt->transport_lock);
896 xprt_reset_majortimeo(req);
897
898 del_singleshot_timer_sync(&xprt->timer);
899 }
900 } else if (!req->rq_bytes_sent)
901 return;
902
903 req->rq_connect_cookie = xprt->connect_cookie;
904 req->rq_xtime = jiffies;
905 status = xprt->ops->send_request(task);
906 if (status != 0) {
907 task->tk_status = status;
908 return;
909 }
910
911 dprintk("RPC: %5u xmit complete\n", task->tk_pid);
912 spin_lock_bh(&xprt->transport_lock);
913
914 xprt->ops->set_retrans_timeout(task);
915
916 xprt->stat.sends++;
917 xprt->stat.req_u += xprt->stat.sends - xprt->stat.recvs;
918 xprt->stat.bklog_u += xprt->backlog.qlen;
919
920
921 if (!xprt_connected(xprt))
922 task->tk_status = -ENOTCONN;
923 else if (!req->rq_reply_bytes_recvd && rpc_reply_expected(task)) {
924
925
926
927
928 rpc_sleep_on(&xprt->pending, task, xprt_timer);
929 }
930 spin_unlock_bh(&xprt->transport_lock);
931}
932
933static inline void do_xprt_reserve(struct rpc_task *task)
934{
935 struct rpc_xprt *xprt = task->tk_xprt;
936
937 task->tk_status = 0;
938 if (task->tk_rqstp)
939 return;
940 if (!list_empty(&xprt->free)) {
941 struct rpc_rqst *req = list_entry(xprt->free.next, struct rpc_rqst, rq_list);
942 list_del_init(&req->rq_list);
943 task->tk_rqstp = req;
944 xprt_request_init(task, xprt);
945 return;
946 }
947 dprintk("RPC: waiting for request slot\n");
948 task->tk_status = -EAGAIN;
949 task->tk_timeout = 0;
950 rpc_sleep_on(&xprt->backlog, task, NULL);
951}
952
953
954
955
956
957
958
959
960void xprt_reserve(struct rpc_task *task)
961{
962 struct rpc_xprt *xprt = task->tk_xprt;
963
964 task->tk_status = -EIO;
965 spin_lock(&xprt->reserve_lock);
966 do_xprt_reserve(task);
967 spin_unlock(&xprt->reserve_lock);
968}
969
970static inline __be32 xprt_alloc_xid(struct rpc_xprt *xprt)
971{
972 return xprt->xid++;
973}
974
975static inline void xprt_init_xid(struct rpc_xprt *xprt)
976{
977 xprt->xid = net_random();
978}
979
980static void xprt_request_init(struct rpc_task *task, struct rpc_xprt *xprt)
981{
982 struct rpc_rqst *req = task->tk_rqstp;
983
984 req->rq_timeout = task->tk_client->cl_timeout->to_initval;
985 req->rq_task = task;
986 req->rq_xprt = xprt;
987 req->rq_buffer = NULL;
988 req->rq_xid = xprt_alloc_xid(xprt);
989 req->rq_release_snd_buf = NULL;
990 xprt_reset_majortimeo(req);
991 dprintk("RPC: %5u reserved req %p xid %08x\n", task->tk_pid,
992 req, ntohl(req->rq_xid));
993}
994
995
996
997
998
999
1000void xprt_release(struct rpc_task *task)
1001{
1002 struct rpc_xprt *xprt;
1003 struct rpc_rqst *req;
1004 int is_bc_request;
1005
1006 if (!(req = task->tk_rqstp))
1007 return;
1008
1009
1010 is_bc_request = bc_prealloc(req);
1011
1012 xprt = req->rq_xprt;
1013 rpc_count_iostats(task);
1014 spin_lock_bh(&xprt->transport_lock);
1015 xprt->ops->release_xprt(xprt, task);
1016 if (xprt->ops->release_request)
1017 xprt->ops->release_request(task);
1018 if (!list_empty(&req->rq_list))
1019 list_del(&req->rq_list);
1020 xprt->last_used = jiffies;
1021 if (list_empty(&xprt->recv) && xprt_has_timer(xprt))
1022 mod_timer(&xprt->timer,
1023 xprt->last_used + xprt->idle_timeout);
1024 spin_unlock_bh(&xprt->transport_lock);
1025 if (!bc_prealloc(req))
1026 xprt->ops->buf_free(req->rq_buffer);
1027 task->tk_rqstp = NULL;
1028 if (req->rq_release_snd_buf)
1029 req->rq_release_snd_buf(req);
1030
1031
1032
1033
1034
1035 if (is_bc_request)
1036 return;
1037
1038 memset(req, 0, sizeof(*req));
1039
1040 dprintk("RPC: %5u release request %p\n", task->tk_pid, req);
1041
1042 spin_lock(&xprt->reserve_lock);
1043 list_add(&req->rq_list, &xprt->free);
1044 rpc_wake_up_next(&xprt->backlog);
1045 spin_unlock(&xprt->reserve_lock);
1046}
1047
1048
1049
1050
1051
1052
1053struct rpc_xprt *xprt_create_transport(struct xprt_create *args)
1054{
1055 struct rpc_xprt *xprt;
1056 struct rpc_rqst *req;
1057 struct xprt_class *t;
1058
1059 spin_lock(&xprt_list_lock);
1060 list_for_each_entry(t, &xprt_list, list) {
1061 if (t->ident == args->ident) {
1062 spin_unlock(&xprt_list_lock);
1063 goto found;
1064 }
1065 }
1066 spin_unlock(&xprt_list_lock);
1067 printk(KERN_ERR "RPC: transport (%d) not supported\n", args->ident);
1068 return ERR_PTR(-EIO);
1069
1070found:
1071 xprt = t->setup(args);
1072 if (IS_ERR(xprt)) {
1073 dprintk("RPC: xprt_create_transport: failed, %ld\n",
1074 -PTR_ERR(xprt));
1075 return xprt;
1076 }
1077
1078 kref_init(&xprt->kref);
1079 spin_lock_init(&xprt->transport_lock);
1080 spin_lock_init(&xprt->reserve_lock);
1081
1082 INIT_LIST_HEAD(&xprt->free);
1083 INIT_LIST_HEAD(&xprt->recv);
1084#if defined(CONFIG_NFS_V4_1)
1085 spin_lock_init(&xprt->bc_pa_lock);
1086 INIT_LIST_HEAD(&xprt->bc_pa_list);
1087#endif
1088
1089 INIT_WORK(&xprt->task_cleanup, xprt_autoclose);
1090 if (xprt_has_timer(xprt))
1091 setup_timer(&xprt->timer, xprt_init_autodisconnect,
1092 (unsigned long)xprt);
1093 else
1094 init_timer(&xprt->timer);
1095 xprt->last_used = jiffies;
1096 xprt->cwnd = RPC_INITCWND;
1097 xprt->bind_index = 0;
1098
1099 rpc_init_wait_queue(&xprt->binding, "xprt_binding");
1100 rpc_init_wait_queue(&xprt->pending, "xprt_pending");
1101 rpc_init_wait_queue(&xprt->sending, "xprt_sending");
1102 rpc_init_wait_queue(&xprt->resend, "xprt_resend");
1103 rpc_init_priority_wait_queue(&xprt->backlog, "xprt_backlog");
1104
1105
1106 for (req = &xprt->slot[xprt->max_reqs-1]; req >= &xprt->slot[0]; req--)
1107 list_add(&req->rq_list, &xprt->free);
1108
1109 xprt_init_xid(xprt);
1110
1111 dprintk("RPC: created transport %p with %u slots\n", xprt,
1112 xprt->max_reqs);
1113 return xprt;
1114}
1115
1116
1117
1118
1119
1120
1121static void xprt_destroy(struct kref *kref)
1122{
1123 struct rpc_xprt *xprt = container_of(kref, struct rpc_xprt, kref);
1124
1125 dprintk("RPC: destroying transport %p\n", xprt);
1126 xprt->shutdown = 1;
1127 del_timer_sync(&xprt->timer);
1128
1129 rpc_destroy_wait_queue(&xprt->binding);
1130 rpc_destroy_wait_queue(&xprt->pending);
1131 rpc_destroy_wait_queue(&xprt->sending);
1132 rpc_destroy_wait_queue(&xprt->resend);
1133 rpc_destroy_wait_queue(&xprt->backlog);
1134
1135
1136
1137 xprt->ops->destroy(xprt);
1138}
1139
1140
1141
1142
1143
1144
1145void xprt_put(struct rpc_xprt *xprt)
1146{
1147 kref_put(&xprt->kref, xprt_destroy);
1148}
1149
1150
1151
1152
1153
1154
1155struct rpc_xprt *xprt_get(struct rpc_xprt *xprt)
1156{
1157 kref_get(&xprt->kref);
1158 return xprt;
1159}
1160