1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42#include <linux/sunrpc/debug.h>
43#include <linux/sunrpc/rpc_rdma.h>
44#include <linux/spinlock.h>
45#include <asm/unaligned.h>
46#include <rdma/ib_verbs.h>
47#include <rdma/rdma_cm.h>
48#include <linux/sunrpc/svc_rdma.h>
49
50#define RPCDBG_FACILITY RPCDBG_SVCXPRT
51
52
53
54
55
56
57static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
58 struct svc_rdma_op_ctxt *ctxt,
59 u32 byte_count)
60{
61 struct page *page;
62 u32 bc;
63 int sge_no;
64
65
66 page = ctxt->pages[0];
67 put_page(rqstp->rq_pages[0]);
68 rqstp->rq_pages[0] = page;
69
70
71 rqstp->rq_arg.head[0].iov_base = page_address(page);
72 rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
73 rqstp->rq_arg.len = byte_count;
74 rqstp->rq_arg.buflen = byte_count;
75
76
77 bc = byte_count - rqstp->rq_arg.head[0].iov_len;
78
79
80 rqstp->rq_arg.page_len = bc;
81 rqstp->rq_arg.page_base = 0;
82 rqstp->rq_arg.pages = &rqstp->rq_pages[1];
83 sge_no = 1;
84 while (bc && sge_no < ctxt->count) {
85 page = ctxt->pages[sge_no];
86 put_page(rqstp->rq_pages[sge_no]);
87 rqstp->rq_pages[sge_no] = page;
88 bc -= min(bc, ctxt->sge[sge_no].length);
89 rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
90 sge_no++;
91 }
92 rqstp->rq_respages = &rqstp->rq_pages[sge_no];
93
94
95
96
97 BUG_ON(bc && (sge_no == ctxt->count));
98 BUG_ON((rqstp->rq_arg.head[0].iov_len + rqstp->rq_arg.page_len)
99 != byte_count);
100 BUG_ON(rqstp->rq_arg.len != byte_count);
101
102
103 bc = sge_no;
104 while (sge_no < ctxt->count) {
105 page = ctxt->pages[sge_no++];
106 put_page(page);
107 }
108 ctxt->count = bc;
109
110
111 rqstp->rq_arg.tail[0].iov_base = NULL;
112 rqstp->rq_arg.tail[0].iov_len = 0;
113}
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128static int map_read_chunks(struct svcxprt_rdma *xprt,
129 struct svc_rqst *rqstp,
130 struct svc_rdma_op_ctxt *head,
131 struct rpcrdma_msg *rmsgp,
132 struct svc_rdma_req_map *rpl_map,
133 struct svc_rdma_req_map *chl_map,
134 int ch_count,
135 int byte_count)
136{
137 int sge_no;
138 int sge_bytes;
139 int page_off;
140 int page_no;
141 int ch_bytes;
142 int ch_no;
143 struct rpcrdma_read_chunk *ch;
144
145 sge_no = 0;
146 page_no = 0;
147 page_off = 0;
148 ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
149 ch_no = 0;
150 ch_bytes = ntohl(ch->rc_target.rs_length);
151 head->arg.head[0] = rqstp->rq_arg.head[0];
152 head->arg.tail[0] = rqstp->rq_arg.tail[0];
153 head->arg.pages = &head->pages[head->count];
154 head->hdr_count = head->count;
155 head->arg.page_base = 0;
156 head->arg.page_len = ch_bytes;
157 head->arg.len = rqstp->rq_arg.len + ch_bytes;
158 head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
159 head->count++;
160 chl_map->ch[0].start = 0;
161 while (byte_count) {
162 rpl_map->sge[sge_no].iov_base =
163 page_address(rqstp->rq_arg.pages[page_no]) + page_off;
164 sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
165 rpl_map->sge[sge_no].iov_len = sge_bytes;
166
167
168
169
170 head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
171 rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1];
172
173 byte_count -= sge_bytes;
174 ch_bytes -= sge_bytes;
175 sge_no++;
176
177
178
179
180 if (ch_bytes == 0) {
181 chl_map->ch[ch_no].count =
182 sge_no - chl_map->ch[ch_no].start;
183 ch_no++;
184 ch++;
185 chl_map->ch[ch_no].start = sge_no;
186 ch_bytes = ntohl(ch->rc_target.rs_length);
187
188 if (byte_count) {
189 head->arg.page_len += ch_bytes;
190 head->arg.len += ch_bytes;
191 head->arg.buflen += ch_bytes;
192 }
193 }
194
195
196
197
198 if ((sge_bytes + page_off) == PAGE_SIZE) {
199 page_no++;
200 page_off = 0;
201
202
203
204
205 if (byte_count)
206 head->count++;
207 } else
208 page_off += sge_bytes;
209 }
210 BUG_ON(byte_count != 0);
211 return sge_no;
212}
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
229 struct svc_rqst *rqstp,
230 struct svc_rdma_op_ctxt *head,
231 struct rpcrdma_msg *rmsgp,
232 struct svc_rdma_req_map *rpl_map,
233 struct svc_rdma_req_map *chl_map,
234 int ch_count,
235 int byte_count)
236{
237 int page_no;
238 int ch_no;
239 u32 offset;
240 struct rpcrdma_read_chunk *ch;
241 struct svc_rdma_fastreg_mr *frmr;
242 int ret = 0;
243
244 frmr = svc_rdma_get_frmr(xprt);
245 if (IS_ERR(frmr))
246 return -ENOMEM;
247
248 head->frmr = frmr;
249 head->arg.head[0] = rqstp->rq_arg.head[0];
250 head->arg.tail[0] = rqstp->rq_arg.tail[0];
251 head->arg.pages = &head->pages[head->count];
252 head->hdr_count = head->count;
253 head->arg.page_base = 0;
254 head->arg.page_len = byte_count;
255 head->arg.len = rqstp->rq_arg.len + byte_count;
256 head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
257
258
259 frmr->kva = page_address(rqstp->rq_arg.pages[0]);
260 frmr->direction = DMA_FROM_DEVICE;
261 frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
262 frmr->map_len = byte_count;
263 frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT;
264 for (page_no = 0; page_no < frmr->page_list_len; page_no++) {
265 frmr->page_list->page_list[page_no] =
266 ib_dma_map_page(xprt->sc_cm_id->device,
267 rqstp->rq_arg.pages[page_no], 0,
268 PAGE_SIZE, DMA_FROM_DEVICE);
269 if (ib_dma_mapping_error(xprt->sc_cm_id->device,
270 frmr->page_list->page_list[page_no]))
271 goto fatal_err;
272 atomic_inc(&xprt->sc_dma_used);
273 head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
274 }
275 head->count += page_no;
276
277
278 rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
279
280
281 offset = 0;
282 ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
283 for (ch_no = 0; ch_no < ch_count; ch_no++) {
284 int len = ntohl(ch->rc_target.rs_length);
285 rpl_map->sge[ch_no].iov_base = frmr->kva + offset;
286 rpl_map->sge[ch_no].iov_len = len;
287 chl_map->ch[ch_no].count = 1;
288 chl_map->ch[ch_no].start = ch_no;
289 offset += len;
290 ch++;
291 }
292
293 ret = svc_rdma_fastreg(xprt, frmr);
294 if (ret)
295 goto fatal_err;
296
297 return ch_no;
298
299 fatal_err:
300 printk("svcrdma: error fast registering xdr for xprt %p", xprt);
301 svc_rdma_put_frmr(xprt, frmr);
302 return -EIO;
303}
304
305static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
306 struct svc_rdma_op_ctxt *ctxt,
307 struct svc_rdma_fastreg_mr *frmr,
308 struct kvec *vec,
309 u64 *sgl_offset,
310 int count)
311{
312 int i;
313 unsigned long off;
314
315 ctxt->count = count;
316 ctxt->direction = DMA_FROM_DEVICE;
317 for (i = 0; i < count; i++) {
318 ctxt->sge[i].length = 0;
319 if (!frmr) {
320 BUG_ON(!virt_to_page(vec[i].iov_base));
321 off = (unsigned long)vec[i].iov_base & ~PAGE_MASK;
322 ctxt->sge[i].addr =
323 ib_dma_map_page(xprt->sc_cm_id->device,
324 virt_to_page(vec[i].iov_base),
325 off,
326 vec[i].iov_len,
327 DMA_FROM_DEVICE);
328 if (ib_dma_mapping_error(xprt->sc_cm_id->device,
329 ctxt->sge[i].addr))
330 return -EINVAL;
331 ctxt->sge[i].lkey = xprt->sc_dma_lkey;
332 atomic_inc(&xprt->sc_dma_used);
333 } else {
334 ctxt->sge[i].addr = (unsigned long)vec[i].iov_base;
335 ctxt->sge[i].lkey = frmr->mr->lkey;
336 }
337 ctxt->sge[i].length = vec[i].iov_len;
338 *sgl_offset = *sgl_offset + vec[i].iov_len;
339 }
340 return 0;
341}
342
343static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
344{
345 if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
346 RDMA_TRANSPORT_IWARP) &&
347 sge_count > 1)
348 return 1;
349 else
350 return min_t(int, sge_count, xprt->sc_max_sge);
351}
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381static int rdma_read_xdr(struct svcxprt_rdma *xprt,
382 struct rpcrdma_msg *rmsgp,
383 struct svc_rqst *rqstp,
384 struct svc_rdma_op_ctxt *hdr_ctxt)
385{
386 struct ib_send_wr read_wr;
387 struct ib_send_wr inv_wr;
388 int err = 0;
389 int ch_no;
390 int ch_count;
391 int byte_count;
392 int sge_count;
393 u64 sgl_offset;
394 struct rpcrdma_read_chunk *ch;
395 struct svc_rdma_op_ctxt *ctxt = NULL;
396 struct svc_rdma_req_map *rpl_map;
397 struct svc_rdma_req_map *chl_map;
398
399
400 ch = svc_rdma_get_read_chunk(rmsgp);
401 if (!ch)
402 return 0;
403
404 svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
405 if (ch_count > RPCSVC_MAXPAGES)
406 return -EINVAL;
407
408
409 rpl_map = svc_rdma_get_req_map();
410 chl_map = svc_rdma_get_req_map();
411
412 if (!xprt->sc_frmr_pg_list_len)
413 sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
414 rpl_map, chl_map, ch_count,
415 byte_count);
416 else
417 sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp,
418 rpl_map, chl_map, ch_count,
419 byte_count);
420 if (sge_count < 0) {
421 err = -EIO;
422 goto out;
423 }
424
425 sgl_offset = 0;
426 ch_no = 0;
427
428 for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
429 ch->rc_discrim != 0; ch++, ch_no++) {
430 u64 rs_offset;
431next_sge:
432 ctxt = svc_rdma_get_context(xprt);
433 ctxt->direction = DMA_FROM_DEVICE;
434 ctxt->frmr = hdr_ctxt->frmr;
435 ctxt->read_hdr = NULL;
436 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
437 clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
438
439
440 memset(&read_wr, 0, sizeof read_wr);
441 read_wr.wr_id = (unsigned long)ctxt;
442 read_wr.opcode = IB_WR_RDMA_READ;
443 ctxt->wr_op = read_wr.opcode;
444 read_wr.send_flags = IB_SEND_SIGNALED;
445 read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
446 xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
447 &rs_offset);
448 read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset;
449 read_wr.sg_list = ctxt->sge;
450 read_wr.num_sge =
451 rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
452 err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr,
453 &rpl_map->sge[chl_map->ch[ch_no].start],
454 &sgl_offset,
455 read_wr.num_sge);
456 if (err) {
457 svc_rdma_unmap_dma(ctxt);
458 svc_rdma_put_context(ctxt, 0);
459 goto out;
460 }
461 if (((ch+1)->rc_discrim == 0) &&
462 (read_wr.num_sge == chl_map->ch[ch_no].count)) {
463
464
465
466
467
468 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
469 if (hdr_ctxt->frmr) {
470 set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
471
472
473
474
475 if (xprt->sc_dev_caps &
476 SVCRDMA_DEVCAP_READ_W_INV) {
477 read_wr.opcode =
478 IB_WR_RDMA_READ_WITH_INV;
479 ctxt->wr_op = read_wr.opcode;
480 read_wr.ex.invalidate_rkey =
481 ctxt->frmr->mr->lkey;
482 } else {
483
484 memset(&inv_wr, 0, sizeof inv_wr);
485 inv_wr.opcode = IB_WR_LOCAL_INV;
486 inv_wr.send_flags = IB_SEND_SIGNALED;
487 inv_wr.ex.invalidate_rkey =
488 hdr_ctxt->frmr->mr->lkey;
489 read_wr.next = &inv_wr;
490 }
491 }
492 ctxt->read_hdr = hdr_ctxt;
493 }
494
495 err = svc_rdma_send(xprt, &read_wr);
496 if (err) {
497 printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
498 err);
499 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
500 svc_rdma_unmap_dma(ctxt);
501 svc_rdma_put_context(ctxt, 0);
502 goto out;
503 }
504 atomic_inc(&rdma_stat_read);
505
506 if (read_wr.num_sge < chl_map->ch[ch_no].count) {
507 chl_map->ch[ch_no].count -= read_wr.num_sge;
508 chl_map->ch[ch_no].start += read_wr.num_sge;
509 goto next_sge;
510 }
511 sgl_offset = 0;
512 err = 1;
513 }
514
515 out:
516 svc_rdma_put_req_map(rpl_map);
517 svc_rdma_put_req_map(chl_map);
518
519
520 for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
521 rqstp->rq_pages[ch_no] = NULL;
522
523
524
525
526
527 while (rqstp->rq_next_page != rqstp->rq_respages)
528 *(--rqstp->rq_next_page) = NULL;
529
530 return err;
531}
532
533static int rdma_read_complete(struct svc_rqst *rqstp,
534 struct svc_rdma_op_ctxt *head)
535{
536 int page_no;
537 int ret;
538
539 BUG_ON(!head);
540
541
542 for (page_no = 0; page_no < head->count; page_no++) {
543 put_page(rqstp->rq_pages[page_no]);
544 rqstp->rq_pages[page_no] = head->pages[page_no];
545 }
546
547 rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
548 rqstp->rq_arg.page_len = head->arg.page_len;
549 rqstp->rq_arg.page_base = head->arg.page_base;
550
551
552 rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
553 rqstp->rq_next_page = &rqstp->rq_arg.pages[page_no];
554
555
556 rqstp->rq_arg.head[0] = head->arg.head[0];
557 rqstp->rq_arg.tail[0] = head->arg.tail[0];
558 rqstp->rq_arg.len = head->arg.len;
559 rqstp->rq_arg.buflen = head->arg.buflen;
560
561
562 svc_rdma_put_context(head, 0);
563
564
565 rqstp->rq_prot = IPPROTO_MAX;
566 svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);
567
568 ret = rqstp->rq_arg.head[0].iov_len
569 + rqstp->rq_arg.page_len
570 + rqstp->rq_arg.tail[0].iov_len;
571 dprintk("svcrdma: deferred read ret=%d, rq_arg.len =%d, "
572 "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
573 ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
574 rqstp->rq_arg.head[0].iov_len);
575
576 return ret;
577}
578
579
580
581
582
583
584int svc_rdma_recvfrom(struct svc_rqst *rqstp)
585{
586 struct svc_xprt *xprt = rqstp->rq_xprt;
587 struct svcxprt_rdma *rdma_xprt =
588 container_of(xprt, struct svcxprt_rdma, sc_xprt);
589 struct svc_rdma_op_ctxt *ctxt = NULL;
590 struct rpcrdma_msg *rmsgp;
591 int ret = 0;
592 int len;
593
594 dprintk("svcrdma: rqstp=%p\n", rqstp);
595
596 spin_lock_bh(&rdma_xprt->sc_rq_dto_lock);
597 if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
598 ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
599 struct svc_rdma_op_ctxt,
600 dto_q);
601 list_del_init(&ctxt->dto_q);
602 }
603 if (ctxt) {
604 spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
605 return rdma_read_complete(rqstp, ctxt);
606 }
607
608 if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
609 ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
610 struct svc_rdma_op_ctxt,
611 dto_q);
612 list_del_init(&ctxt->dto_q);
613 } else {
614 atomic_inc(&rdma_stat_rq_starve);
615 clear_bit(XPT_DATA, &xprt->xpt_flags);
616 ctxt = NULL;
617 }
618 spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
619 if (!ctxt) {
620
621
622
623
624
625 if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
626 goto close_out;
627
628 BUG_ON(ret);
629 goto out;
630 }
631 dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
632 ctxt, rdma_xprt, rqstp, ctxt->wc_status);
633 BUG_ON(ctxt->wc_status != IB_WC_SUCCESS);
634 atomic_inc(&rdma_stat_recv);
635
636
637 rdma_build_arg_xdr(rqstp, ctxt, ctxt->byte_len);
638
639
640 len = svc_rdma_xdr_decode_req(&rmsgp, rqstp);
641 rqstp->rq_xprt_hlen = len;
642
643
644 if (len < 0) {
645 if (len == -ENOSYS)
646 svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
647 goto close_out;
648 }
649
650
651 ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
652 if (ret > 0) {
653
654 goto defer;
655 }
656 if (ret < 0) {
657
658 svc_rdma_put_context(ctxt, 1);
659 return 0;
660 }
661
662 ret = rqstp->rq_arg.head[0].iov_len
663 + rqstp->rq_arg.page_len
664 + rqstp->rq_arg.tail[0].iov_len;
665 svc_rdma_put_context(ctxt, 0);
666 out:
667 dprintk("svcrdma: ret = %d, rq_arg.len =%d, "
668 "rq_arg.head[0].iov_base=%p, rq_arg.head[0].iov_len = %zd\n",
669 ret, rqstp->rq_arg.len,
670 rqstp->rq_arg.head[0].iov_base,
671 rqstp->rq_arg.head[0].iov_len);
672 rqstp->rq_prot = IPPROTO_MAX;
673 svc_xprt_copy_addrs(rqstp, xprt);
674 return ret;
675
676 close_out:
677 if (ctxt)
678 svc_rdma_put_context(ctxt, 1);
679 dprintk("svcrdma: transport %p is closing\n", xprt);
680
681
682
683
684 set_bit(XPT_CLOSE, &xprt->xpt_flags);
685defer:
686 return 0;
687}
688