#include <linux/slab.h>
#include <linux/spinlock.h>
#include <asm/unaligned.h>
#include <rdma/ib_verbs.h>
#include <rdma/rdma_cm.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/debug.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>

static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc);

static inline struct svc_rdma_recv_ctxt *
svc_rdma_next_recv_ctxt(struct list_head *list)
{
        return list_first_entry_or_null(list, struct svc_rdma_recv_ctxt,
                                        rc_list);
}

static void svc_rdma_recv_cid_init(struct svcxprt_rdma *rdma,
                                   struct rpc_rdma_cid *cid)
{
        cid->ci_queue_id = rdma->sc_rq_cq->res.id;
        cid->ci_completion_id = atomic_inc_return(&rdma->sc_completion_ids);
}

static struct svc_rdma_recv_ctxt *
svc_rdma_recv_ctxt_alloc(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_recv_ctxt *ctxt;
        dma_addr_t addr;
        void *buffer;

        ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
        if (!ctxt)
                goto fail0;
        buffer = kmalloc(rdma->sc_max_req_size, GFP_KERNEL);
        if (!buffer)
                goto fail1;
        addr = ib_dma_map_single(rdma->sc_pd->device, buffer,
                                 rdma->sc_max_req_size, DMA_FROM_DEVICE);
        if (ib_dma_mapping_error(rdma->sc_pd->device, addr))
                goto fail2;

        svc_rdma_recv_cid_init(rdma, &ctxt->rc_cid);
        pcl_init(&ctxt->rc_call_pcl);
        pcl_init(&ctxt->rc_read_pcl);
        pcl_init(&ctxt->rc_write_pcl);
        pcl_init(&ctxt->rc_reply_pcl);

        ctxt->rc_recv_wr.next = NULL;
        ctxt->rc_recv_wr.wr_cqe = &ctxt->rc_cqe;
        ctxt->rc_recv_wr.sg_list = &ctxt->rc_recv_sge;
        ctxt->rc_recv_wr.num_sge = 1;
        ctxt->rc_cqe.done = svc_rdma_wc_receive;
        ctxt->rc_recv_sge.addr = addr;
        ctxt->rc_recv_sge.length = rdma->sc_max_req_size;
        ctxt->rc_recv_sge.lkey = rdma->sc_pd->local_dma_lkey;
        ctxt->rc_recv_buf = buffer;
        ctxt->rc_temp = false;
        return ctxt;

fail2:
        kfree(buffer);
fail1:
        kfree(ctxt);
fail0:
        return NULL;
}

static void svc_rdma_recv_ctxt_destroy(struct svcxprt_rdma *rdma,
                                       struct svc_rdma_recv_ctxt *ctxt)
{
        ib_dma_unmap_single(rdma->sc_pd->device, ctxt->rc_recv_sge.addr,
                            ctxt->rc_recv_sge.length, DMA_FROM_DEVICE);
        kfree(ctxt->rc_recv_buf);
        kfree(ctxt);
}

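/**
 * svc_rdma_recv_ctxts_destroy - Release all recv_ctxt's for an xprt
 * @rdma: svcxprt_rdma being torn down
 *
 */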
void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_recv_ctxt *ctxt;
        struct llist_node *node;

        while ((node = llist_del_first(&rdma->sc_recv_ctxts))) {
                ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);
                svc_rdma_recv_ctxt_destroy(rdma, ctxt);
        }
}

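/**
 * svc_rdma_recv_ctxt_get - Allocate a recv_ctxt
 * @rdma: controlling svcxprt_rdma
 *
 * Returns a recv_ctxt, or (rarely) NULL if the free list is empty
 * and a fresh one cannot be allocated.
 */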
struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_recv_ctxt *ctxt;
        struct llist_node *node;

        node = llist_del_first(&rdma->sc_recv_ctxts);
        if (!node)
                goto out_empty;
        ctxt = llist_entry(node, struct svc_rdma_recv_ctxt, rc_node);

out:
        ctxt->rc_page_count = 0;
        return ctxt;

out_empty:
        ctxt = svc_rdma_recv_ctxt_alloc(rdma);
        if (!ctxt)
                return NULL;
        goto out;
}

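/**
 * svc_rdma_recv_ctxt_put - Return recv_ctxt to free list
 * @rdma: controlling svcxprt_rdma
 * @ctxt: object to return to the free list
 *
 */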
void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
                            struct svc_rdma_recv_ctxt *ctxt)
{
        pcl_free(&ctxt->rc_call_pcl);
        pcl_free(&ctxt->rc_read_pcl);
        pcl_free(&ctxt->rc_write_pcl);
        pcl_free(&ctxt->rc_reply_pcl);

        if (!ctxt->rc_temp)
                llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
        else
                svc_rdma_recv_ctxt_destroy(rdma, ctxt);
}

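/**
 * svc_rdma_release_rqst - Release transport-specific per-rqst resources
 * @rqstp: svc_rqst being released
 *
 * Ensure that the recv_ctxt is released whether or not a Reply
 * was sent. For example, the client could close the connection,
 * or svc_process could drop an RPC, before the Reply is sent.
 */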
void svc_rdma_release_rqst(struct svc_rqst *rqstp)
{
        struct svc_rdma_recv_ctxt *ctxt = rqstp->rq_xprt_ctxt;
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);

        rqstp->rq_xprt_ctxt = NULL;
        if (ctxt)
                svc_rdma_recv_ctxt_put(rdma, ctxt);
}

static bool svc_rdma_refresh_recvs(struct svcxprt_rdma *rdma,
                                   unsigned int wanted, bool temp)
{
        const struct ib_recv_wr *bad_wr = NULL;
        struct svc_rdma_recv_ctxt *ctxt;
        struct ib_recv_wr *recv_chain;
        int ret;

        if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
                return false;

        recv_chain = NULL;
        while (wanted--) {
                ctxt = svc_rdma_recv_ctxt_get(rdma);
                if (!ctxt)
                        break;

                trace_svcrdma_post_recv(ctxt);
                ctxt->rc_temp = temp;
                ctxt->rc_recv_wr.next = recv_chain;
                recv_chain = &ctxt->rc_recv_wr;
                rdma->sc_pending_recvs++;
        }
        if (!recv_chain)
                return false;

        ret = ib_post_recv(rdma->sc_qp, recv_chain, &bad_wr);
        if (ret)
                goto err_free;
        return true;

err_free:
        trace_svcrdma_rq_post_err(rdma, ret);
        while (bad_wr) {
                ctxt = container_of(bad_wr, struct svc_rdma_recv_ctxt,
                                    rc_recv_wr);
                bad_wr = bad_wr->next;
                svc_rdma_recv_ctxt_put(rdma, ctxt);
        }

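        /* A Receive posting failure is fatal to the connection:
         * callers of svc_rdma_refresh_recvs() close the transport
         * when it returns false.
         */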
        return false;
}

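/**
 * svc_rdma_post_recvs - Post initial set of Recv WRs
 * @rdma: fresh svcxprt_rdma
 *
 * Returns true if successful, otherwise false.
 */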
bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma)
{
        return svc_rdma_refresh_recvs(rdma, rdma->sc_max_requests, true);
}

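/**
 * svc_rdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
 * @cq: Completion Queue context
 * @wc: Work Completion object
 *
 */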
static void svc_rdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
{
        struct svcxprt_rdma *rdma = cq->cq_context;
        struct ib_cqe *cqe = wc->wr_cqe;
        struct svc_rdma_recv_ctxt *ctxt;

        rdma->sc_pending_recvs--;

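        /* WARNING: Only wc->wr_cqe and wc->status are reliable at
         * this point.
         */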
        ctxt = container_of(cqe, struct svc_rdma_recv_ctxt, rc_cqe);

        trace_svcrdma_wc_receive(wc, &ctxt->rc_cid);
        if (wc->status != IB_WC_SUCCESS)
                goto flushed;

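        /* If receive posting fails, the connection is about to be
         * lost anyway. The server will not be able to send a reply
         * for this RPC, and the client will retransmit this RPC
         * anyway when it reconnects.
         *
         * Therefore we drop the Receive, even if status was SUCCESS,
         * to reduce the likelihood of replayed requests once the
         * client reconnects.
         */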
        if (rdma->sc_pending_recvs < rdma->sc_max_requests)
                if (!svc_rdma_refresh_recvs(rdma, rdma->sc_recv_batch, false))
                        goto flushed;

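        /* All wc fields are now known to be valid */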
        ctxt->rc_byte_len = wc->byte_len;

        spin_lock(&rdma->sc_rq_dto_lock);
        list_add_tail(&ctxt->rc_list, &rdma->sc_rq_dto_q);

        set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
        spin_unlock(&rdma->sc_rq_dto_lock);
        if (!test_bit(RDMAXPRT_CONN_PENDING, &rdma->sc_flags))
                svc_xprt_enqueue(&rdma->sc_xprt);
        return;

flushed:
        svc_rdma_recv_ctxt_put(rdma, ctxt);
        svc_xprt_deferred_close(&rdma->sc_xprt);
}

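/**
 * svc_rdma_flush_recv_queues - Drain pending Receive work
 * @rdma: svcxprt_rdma being shut down
 *
 */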
void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma)
{
        struct svc_rdma_recv_ctxt *ctxt;

        while ((ctxt = svc_rdma_next_recv_ctxt(&rdma->sc_rq_dto_q))) {
                list_del(&ctxt->rc_list);
                svc_rdma_recv_ctxt_put(rdma, ctxt);
        }
}

static void svc_rdma_build_arg_xdr(struct svc_rqst *rqstp,
                                   struct svc_rdma_recv_ctxt *ctxt)
{
        struct xdr_buf *arg = &rqstp->rq_arg;

        arg->head[0].iov_base = ctxt->rc_recv_buf;
        arg->head[0].iov_len = ctxt->rc_byte_len;
        arg->tail[0].iov_base = NULL;
        arg->tail[0].iov_len = 0;
        arg->page_len = 0;
        arg->page_base = 0;
        arg->buflen = ctxt->rc_byte_len;
        arg->len = ctxt->rc_byte_len;
}

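/**
 * xdr_count_read_segments - Count number of Read segments in Read list
 * @rctxt: Ingress receive context
 * @p: Start of an un-decoded Read list
 *
 * Before allocating anything, ensure the ingress Read list is safe
 * to use.
 *
 * Return values:
 *   %true: Read list is valid. @rctxt's xdr_stream is updated to point
 *          to the first byte past the Read list. rc_read_pcl and
 *          rc_call_pcl cl_count fields are set to the number of
 *          Read segments in the list.
 *  %false: Read list is corrupt. @rctxt's xdr_stream is left in an
 *          unknown state.
 */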
static bool xdr_count_read_segments(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
        rctxt->rc_call_pcl.cl_count = 0;
        rctxt->rc_read_pcl.cl_count = 0;
        while (xdr_item_is_present(p)) {
                u32 position, handle, length;
                u64 offset;

                p = xdr_inline_decode(&rctxt->rc_stream,
                                      rpcrdma_readseg_maxsz * sizeof(*p));
                if (!p)
                        return false;

                xdr_decode_read_segment(p, &position, &handle,
                                        &length, &offset);
                if (position) {
                        if (position & 3)
                                return false;
                        ++rctxt->rc_read_pcl.cl_count;
                } else {
                        ++rctxt->rc_call_pcl.cl_count;
                }

                p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
                if (!p)
                        return false;
        }
        return true;
}

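/**
 * xdr_check_read_list - Validate the ingress Read list
 * @rctxt: Ingress receive context
 *
 * Return values:
 *   %true: Read list is valid. @rctxt's xdr_stream is updated to point
 *          to the first byte past the Read list, and the Read chunk
 *          lists in @rctxt have been allocated.
 *  %false: Read list is corrupt. @rctxt's xdr_stream is left in an
 *          unknown state.
 */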
static bool xdr_check_read_list(struct svc_rdma_recv_ctxt *rctxt)
{
        __be32 *p;

        p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
        if (!p)
                return false;
        if (!xdr_count_read_segments(rctxt, p))
                return false;
        if (!pcl_alloc_call(rctxt, p))
                return false;
        return pcl_alloc_read(rctxt, p);
}

static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt)
{
        u32 segcount;
        __be32 *p;

        if (xdr_stream_decode_u32(&rctxt->rc_stream, &segcount))
                return false;

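        /* A bogus segcount causes this buffer overflow check to fail. */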
        p = xdr_inline_decode(&rctxt->rc_stream,
                              segcount * rpcrdma_segment_maxsz * sizeof(*p));
        return p != NULL;
}

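/**
 * xdr_count_write_chunks - Count number of Write chunks in Write list
 * @rctxt: Received header and decoding state
 * @p: Start of an un-decoded Write list
 *
 * Before allocating anything, ensure the ingress Write list is
 * safe to use.
 *
 * Return values:
 *   %true: Write list is valid. @rctxt's xdr_stream is updated to
 *          point to the first byte past the Write list, and the
 *          number of Write chunks is in rc_write_pcl.cl_count.
 *  %false: Write list is corrupt. @rctxt's xdr_stream is left in an
 *          unknown state.
 */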
static bool xdr_count_write_chunks(struct svc_rdma_recv_ctxt *rctxt, __be32 *p)
{
        rctxt->rc_write_pcl.cl_count = 0;
        while (xdr_item_is_present(p)) {
                if (!xdr_check_write_chunk(rctxt))
                        return false;
                ++rctxt->rc_write_pcl.cl_count;
                p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
                if (!p)
                        return false;
        }
        return true;
}

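/**
 * xdr_check_write_list - Validate the ingress Write list
 * @rctxt: Received header and decoding state
 *
 * Return values:
 *   %true: Write list is valid. @rctxt's xdr_stream is updated to
 *          point to the first byte past the Write list, and the
 *          Write chunk list in @rctxt has been allocated.
 *  %false: Write list is corrupt. @rctxt's xdr_stream is left in an
 *          unknown state.
 */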
static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
{
        __be32 *p;

        p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
        if (!p)
                return false;
        if (!xdr_count_write_chunks(rctxt, p))
                return false;
        if (!pcl_alloc_write(rctxt, &rctxt->rc_write_pcl, p))
                return false;

        rctxt->rc_cur_result_payload = pcl_first_chunk(&rctxt->rc_write_pcl);
        return true;
}

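/**
 * xdr_check_reply_chunk - Validate the ingress Reply chunk
 * @rctxt: Received header and decoding state
 *
 * Return values:
 *   %true: Reply chunk is valid. @rctxt's xdr_stream is updated to
 *          point to the first byte past the Reply chunk, and the
 *          Reply chunk in @rctxt has been allocated.
 *  %false: Reply chunk is corrupt. @rctxt's xdr_stream is left in an
 *          unknown state.
 */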
static bool xdr_check_reply_chunk(struct svc_rdma_recv_ctxt *rctxt)
{
        __be32 *p;

        p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
        if (!p)
                return false;

        if (!xdr_item_is_present(p))
                return true;
        if (!xdr_check_write_chunk(rctxt))
                return false;

        rctxt->rc_reply_pcl.cl_count = 1;
        return pcl_alloc_write(rctxt, &rctxt->rc_reply_pcl, p);
}

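/* RPC-over-RDMA Version One private extension: Remote Invalidation.
 * Responder's choice: requester signals it can handle Send With
 * Invalidate, and responder chooses one R_key to invalidate.
 *
 * If there is exactly one distinct R_key in the received transport
 * header, set rc_inv_rkey to that R_key. Otherwise, set it to zero.
 *
 * Perform this operation while the received transport header is
 * still in the CPU cache.
 */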
static void svc_rdma_get_inv_rkey(struct svcxprt_rdma *rdma,
                                  struct svc_rdma_recv_ctxt *ctxt)
{
        struct svc_rdma_segment *segment;
        struct svc_rdma_chunk *chunk;
        u32 inv_rkey;

        ctxt->rc_inv_rkey = 0;

        if (!rdma->sc_snd_w_inv)
                return;

        inv_rkey = 0;
        pcl_for_each_chunk(chunk, &ctxt->rc_call_pcl) {
                pcl_for_each_segment(segment, chunk) {
                        if (inv_rkey == 0)
                                inv_rkey = segment->rs_handle;
                        else if (inv_rkey != segment->rs_handle)
                                return;
                }
        }
        pcl_for_each_chunk(chunk, &ctxt->rc_read_pcl) {
                pcl_for_each_segment(segment, chunk) {
                        if (inv_rkey == 0)
                                inv_rkey = segment->rs_handle;
                        else if (inv_rkey != segment->rs_handle)
                                return;
                }
        }
        pcl_for_each_chunk(chunk, &ctxt->rc_write_pcl) {
                pcl_for_each_segment(segment, chunk) {
                        if (inv_rkey == 0)
                                inv_rkey = segment->rs_handle;
                        else if (inv_rkey != segment->rs_handle)
                                return;
                }
        }
        pcl_for_each_chunk(chunk, &ctxt->rc_reply_pcl) {
                pcl_for_each_segment(segment, chunk) {
                        if (inv_rkey == 0)
                                inv_rkey = segment->rs_handle;
                        else if (inv_rkey != segment->rs_handle)
                                return;
                }
        }
        ctxt->rc_inv_rkey = inv_rkey;
}

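/**
 * svc_rdma_xdr_decode_req - Decode the transport header
 * @rq_arg: xdr_buf containing ingress RPC/RDMA message
 * @rctxt: state of decoding
 *
 * On entry, rq_arg->head[0].iov_base points to the first byte of the
 * RPC-over-RDMA transport header, which is assumed to be entirely
 * contained in the head iovec.
 *
 * Return values:
 *	On success, the number of bytes consumed from the transport
 *	header (a positive value),
 *	%0 if the message is to be dropped (rdma_done or rdma_error),
 *	%-EINVAL if the transport header is malformed,
 *	%-EPROTONOSUPPORT if the RPC-over-RDMA version is unsupported.
 */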
static int svc_rdma_xdr_decode_req(struct xdr_buf *rq_arg,
                                   struct svc_rdma_recv_ctxt *rctxt)
{
        __be32 *p, *rdma_argp;
        unsigned int hdr_len;

        rdma_argp = rq_arg->head[0].iov_base;
        xdr_init_decode(&rctxt->rc_stream, rq_arg, rdma_argp, NULL);

        p = xdr_inline_decode(&rctxt->rc_stream,
                              rpcrdma_fixed_maxsz * sizeof(*p));
        if (unlikely(!p))
                goto out_short;
        p++;
        if (*p != rpcrdma_version)
                goto out_version;
        p += 2;
        rctxt->rc_msgtype = *p;
        switch (rctxt->rc_msgtype) {
        case rdma_msg:
                break;
        case rdma_nomsg:
                break;
        case rdma_done:
                goto out_drop;
        case rdma_error:
                goto out_drop;
        default:
                goto out_proc;
        }

        if (!xdr_check_read_list(rctxt))
                goto out_inval;
        if (!xdr_check_write_list(rctxt))
                goto out_inval;
        if (!xdr_check_reply_chunk(rctxt))
                goto out_inval;

        rq_arg->head[0].iov_base = rctxt->rc_stream.p;
        hdr_len = xdr_stream_pos(&rctxt->rc_stream);
        rq_arg->head[0].iov_len -= hdr_len;
        rq_arg->len -= hdr_len;
        trace_svcrdma_decode_rqst(rctxt, rdma_argp, hdr_len);
        return hdr_len;

out_short:
        trace_svcrdma_decode_short_err(rctxt, rq_arg->len);
        return -EINVAL;

out_version:
        trace_svcrdma_decode_badvers_err(rctxt, rdma_argp);
        return -EPROTONOSUPPORT;

out_drop:
        trace_svcrdma_decode_drop_err(rctxt, rdma_argp);
        return 0;

out_proc:
        trace_svcrdma_decode_badproc_err(rctxt, rdma_argp);
        return -EINVAL;

out_inval:
        trace_svcrdma_decode_parse_err(rctxt, rdma_argp);
        return -EINVAL;
}

static void svc_rdma_send_error(struct svcxprt_rdma *rdma,
                                struct svc_rdma_recv_ctxt *rctxt,
                                int status)
{
        struct svc_rdma_send_ctxt *sctxt;

        sctxt = svc_rdma_send_ctxt_get(rdma);
        if (!sctxt)
                return;
        svc_rdma_send_error_msg(rdma, sctxt, rctxt, status);
}

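/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */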
static bool svc_rdma_is_reverse_direction_reply(struct svc_xprt *xprt,
                                                struct svc_rdma_recv_ctxt *rctxt)
{
        __be32 *p = rctxt->rc_recv_buf;

        if (!xprt->xpt_bc_xprt)
                return false;

        if (rctxt->rc_msgtype != rdma_msg)
                return false;

        if (!pcl_is_empty(&rctxt->rc_call_pcl))
                return false;
        if (!pcl_is_empty(&rctxt->rc_read_pcl))
                return false;
        if (!pcl_is_empty(&rctxt->rc_write_pcl))
                return false;
        if (!pcl_is_empty(&rctxt->rc_reply_pcl))
                return false;

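        /* RPC call direction */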
        if (*(p + 8) == cpu_to_be32(RPC_CALL))
                return false;

        return true;
}

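/**
 * svc_rdma_recvfrom - Receive an RPC call
 * @rqstp: request structure into which to receive an RPC Call
 *
 * Called in a loop when XPT_DATA is set. XPT_DATA is cleared only
 * when there are no remaining ctxt's to process.
 *
 * Return values:
 *	The positive number of bytes in the RPC Call message,
 *	%0 if there were no Calls ready to return or the message
 *	was dropped, or a negative errno if Read chunk processing
 *	failed.
 */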
int svc_rdma_recvfrom(struct svc_rqst *rqstp)
{
        struct svc_xprt *xprt = rqstp->rq_xprt;
        struct svcxprt_rdma *rdma_xprt =
                container_of(xprt, struct svcxprt_rdma, sc_xprt);
        struct svc_rdma_recv_ctxt *ctxt;
        int ret;

        rqstp->rq_xprt_ctxt = NULL;

        ctxt = NULL;
        spin_lock(&rdma_xprt->sc_rq_dto_lock);
        ctxt = svc_rdma_next_recv_ctxt(&rdma_xprt->sc_rq_dto_q);
        if (ctxt)
                list_del(&ctxt->rc_list);
        else
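                /* No new incoming requests, terminate the loop */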
                clear_bit(XPT_DATA, &xprt->xpt_flags);
        spin_unlock(&rdma_xprt->sc_rq_dto_lock);

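        /* Unblock the transport for the next receiver */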
        svc_xprt_received(xprt);
        if (!ctxt)
                return 0;

        percpu_counter_inc(&svcrdma_stat_recv);
        ib_dma_sync_single_for_cpu(rdma_xprt->sc_pd->device,
                                   ctxt->rc_recv_sge.addr, ctxt->rc_byte_len,
                                   DMA_FROM_DEVICE);
        svc_rdma_build_arg_xdr(rqstp, ctxt);

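        /* Prevent svc_xprt_release() from releasing pages in rq_pages
         * that aren't needed for the reply.
         */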
        rqstp->rq_respages = rqstp->rq_pages;
        rqstp->rq_next_page = rqstp->rq_respages;

        ret = svc_rdma_xdr_decode_req(&rqstp->rq_arg, ctxt);
        if (ret < 0)
                goto out_err;
        if (ret == 0)
                goto out_drop;
        rqstp->rq_xprt_hlen = ret;

        if (svc_rdma_is_reverse_direction_reply(xprt, ctxt))
                goto out_backchannel;

        svc_rdma_get_inv_rkey(rdma_xprt, ctxt);

        if (!pcl_is_empty(&ctxt->rc_read_pcl) ||
            !pcl_is_empty(&ctxt->rc_call_pcl)) {
                ret = svc_rdma_process_read_list(rdma_xprt, rqstp, ctxt);
                if (ret < 0)
                        goto out_readfail;
        }

        rqstp->rq_xprt_ctxt = ctxt;
        rqstp->rq_prot = IPPROTO_MAX;
        svc_xprt_copy_addrs(rqstp, xprt);
        return rqstp->rq_arg.len;

out_err:
        svc_rdma_send_error(rdma_xprt, ctxt, ret);
        svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
        return 0;

out_readfail:
        if (ret == -EINVAL)
                svc_rdma_send_error(rdma_xprt, ctxt, ret);
        svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
        return ret;

out_backchannel:
        svc_rdma_handle_bc_reply(rqstp, ctxt);
out_drop:
        svc_rdma_recv_ctxt_put(rdma_xprt, ctxt);
        return 0;
}