/*
 * LNet data movement routines: the send and receive paths, message
 * forwarding and routing, credit accounting, router buffer management,
 * and the LNetPut()/LNetGet() API entry points.
 */

41#define DEBUG_SUBSYSTEM S_LNET
42
43#include <linux/lnet/lib-lnet.h>
44
45static int local_nid_dist_zero = 1;
46module_param(local_nid_dist_zero, int, 0444);
MODULE_PARM_DESC(local_nid_dist_zero, "Report a distance of 0 for local NIDs in LNetDist()");
48
49int
50lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
51{
52 lnet_test_peer_t *tp;
53 struct list_head *el;
54 struct list_head *next;
55 struct list_head cull;
56
57 LASSERT(the_lnet.ln_init);
58
59
60 if (threshold != 0) {
61
62 LIBCFS_ALLOC(tp, sizeof(*tp));
63 if (tp == NULL)
64 return -ENOMEM;
65
66 tp->tp_nid = nid;
67 tp->tp_threshold = threshold;
68
69 lnet_net_lock(0);
70 list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
71 lnet_net_unlock(0);
72 return 0;
73 }
74
75
76 INIT_LIST_HEAD(&cull);
77
78 lnet_net_lock(0);
79
80 list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
81 tp = list_entry(el, lnet_test_peer_t, tp_list);
82
83 if (tp->tp_threshold == 0 ||
84 nid == LNET_NID_ANY ||
85 tp->tp_nid == nid) {
86 list_del(&tp->tp_list);
87 list_add(&tp->tp_list, &cull);
88 }
89 }
90
91 lnet_net_unlock(0);
92
93 while (!list_empty(&cull)) {
94 tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
95
96 list_del(&tp->tp_list);
97 LIBCFS_FREE(tp, sizeof(*tp));
98 }
99 return 0;
100}
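
/*
 * Example usage (illustrative sketch, not called from this file; 'bad_nid'
 * is a hypothetical peer NID):
 *
 *	// Drop the next 3 messages sent to or received from this peer.
 *	rc = lnet_fail_nid(bad_nid, 3);
 *
 *	// A threshold of 0 removes the rule(s) for that NID; passing
 *	// LNET_NID_ANY with threshold 0 clears every rule.
 *	rc = lnet_fail_nid(bad_nid, 0);
 */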
101
102static int
103fail_peer(lnet_nid_t nid, int outgoing)
104{
105 lnet_test_peer_t *tp;
106 struct list_head *el;
107 struct list_head *next;
108 struct list_head cull;
109 int fail = 0;
110
111 INIT_LIST_HEAD(&cull);
112
113
114 lnet_net_lock(0);
115
116 list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
117 tp = list_entry(el, lnet_test_peer_t, tp_list);
118
119 if (tp->tp_threshold == 0) {
120
121 if (outgoing) {
122
123
124
125 list_del(&tp->tp_list);
126 list_add(&tp->tp_list, &cull);
127 }
128 continue;
129 }
130
131 if (tp->tp_nid == LNET_NID_ANY ||
132 nid == tp->tp_nid) {
133 fail = 1;
134
135 if (tp->tp_threshold != LNET_MD_THRESH_INF) {
136 tp->tp_threshold--;
137 if (outgoing &&
138 tp->tp_threshold == 0) {
139
140 list_del(&tp->tp_list);
141 list_add(&tp->tp_list, &cull);
142 }
143 }
144 break;
145 }
146 }
147
148 lnet_net_unlock(0);
149
150 while (!list_empty(&cull)) {
151 tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
152 list_del(&tp->tp_list);
153
154 LIBCFS_FREE(tp, sizeof(*tp));
155 }
156
157 return fail;
158}
159
160unsigned int
161lnet_iov_nob(unsigned int niov, struct iovec *iov)
162{
163 unsigned int nob = 0;
164
165 while (niov-- > 0)
166 nob += (iov++)->iov_len;
167
168 return nob;
169}
170EXPORT_SYMBOL(lnet_iov_nob);
171
172void
173lnet_copy_iov2iov(unsigned int ndiov, struct iovec *diov, unsigned int doffset,
174 unsigned int nsiov, struct iovec *siov, unsigned int soffset,
175 unsigned int nob)
176{
177
178 unsigned int this_nob;
179
180 if (nob == 0)
181 return;
182
183
184 LASSERT(ndiov > 0);
185 while (doffset >= diov->iov_len) {
186 doffset -= diov->iov_len;
187 diov++;
188 ndiov--;
189 LASSERT(ndiov > 0);
190 }
191
192
193 LASSERT(nsiov > 0);
194 while (soffset >= siov->iov_len) {
195 soffset -= siov->iov_len;
196 siov++;
197 nsiov--;
198 LASSERT(nsiov > 0);
199 }
200
201 do {
202 LASSERT(ndiov > 0);
203 LASSERT(nsiov > 0);
204 this_nob = MIN(diov->iov_len - doffset,
205 siov->iov_len - soffset);
206 this_nob = MIN(this_nob, nob);
207
208 memcpy((char *)diov->iov_base + doffset,
209 (char *)siov->iov_base + soffset, this_nob);
210 nob -= this_nob;
211
212 if (diov->iov_len > doffset + this_nob) {
213 doffset += this_nob;
214 } else {
215 diov++;
216 ndiov--;
217 doffset = 0;
218 }
219
220 if (siov->iov_len > soffset + this_nob) {
221 soffset += this_nob;
222 } else {
223 siov++;
224 nsiov--;
225 soffset = 0;
226 }
227 } while (nob > 0);
228}
229EXPORT_SYMBOL(lnet_copy_iov2iov);
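
/*
 * Example usage (illustrative sketch; the buffers and sizes below are
 * hypothetical):
 *
 *	struct iovec dst[2] = {
 *		{ .iov_base = hdr_buf,  .iov_len = 64   },
 *		{ .iov_base = data_buf, .iov_len = 4096 },
 *	};
 *	struct iovec src[1] = {
 *		{ .iov_base = rx_buf, .iov_len = 4160 },
 *	};
 *
 *	// Copy 4096 bytes starting 64 bytes into each vector; the helper
 *	// crosses fragment boundaries on either side as needed.
 *	lnet_copy_iov2iov(2, dst, 64, 1, src, 64, 4096);
 */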
230
231int
232lnet_extract_iov(int dst_niov, struct iovec *dst,
233 int src_niov, struct iovec *src,
234 unsigned int offset, unsigned int len)
235{
236
237
238
239 unsigned int frag_len;
240 unsigned int niov;
241
242 if (len == 0)
243 return 0;
244
245 LASSERT(src_niov > 0);
246 while (offset >= src->iov_len) {
247 offset -= src->iov_len;
248 src_niov--;
249 src++;
250 LASSERT(src_niov > 0);
251 }
252
253 niov = 1;
254 for (;;) {
255 LASSERT(src_niov > 0);
256 LASSERT((int)niov <= dst_niov);
257
258 frag_len = src->iov_len - offset;
259 dst->iov_base = ((char *)src->iov_base) + offset;
260
261 if (len <= frag_len) {
262 dst->iov_len = len;
263 return niov;
264 }
265
266 dst->iov_len = frag_len;
267
268 len -= frag_len;
269 dst++;
270 src++;
271 niov++;
272 src_niov--;
273 offset = 0;
274 }
275}
276EXPORT_SYMBOL(lnet_extract_iov);
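
/*
 * Example usage (illustrative sketch): describing bytes [offset, offset+len)
 * of a message payload as a fragment list for an LND, without copying.
 * 'offset' and 'len' are hypothetical values.
 *
 *	struct iovec frags[LNET_MAX_IOV];
 *	int nfrags;
 *
 *	nfrags = lnet_extract_iov(LNET_MAX_IOV, frags,
 *				  msg->msg_niov, msg->msg_iov,
 *				  offset, len);
 *	// frags[0..nfrags-1] now alias the source buffers; the source
 *	// iovec array itself is left untouched.
 */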
277
278
279unsigned int
280lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
281{
282 unsigned int nob = 0;
283
284 while (niov-- > 0)
285 nob += (kiov++)->kiov_len;
286
287 return nob;
288}
289EXPORT_SYMBOL(lnet_kiov_nob);
290
291void
292lnet_copy_kiov2kiov(unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
293 unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
294 unsigned int nob)
295{
296
297 unsigned int this_nob;
298 char *daddr = NULL;
299 char *saddr = NULL;
300
301 if (nob == 0)
302 return;
303
304 LASSERT(!in_interrupt());
305
306 LASSERT(ndiov > 0);
307 while (doffset >= diov->kiov_len) {
308 doffset -= diov->kiov_len;
309 diov++;
310 ndiov--;
311 LASSERT(ndiov > 0);
312 }
313
314 LASSERT(nsiov > 0);
315 while (soffset >= siov->kiov_len) {
316 soffset -= siov->kiov_len;
317 siov++;
318 nsiov--;
319 LASSERT(nsiov > 0);
320 }
321
322 do {
323 LASSERT(ndiov > 0);
324 LASSERT(nsiov > 0);
325 this_nob = MIN(diov->kiov_len - doffset,
326 siov->kiov_len - soffset);
327 this_nob = MIN(this_nob, nob);
328
329 if (daddr == NULL)
330 daddr = ((char *)kmap(diov->kiov_page)) +
331 diov->kiov_offset + doffset;
332 if (saddr == NULL)
333 saddr = ((char *)kmap(siov->kiov_page)) +
334 siov->kiov_offset + soffset;
335
336
337
338
339
340 memcpy(daddr, saddr, this_nob);
341 nob -= this_nob;
342
343 if (diov->kiov_len > doffset + this_nob) {
344 daddr += this_nob;
345 doffset += this_nob;
346 } else {
347 kunmap(diov->kiov_page);
348 daddr = NULL;
349 diov++;
350 ndiov--;
351 doffset = 0;
352 }
353
354 if (siov->kiov_len > soffset + this_nob) {
355 saddr += this_nob;
356 soffset += this_nob;
357 } else {
358 kunmap(siov->kiov_page);
359 saddr = NULL;
360 siov++;
361 nsiov--;
362 soffset = 0;
363 }
364 } while (nob > 0);
365
366 if (daddr != NULL)
367 kunmap(diov->kiov_page);
368 if (saddr != NULL)
369 kunmap(siov->kiov_page);
370}
371EXPORT_SYMBOL(lnet_copy_kiov2kiov);
372
373void
374lnet_copy_kiov2iov(unsigned int niov, struct iovec *iov, unsigned int iovoffset,
375 unsigned int nkiov, lnet_kiov_t *kiov,
376 unsigned int kiovoffset, unsigned int nob)
377{
378
379 unsigned int this_nob;
380 char *addr = NULL;
381
382 if (nob == 0)
383 return;
384
385 LASSERT(!in_interrupt());
386
387 LASSERT(niov > 0);
388 while (iovoffset >= iov->iov_len) {
389 iovoffset -= iov->iov_len;
390 iov++;
391 niov--;
392 LASSERT(niov > 0);
393 }
394
395 LASSERT(nkiov > 0);
396 while (kiovoffset >= kiov->kiov_len) {
397 kiovoffset -= kiov->kiov_len;
398 kiov++;
399 nkiov--;
400 LASSERT(nkiov > 0);
401 }
402
403 do {
404 LASSERT(niov > 0);
405 LASSERT(nkiov > 0);
406 this_nob = MIN(iov->iov_len - iovoffset,
407 kiov->kiov_len - kiovoffset);
408 this_nob = MIN(this_nob, nob);
409
410 if (addr == NULL)
411 addr = ((char *)kmap(kiov->kiov_page)) +
412 kiov->kiov_offset + kiovoffset;
413
414 memcpy((char *)iov->iov_base + iovoffset, addr, this_nob);
415 nob -= this_nob;
416
417 if (iov->iov_len > iovoffset + this_nob) {
418 iovoffset += this_nob;
419 } else {
420 iov++;
421 niov--;
422 iovoffset = 0;
423 }
424
425 if (kiov->kiov_len > kiovoffset + this_nob) {
426 addr += this_nob;
427 kiovoffset += this_nob;
428 } else {
429 kunmap(kiov->kiov_page);
430 addr = NULL;
431 kiov++;
432 nkiov--;
433 kiovoffset = 0;
434 }
435
436 } while (nob > 0);
437
438 if (addr != NULL)
439 kunmap(kiov->kiov_page);
440}
441EXPORT_SYMBOL(lnet_copy_kiov2iov);
442
443void
444lnet_copy_iov2kiov(unsigned int nkiov, lnet_kiov_t *kiov,
445 unsigned int kiovoffset, unsigned int niov,
446 struct iovec *iov, unsigned int iovoffset,
447 unsigned int nob)
448{
449
450 unsigned int this_nob;
451 char *addr = NULL;
452
453 if (nob == 0)
454 return;
455
456 LASSERT(!in_interrupt());
457
458 LASSERT(nkiov > 0);
459 while (kiovoffset >= kiov->kiov_len) {
460 kiovoffset -= kiov->kiov_len;
461 kiov++;
462 nkiov--;
463 LASSERT(nkiov > 0);
464 }
465
466 LASSERT(niov > 0);
467 while (iovoffset >= iov->iov_len) {
468 iovoffset -= iov->iov_len;
469 iov++;
470 niov--;
471 LASSERT(niov > 0);
472 }
473
474 do {
475 LASSERT(nkiov > 0);
476 LASSERT(niov > 0);
477 this_nob = MIN(kiov->kiov_len - kiovoffset,
478 iov->iov_len - iovoffset);
479 this_nob = MIN(this_nob, nob);
480
481 if (addr == NULL)
482 addr = ((char *)kmap(kiov->kiov_page)) +
483 kiov->kiov_offset + kiovoffset;
484
485 memcpy(addr, (char *)iov->iov_base + iovoffset, this_nob);
486 nob -= this_nob;
487
488 if (kiov->kiov_len > kiovoffset + this_nob) {
489 addr += this_nob;
490 kiovoffset += this_nob;
491 } else {
492 kunmap(kiov->kiov_page);
493 addr = NULL;
494 kiov++;
495 nkiov--;
496 kiovoffset = 0;
497 }
498
499 if (iov->iov_len > iovoffset + this_nob) {
500 iovoffset += this_nob;
501 } else {
502 iov++;
503 niov--;
504 iovoffset = 0;
505 }
506 } while (nob > 0);
507
508 if (addr != NULL)
509 kunmap(kiov->kiov_page);
510}
511EXPORT_SYMBOL(lnet_copy_iov2kiov);
512
513int
514lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
515 int src_niov, lnet_kiov_t *src,
516 unsigned int offset, unsigned int len)
517{
518
519
520
521 unsigned int frag_len;
522 unsigned int niov;
523
524 if (len == 0)
525 return 0;
526
527 LASSERT(src_niov > 0);
528 while (offset >= src->kiov_len) {
529 offset -= src->kiov_len;
530 src_niov--;
531 src++;
532 LASSERT(src_niov > 0);
533 }
534
535 niov = 1;
536 for (;;) {
537 LASSERT(src_niov > 0);
538 LASSERT((int)niov <= dst_niov);
539
540 frag_len = src->kiov_len - offset;
541 dst->kiov_page = src->kiov_page;
542 dst->kiov_offset = src->kiov_offset + offset;
543
544 if (len <= frag_len) {
545 dst->kiov_len = len;
546 LASSERT(dst->kiov_offset + dst->kiov_len
547 <= PAGE_CACHE_SIZE);
548 return niov;
549 }
550
551 dst->kiov_len = frag_len;
552 LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_CACHE_SIZE);
553
554 len -= frag_len;
555 dst++;
556 src++;
557 niov++;
558 src_niov--;
559 offset = 0;
560 }
561}
562EXPORT_SYMBOL(lnet_extract_kiov);
563
564void
565lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
566 unsigned int offset, unsigned int mlen, unsigned int rlen)
567{
568 unsigned int niov = 0;
569 struct iovec *iov = NULL;
570 lnet_kiov_t *kiov = NULL;
571 int rc;
572
573 LASSERT(!in_interrupt());
574 LASSERT(mlen == 0 || msg != NULL);
575
576 if (msg != NULL) {
577 LASSERT(msg->msg_receiving);
578 LASSERT(!msg->msg_sending);
579 LASSERT(rlen == msg->msg_len);
580 LASSERT(mlen <= msg->msg_len);
581 LASSERT(msg->msg_offset == offset);
582 LASSERT(msg->msg_wanted == mlen);
583
584 msg->msg_receiving = 0;
585
586 if (mlen != 0) {
587 niov = msg->msg_niov;
588 iov = msg->msg_iov;
589 kiov = msg->msg_kiov;
590
591 LASSERT(niov > 0);
592 LASSERT((iov == NULL) != (kiov == NULL));
593 }
594 }
595
596 rc = (ni->ni_lnd->lnd_recv)(ni, private, msg, delayed,
597 niov, iov, kiov, offset, mlen, rlen);
598 if (rc < 0)
599 lnet_finalize(ni, msg, rc);
600}
601
602void
603lnet_setpayloadbuffer(lnet_msg_t *msg)
604{
605 lnet_libmd_t *md = msg->msg_md;
606
607 LASSERT(msg->msg_len > 0);
608 LASSERT(!msg->msg_routing);
609 LASSERT(md != NULL);
610 LASSERT(msg->msg_niov == 0);
611 LASSERT(msg->msg_iov == NULL);
612 LASSERT(msg->msg_kiov == NULL);
613
614 msg->msg_niov = md->md_niov;
615 if ((md->md_options & LNET_MD_KIOV) != 0)
616 msg->msg_kiov = md->md_iov.kiov;
617 else
618 msg->msg_iov = md->md_iov.iov;
619}
620
621void
622lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
623 unsigned int offset, unsigned int len)
624{
625 msg->msg_type = type;
626 msg->msg_target = target;
627 msg->msg_len = len;
628 msg->msg_offset = offset;
629
630 if (len != 0)
631 lnet_setpayloadbuffer(msg);
632
633 memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr));
634 msg->msg_hdr.type = cpu_to_le32(type);
635 msg->msg_hdr.dest_nid = cpu_to_le64(target.nid);
636 msg->msg_hdr.dest_pid = cpu_to_le32(target.pid);
637
638 msg->msg_hdr.src_pid = cpu_to_le32(the_lnet.ln_pid);
639 msg->msg_hdr.payload_length = cpu_to_le32(len);
640}
641
642void
643lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
644{
645 void *priv = msg->msg_private;
646 int rc;
647
648 LASSERT(!in_interrupt());
649 LASSERT(LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
650 (msg->msg_txcredit && msg->msg_peertxcredit));
651
652 rc = (ni->ni_lnd->lnd_send)(ni, priv, msg);
653 if (rc < 0)
654 lnet_finalize(ni, msg, rc);
655}
656
657int
658lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
659{
660 int rc;
661
662 LASSERT(!msg->msg_sending);
663 LASSERT(msg->msg_receiving);
664 LASSERT(!msg->msg_rx_ready_delay);
665 LASSERT(ni->ni_lnd->lnd_eager_recv != NULL);
666
667 msg->msg_rx_ready_delay = 1;
668 rc = (ni->ni_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
669 &msg->msg_private);
670 if (rc != 0) {
671 CERROR("recv from %s / send to %s aborted: "
672 "eager_recv failed %d\n",
673 libcfs_nid2str(msg->msg_rxpeer->lp_nid),
674 libcfs_id2str(msg->msg_target), rc);
675 LASSERT(rc < 0);
676 }
677
678 return rc;
679}
680
681
682void
683lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
684{
685 cfs_time_t last_alive = 0;
686
687 LASSERT(lnet_peer_aliveness_enabled(lp));
688 LASSERT(ni->ni_lnd->lnd_query != NULL);
689
690 lnet_net_unlock(lp->lp_cpt);
691 (ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
692 lnet_net_lock(lp->lp_cpt);
693
694 lp->lp_last_query = cfs_time_current();
695
696 if (last_alive != 0)
697 lp->lp_last_alive = last_alive;
698}
699
700
701static inline int
702lnet_peer_is_alive(lnet_peer_t *lp, cfs_time_t now)
703{
704 int alive;
705 cfs_time_t deadline;
706
707 LASSERT(lnet_peer_aliveness_enabled(lp));
708
	/* A peer that was explicitly notified dead at least as recently as it
	 * was last seen alive (lp_timestamp >= lp_last_alive) stays dead. */
712 if (!lp->lp_alive && lp->lp_alive_count > 0 &&
713 cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
714 return 0;
715
716 deadline = cfs_time_add(lp->lp_last_alive,
717 cfs_time_seconds(lp->lp_ni->ni_peertimeout));
718 alive = cfs_time_after(deadline, now);
719
	/* Flip a stale lp_alive back to alive, except for a router that has
	 * never been seen alive (that transition is left to the router
	 * checker). */
724 if (alive && !lp->lp_alive &&
725 !(lnet_isrouter(lp) && lp->lp_alive_count == 0))
726 lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);
727
728 return alive;
729}
730
731
732
/* NB: returns 1 when alive, 0 when dead, a negative error code otherwise */
734int
735lnet_peer_alive_locked(lnet_peer_t *lp)
736{
737 cfs_time_t now = cfs_time_current();
738
739 if (!lnet_peer_aliveness_enabled(lp))
740 return -ENODEV;
741
742 if (lnet_peer_is_alive(lp, now))
743 return 1;
744
	/* Peer appears dead: don't query the LND again until at least
	 * lnet_queryinterval seconds after the previous query. */
747 if (lp->lp_last_query != 0) {
748 static const int lnet_queryinterval = 1;
749
750 cfs_time_t next_query =
751 cfs_time_add(lp->lp_last_query,
752 cfs_time_seconds(lnet_queryinterval));
753
754 if (cfs_time_before(now, next_query)) {
755 if (lp->lp_alive)
756 CWARN("Unexpected aliveness of peer %s: "
757 "%d < %d (%d/%d)\n",
758 libcfs_nid2str(lp->lp_nid),
759 (int)now, (int)next_query,
760 lnet_queryinterval,
761 lp->lp_ni->ni_peertimeout);
762 return 0;
763 }
764 }
765
	/* Query the LND for the latest aliveness news */
767 lnet_ni_query_locked(lp->lp_ni, lp);
768
769 if (lnet_peer_is_alive(lp, now))
770 return 1;
771
772 lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
773 return 0;
774}
775
776int
777lnet_post_send_locked(lnet_msg_t *msg, int do_send)
778{
	/* Called with the net lock held.  lnet_send() passes do_send == 0 and
	 * does the actual send itself; the credit-release paths pass
	 * do_send == 1.  Returns positive EHOSTUNREACH if the peer appears
	 * dead, positive EAGAIN if the message was queued waiting for
	 * credits, and 0 if it was sent or is OK to send. */
783 struct lnet_peer *lp = msg->msg_txpeer;
784 struct lnet_ni *ni = lp->lp_ni;
785 struct lnet_tx_queue *tq;
786 int cpt;
787
788
789 LASSERT(!do_send || msg->msg_tx_delayed);
790 LASSERT(!msg->msg_receiving);
791 LASSERT(msg->msg_tx_committed);
792
793 cpt = msg->msg_tx_cpt;
794 tq = ni->ni_tx_queues[cpt];
795
796
797 if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
798 lnet_peer_alive_locked(lp) == 0) {
799 the_lnet.ln_counters[cpt]->drop_count++;
800 the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
801 lnet_net_unlock(cpt);
802
803 CNETERR("Dropping message for %s: peer not alive\n",
804 libcfs_id2str(msg->msg_target));
805 if (do_send)
806 lnet_finalize(ni, msg, -EHOSTUNREACH);
807
808 lnet_net_lock(cpt);
809 return EHOSTUNREACH;
810 }
811
812 if (!msg->msg_peertxcredit) {
813 LASSERT((lp->lp_txcredits < 0) ==
814 !list_empty(&lp->lp_txq));
815
816 msg->msg_peertxcredit = 1;
817 lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
818 lp->lp_txcredits--;
819
820 if (lp->lp_txcredits < lp->lp_mintxcredits)
821 lp->lp_mintxcredits = lp->lp_txcredits;
822
823 if (lp->lp_txcredits < 0) {
824 msg->msg_tx_delayed = 1;
825 list_add_tail(&msg->msg_list, &lp->lp_txq);
826 return EAGAIN;
827 }
828 }
829
830 if (!msg->msg_txcredit) {
831 LASSERT((tq->tq_credits < 0) ==
832 !list_empty(&tq->tq_delayed));
833
834 msg->msg_txcredit = 1;
835 tq->tq_credits--;
836
837 if (tq->tq_credits < tq->tq_credits_min)
838 tq->tq_credits_min = tq->tq_credits;
839
840 if (tq->tq_credits < 0) {
841 msg->msg_tx_delayed = 1;
842 list_add_tail(&msg->msg_list, &tq->tq_delayed);
843 return EAGAIN;
844 }
845 }
846
847 if (do_send) {
848 lnet_net_unlock(cpt);
849 lnet_ni_send(ni, msg);
850 lnet_net_lock(cpt);
851 }
852 return 0;
853}
854
855
856lnet_rtrbufpool_t *
857lnet_msg2bufpool(lnet_msg_t *msg)
858{
859 lnet_rtrbufpool_t *rbp;
860 int cpt;
861
862 LASSERT(msg->msg_rx_committed);
863
864 cpt = msg->msg_rx_cpt;
865 rbp = &the_lnet.ln_rtrpools[cpt][0];
866
867 LASSERT(msg->msg_len <= LNET_MTU);
868 while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_CACHE_SIZE) {
869 rbp++;
870 LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
871 }
872
873 return rbp;
874}
875
876int
877lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
878{
	/* Called with the net lock held.  Returns positive EAGAIN if the
	 * message must wait for router or buffer credits, and 0 once a router
	 * buffer has been attached (and, with do_recv, the receive started). */
882 lnet_peer_t *lp = msg->msg_rxpeer;
883 lnet_rtrbufpool_t *rbp;
884 lnet_rtrbuf_t *rb;
885
886 LASSERT(msg->msg_iov == NULL);
887 LASSERT(msg->msg_kiov == NULL);
888 LASSERT(msg->msg_niov == 0);
889 LASSERT(msg->msg_routing);
890 LASSERT(msg->msg_receiving);
891 LASSERT(!msg->msg_sending);
892
893
894 LASSERT(!do_recv || msg->msg_rx_delayed);
895
896 if (!msg->msg_peerrtrcredit) {
897 LASSERT((lp->lp_rtrcredits < 0) ==
898 !list_empty(&lp->lp_rtrq));
899
900 msg->msg_peerrtrcredit = 1;
901 lp->lp_rtrcredits--;
902 if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
903 lp->lp_minrtrcredits = lp->lp_rtrcredits;
904
905 if (lp->lp_rtrcredits < 0) {
906
907 LASSERT(msg->msg_rx_ready_delay);
908 msg->msg_rx_delayed = 1;
909 list_add_tail(&msg->msg_list, &lp->lp_rtrq);
910 return EAGAIN;
911 }
912 }
913
914 rbp = lnet_msg2bufpool(msg);
915
916 if (!msg->msg_rtrcredit) {
917 LASSERT((rbp->rbp_credits < 0) ==
918 !list_empty(&rbp->rbp_msgs));
919
920 msg->msg_rtrcredit = 1;
921 rbp->rbp_credits--;
922 if (rbp->rbp_credits < rbp->rbp_mincredits)
923 rbp->rbp_mincredits = rbp->rbp_credits;
924
925 if (rbp->rbp_credits < 0) {
926
927 LASSERT(msg->msg_rx_ready_delay);
928 msg->msg_rx_delayed = 1;
929 list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
930 return EAGAIN;
931 }
932 }
933
934 LASSERT(!list_empty(&rbp->rbp_bufs));
935 rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
936 list_del(&rb->rb_list);
937
938 msg->msg_niov = rbp->rbp_npages;
939 msg->msg_kiov = &rb->rb_kiov[0];
940
941 if (do_recv) {
942 int cpt = msg->msg_rx_cpt;
943
944 lnet_net_unlock(cpt);
945 lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
946 0, msg->msg_len, msg->msg_len);
947 lnet_net_lock(cpt);
948 }
949 return 0;
950}
951
952void
953lnet_return_tx_credits_locked(lnet_msg_t *msg)
954{
955 lnet_peer_t *txpeer = msg->msg_txpeer;
956 lnet_msg_t *msg2;
957
958 if (msg->msg_txcredit) {
959 struct lnet_ni *ni = txpeer->lp_ni;
960 struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];
961
		/* give back NI tx credits */
963 msg->msg_txcredit = 0;
964
965 LASSERT((tq->tq_credits < 0) ==
966 !list_empty(&tq->tq_delayed));
967
968 tq->tq_credits++;
969 if (tq->tq_credits <= 0) {
970 msg2 = list_entry(tq->tq_delayed.next,
971 lnet_msg_t, msg_list);
972 list_del(&msg2->msg_list);
973
974 LASSERT(msg2->msg_txpeer->lp_ni == ni);
975 LASSERT(msg2->msg_tx_delayed);
976
977 (void) lnet_post_send_locked(msg2, 1);
978 }
979 }
980
981 if (msg->msg_peertxcredit) {
		/* give back peer tx credits */
983 msg->msg_peertxcredit = 0;
984
985 LASSERT((txpeer->lp_txcredits < 0) ==
986 !list_empty(&txpeer->lp_txq));
987
988 txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
989 LASSERT(txpeer->lp_txqnob >= 0);
990
991 txpeer->lp_txcredits++;
992 if (txpeer->lp_txcredits <= 0) {
993 msg2 = list_entry(txpeer->lp_txq.next,
994 lnet_msg_t, msg_list);
995 list_del(&msg2->msg_list);
996
997 LASSERT(msg2->msg_txpeer == txpeer);
998 LASSERT(msg2->msg_tx_delayed);
999
1000 (void) lnet_post_send_locked(msg2, 1);
1001 }
1002 }
1003
1004 if (txpeer != NULL) {
1005 msg->msg_txpeer = NULL;
1006 lnet_peer_decref_locked(txpeer);
1007 }
1008}
1009
1010void
1011lnet_return_rx_credits_locked(lnet_msg_t *msg)
1012{
1013 lnet_peer_t *rxpeer = msg->msg_rxpeer;
1014 lnet_msg_t *msg2;
1015
1016 if (msg->msg_rtrcredit) {
		/* give back the router buffer credit and the buffer itself */
1018 lnet_rtrbuf_t *rb;
1019 lnet_rtrbufpool_t *rbp;
1020
1021
1022
1023
1024 LASSERT(msg->msg_kiov != NULL);
1025
1026 rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
1027 rbp = rb->rb_pool;
1028 LASSERT(rbp == lnet_msg2bufpool(msg));
1029
1030 msg->msg_kiov = NULL;
1031 msg->msg_rtrcredit = 0;
1032
1033 LASSERT((rbp->rbp_credits < 0) ==
1034 !list_empty(&rbp->rbp_msgs));
1035 LASSERT((rbp->rbp_credits > 0) ==
1036 !list_empty(&rbp->rbp_bufs));
1037
1038 list_add(&rb->rb_list, &rbp->rbp_bufs);
1039 rbp->rbp_credits++;
1040 if (rbp->rbp_credits <= 0) {
1041 msg2 = list_entry(rbp->rbp_msgs.next,
1042 lnet_msg_t, msg_list);
1043 list_del(&msg2->msg_list);
1044
1045 (void) lnet_post_routed_recv_locked(msg2, 1);
1046 }
1047 }
1048
1049 if (msg->msg_peerrtrcredit) {
		/* give back peer router credits */
1051 msg->msg_peerrtrcredit = 0;
1052
1053 LASSERT((rxpeer->lp_rtrcredits < 0) ==
1054 !list_empty(&rxpeer->lp_rtrq));
1055
1056 rxpeer->lp_rtrcredits++;
1057 if (rxpeer->lp_rtrcredits <= 0) {
1058 msg2 = list_entry(rxpeer->lp_rtrq.next,
1059 lnet_msg_t, msg_list);
1060 list_del(&msg2->msg_list);
1061
1062 (void) lnet_post_routed_recv_locked(msg2, 1);
1063 }
1064 }
1065 if (rxpeer != NULL) {
1066 msg->msg_rxpeer = NULL;
1067 lnet_peer_decref_locked(rxpeer);
1068 }
1069}
1070
1071static int
1072lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
1073{
1074 lnet_peer_t *p1 = r1->lr_gateway;
1075 lnet_peer_t *p2 = r2->lr_gateway;
1076
1077 if (r1->lr_priority < r2->lr_priority)
1078 return 1;
1079
1080 if (r1->lr_priority > r2->lr_priority)
1081 return -1;
1082
1083 if (r1->lr_hops < r2->lr_hops)
1084 return 1;
1085
1086 if (r1->lr_hops > r2->lr_hops)
1087 return -1;
1088
1089 if (p1->lp_txqnob < p2->lp_txqnob)
1090 return 1;
1091
1092 if (p1->lp_txqnob > p2->lp_txqnob)
1093 return -1;
1094
1095 if (p1->lp_txcredits > p2->lp_txcredits)
1096 return 1;
1097
1098 if (p1->lp_txcredits < p2->lp_txcredits)
1099 return -1;
1100
1101 if (r1->lr_seq - r2->lr_seq <= 0)
1102 return 1;
1103
1104 return -1;
1105}
1106
1107static lnet_peer_t *
1108lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
1109{
1110 lnet_remotenet_t *rnet;
1111 lnet_route_t *rtr;
1112 lnet_route_t *rtr_best;
1113 lnet_route_t *rtr_last;
1114 struct lnet_peer *lp_best;
1115 struct lnet_peer *lp;
1116 int rc;
1117
	/* If @rtr_nid names a usable gateway, prefer it; otherwise pick the
	 * best gateway by priority, hop count, queue depth and credits. */
1121 rnet = lnet_find_net_locked(LNET_NIDNET(target));
1122 if (rnet == NULL)
1123 return NULL;
1124
1125 lp_best = NULL;
1126 rtr_best = rtr_last = NULL;
1127 list_for_each_entry(rtr, &rnet->lrn_routes, lr_list) {
1128 lp = rtr->lr_gateway;
1129
1130 if (!lp->lp_alive ||
1131 ((lp->lp_ping_feats & LNET_PING_FEAT_NI_STATUS) != 0 &&
1132 rtr->lr_downis != 0))
1133 continue;
1134
1135 if (ni != NULL && lp->lp_ni != ni)
1136 continue;
1137
1138 if (lp->lp_nid == rtr_nid)
1139 return lp;
1140
1141 if (lp_best == NULL) {
1142 rtr_best = rtr_last = rtr;
1143 lp_best = lp;
1144 continue;
1145 }
1146
1147
1148 if (rtr_last->lr_seq - rtr->lr_seq < 0)
1149 rtr_last = rtr;
1150
1151 rc = lnet_compare_routes(rtr, rtr_best);
1152 if (rc < 0)
1153 continue;
1154
1155 rtr_best = rtr;
1156 lp_best = lp;
1157 }
1158
	/* Bump the chosen gateway's sequence number past the newest one seen
	 * so that equally good routers are used round-robin. */
1162 if (rtr_best != NULL)
1163 rtr_best->lr_seq = rtr_last->lr_seq + 1;
1164 return lp_best;
1165}
1166
1167int
1168lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
1169{
1170 lnet_nid_t dst_nid = msg->msg_target.nid;
1171 struct lnet_ni *src_ni;
1172 struct lnet_ni *local_ni;
1173 struct lnet_peer *lp;
1174 int cpt;
1175 int cpt2;
1176 int rc;
1177
1178
1179
1180
1181
1182 LASSERT(msg->msg_txpeer == NULL);
1183 LASSERT(!msg->msg_sending);
1184 LASSERT(!msg->msg_target_is_router);
1185 LASSERT(!msg->msg_receiving);
1186
1187 msg->msg_sending = 1;
1188
1189 LASSERT(!msg->msg_tx_committed);
1190 cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid);
1191 again:
1192 lnet_net_lock(cpt);
1193
1194 if (the_lnet.ln_shutdown) {
1195 lnet_net_unlock(cpt);
1196 return -ESHUTDOWN;
1197 }
1198
1199 if (src_nid == LNET_NID_ANY) {
1200 src_ni = NULL;
1201 } else {
1202 src_ni = lnet_nid2ni_locked(src_nid, cpt);
1203 if (src_ni == NULL) {
1204 lnet_net_unlock(cpt);
1205 LCONSOLE_WARN("Can't send to %s: src %s is not a "
1206 "local nid\n", libcfs_nid2str(dst_nid),
1207 libcfs_nid2str(src_nid));
1208 return -EINVAL;
1209 }
1210 LASSERT(!msg->msg_routing);
1211 }
1212
	/* Is the destination on a local network? */
1214 local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid), cpt);
1215
1216 if (local_ni != NULL) {
1217 if (src_ni == NULL) {
1218 src_ni = local_ni;
1219 src_nid = src_ni->ni_nid;
1220 } else if (src_ni == local_ni) {
1221 lnet_ni_decref_locked(local_ni, cpt);
1222 } else {
1223 lnet_ni_decref_locked(local_ni, cpt);
1224 lnet_ni_decref_locked(src_ni, cpt);
1225 lnet_net_unlock(cpt);
			LCONSOLE_WARN("No route to %s from %s\n",
				      libcfs_nid2str(dst_nid),
				      libcfs_nid2str(src_nid));
1229 return -EINVAL;
1230 }
1231
1232 LASSERT(src_nid != LNET_NID_ANY);
1233 lnet_msg_commit(msg, cpt);
1234
1235 if (!msg->msg_routing)
1236 msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1237
1238 if (src_ni == the_lnet.ln_loni) {
			/* Loopback sends bypass credit accounting */
1240 lnet_net_unlock(cpt);
1241 lnet_ni_send(src_ni, msg);
1242
1243 lnet_net_lock(cpt);
1244 lnet_ni_decref_locked(src_ni, cpt);
1245 lnet_net_unlock(cpt);
1246 return 0;
1247 }
1248
1249 rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
1250
1251 lnet_ni_decref_locked(src_ni, cpt);
1252 if (rc != 0) {
1253 lnet_net_unlock(cpt);
1254 LCONSOLE_WARN("Error %d finding peer %s\n", rc,
1255 libcfs_nid2str(dst_nid));
1256
1257 return rc;
1258 }
1259 LASSERT(lp->lp_ni == src_ni);
1260 } else {
		/* Sending to a remote network: pick a gateway */
1262 lp = lnet_find_route_locked(src_ni, dst_nid, rtr_nid);
1263 if (lp == NULL) {
1264 if (src_ni != NULL)
1265 lnet_ni_decref_locked(src_ni, cpt);
1266 lnet_net_unlock(cpt);
1267
1268 LCONSOLE_WARN("No route to %s via %s "
1269 "(all routers down)\n",
1270 libcfs_id2str(msg->msg_target),
1271 libcfs_nid2str(src_nid));
1272 return -EHOSTUNREACH;
1273 }
1274
		/* The chosen gateway may hash to a different CPT than the one
		 * currently locked; if so, retry the send on that CPT. */
1279 if (rtr_nid != lp->lp_nid) {
1280 cpt2 = lnet_cpt_of_nid_locked(lp->lp_nid);
1281 if (cpt2 != cpt) {
1282 if (src_ni != NULL)
1283 lnet_ni_decref_locked(src_ni, cpt);
1284 lnet_net_unlock(cpt);
1285
1286 rtr_nid = lp->lp_nid;
1287 cpt = cpt2;
1288 goto again;
1289 }
1290 }
1291
1292 CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
1293 libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
1294 lnet_msgtyp2str(msg->msg_type), msg->msg_len);
1295
1296 if (src_ni == NULL) {
1297 src_ni = lp->lp_ni;
1298 src_nid = src_ni->ni_nid;
1299 } else {
1300 LASSERT(src_ni == lp->lp_ni);
1301 lnet_ni_decref_locked(src_ni, cpt);
1302 }
1303
1304 lnet_peer_addref_locked(lp);
1305
1306 LASSERT(src_nid != LNET_NID_ANY);
1307 lnet_msg_commit(msg, cpt);
1308
1309 if (!msg->msg_routing) {
1310
1311 msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1312 }
1313
1314 msg->msg_target_is_router = 1;
1315 msg->msg_target.nid = lp->lp_nid;
1316 msg->msg_target.pid = LUSTRE_SRV_LNET_PID;
1317 }
1318
1319
1320
1321 LASSERT(!msg->msg_peertxcredit);
1322 LASSERT(!msg->msg_txcredit);
1323 LASSERT(msg->msg_txpeer == NULL);
1324
1325 msg->msg_txpeer = lp;
1326
1327 rc = lnet_post_send_locked(msg, 0);
1328 lnet_net_unlock(cpt);
1329
1330 if (rc == EHOSTUNREACH)
1331 return -EHOSTUNREACH;
1332
1333 if (rc == 0)
1334 lnet_ni_send(src_ni, msg);
1335
1336 return 0;
1337}
1338
1339static void
1340lnet_drop_message(lnet_ni_t *ni, int cpt, void *private, unsigned int nob)
1341{
1342 lnet_net_lock(cpt);
1343 the_lnet.ln_counters[cpt]->drop_count++;
1344 the_lnet.ln_counters[cpt]->drop_length += nob;
1345 lnet_net_unlock(cpt);
1346
1347 lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
1348}
1349
1350static void
1351lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
1352{
1353 lnet_hdr_t *hdr = &msg->msg_hdr;
1354
1355 if (msg->msg_wanted != 0)
1356 lnet_setpayloadbuffer(msg);
1357
1358 lnet_build_msg_event(msg, LNET_EVENT_PUT);
1359
	/* Send an ACK on completion if the sender requested one and the
	 * matched MD has not disabled ACKs. */
1362 msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
1363 (msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);
1364
1365 lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
1366 msg->msg_offset, msg->msg_wanted, hdr->payload_length);
1367}
1368
1369static int
1370lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
1371{
1372 lnet_hdr_t *hdr = &msg->msg_hdr;
1373 struct lnet_match_info info;
1374 int rc;
1375
	/* Convert put fields to host byte order */
1377 hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
1378 hdr->msg.put.ptl_index = le32_to_cpu(hdr->msg.put.ptl_index);
1379 hdr->msg.put.offset = le32_to_cpu(hdr->msg.put.offset);
1380
1381 info.mi_id.nid = hdr->src_nid;
1382 info.mi_id.pid = hdr->src_pid;
1383 info.mi_opc = LNET_MD_OP_PUT;
1384 info.mi_portal = hdr->msg.put.ptl_index;
1385 info.mi_rlength = hdr->payload_length;
1386 info.mi_roffset = hdr->msg.put.offset;
1387 info.mi_mbits = hdr->msg.put.match_bits;
1388
1389 msg->msg_rx_ready_delay = ni->ni_lnd->lnd_eager_recv == NULL;
1390
1391 again:
1392 rc = lnet_ptl_match_md(&info, msg);
1393 switch (rc) {
1394 default:
1395 LBUG();
1396
1397 case LNET_MATCHMD_OK:
1398 lnet_recv_put(ni, msg);
1399 return 0;
1400
1401 case LNET_MATCHMD_NONE:
1402 if (msg->msg_rx_delayed)
1403 return 0;
1404
1405 rc = lnet_ni_eager_recv(ni, msg);
1406 if (rc == 0)
1407 goto again;
		/* fall through */
1409
1410 case LNET_MATCHMD_DROP:
1411 CNETERR("Dropping PUT from %s portal %d match "LPU64
1412 " offset %d length %d: %d\n",
1413 libcfs_id2str(info.mi_id), info.mi_portal,
1414 info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);
1415
1416 return ENOENT;
1417 }
1418}
1419
1420static int
1421lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
1422{
1423 struct lnet_match_info info;
1424 lnet_hdr_t *hdr = &msg->msg_hdr;
1425 lnet_handle_wire_t reply_wmd;
1426 int rc;
1427
	/* Convert get fields to host byte order */
1429 hdr->msg.get.match_bits = le64_to_cpu(hdr->msg.get.match_bits);
1430 hdr->msg.get.ptl_index = le32_to_cpu(hdr->msg.get.ptl_index);
1431 hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
1432 hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);
1433
1434 info.mi_id.nid = hdr->src_nid;
1435 info.mi_id.pid = hdr->src_pid;
1436 info.mi_opc = LNET_MD_OP_GET;
1437 info.mi_portal = hdr->msg.get.ptl_index;
1438 info.mi_rlength = hdr->msg.get.sink_length;
1439 info.mi_roffset = hdr->msg.get.src_offset;
1440 info.mi_mbits = hdr->msg.get.match_bits;
1441
1442 rc = lnet_ptl_match_md(&info, msg);
1443 if (rc == LNET_MATCHMD_DROP) {
1444 CNETERR("Dropping GET from %s portal %d match "LPU64
1445 " offset %d length %d\n",
1446 libcfs_id2str(info.mi_id), info.mi_portal,
1447 info.mi_mbits, info.mi_roffset, info.mi_rlength);
1448 return ENOENT;
1449 }
1450
1451 LASSERT(rc == LNET_MATCHMD_OK);
1452
1453 lnet_build_msg_event(msg, LNET_EVENT_GET);
1454
1455 reply_wmd = hdr->msg.get.return_wmd;
1456
1457 lnet_prep_send(msg, LNET_MSG_REPLY, info.mi_id,
1458 msg->msg_offset, msg->msg_wanted);
1459
1460 msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
1461
1462 if (rdma_get) {
		/* The LND completes the REPLY from its receive procedure */
1464 lnet_ni_recv(ni, msg->msg_private, msg, 0,
1465 msg->msg_offset, msg->msg_len, msg->msg_len);
1466 return 0;
1467 }
1468
1469 lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
1470 msg->msg_receiving = 0;
1471
1472 rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
1473 if (rc < 0) {
1474
1475 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
1476 libcfs_nid2str(ni->ni_nid),
1477 libcfs_id2str(info.mi_id), rc);
1478
1479 lnet_finalize(ni, msg, rc);
1480 }
1481
1482 return 0;
1483}
1484
1485static int
1486lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
1487{
1488 void *private = msg->msg_private;
1489 lnet_hdr_t *hdr = &msg->msg_hdr;
1490 lnet_process_id_t src = {0};
1491 lnet_libmd_t *md;
1492 int rlength;
1493 int mlength;
1494 int cpt;
1495
1496 cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
1497 lnet_res_lock(cpt);
1498
1499 src.nid = hdr->src_nid;
1500 src.pid = hdr->src_pid;
1501
1502
1503 md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
1504 if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
1505 CNETERR("%s: Dropping REPLY from %s for %s "
1506 "MD "LPX64"."LPX64"\n",
1507 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1508 (md == NULL) ? "invalid" : "inactive",
1509 hdr->msg.reply.dst_wmd.wh_interface_cookie,
1510 hdr->msg.reply.dst_wmd.wh_object_cookie);
1511 if (md != NULL && md->md_me != NULL)
1512 CERROR("REPLY MD also attached to portal %d\n",
1513 md->md_me->me_portal);
1514
1515 lnet_res_unlock(cpt);
1516 return ENOENT;
1517 }
1518
1519 LASSERT(md->md_offset == 0);
1520
1521 rlength = hdr->payload_length;
1522 mlength = MIN(rlength, (int)md->md_length);
1523
1524 if (mlength < rlength &&
1525 (md->md_options & LNET_MD_TRUNCATE) == 0) {
1526 CNETERR("%s: Dropping REPLY from %s length %d "
1527 "for MD "LPX64" would overflow (%d)\n",
1528 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1529 rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
1530 mlength);
1531 lnet_res_unlock(cpt);
1532 return ENOENT;
1533 }
1534
1535 CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md "LPX64"\n",
1536 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1537 mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
1538
1539 lnet_msg_attach_md(msg, md, 0, mlength);
1540
1541 if (mlength != 0)
1542 lnet_setpayloadbuffer(msg);
1543
1544 lnet_res_unlock(cpt);
1545
1546 lnet_build_msg_event(msg, LNET_EVENT_REPLY);
1547
1548 lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
1549 return 0;
1550}
1551
1552static int
1553lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
1554{
1555 lnet_hdr_t *hdr = &msg->msg_hdr;
1556 lnet_process_id_t src = {0};
1557 lnet_libmd_t *md;
1558 int cpt;
1559
1560 src.nid = hdr->src_nid;
1561 src.pid = hdr->src_pid;
1562
	/* Convert ack fields to host byte order */
1564 hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
1565 hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
1566
1567 cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
1568 lnet_res_lock(cpt);
1569
1570
1571 md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
1572 if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
1573
1574 CDEBUG(D_NET,
1575 "%s: Dropping ACK from %s to %s MD "LPX64"."LPX64"\n",
1576 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1577 (md == NULL) ? "invalid" : "inactive",
1578 hdr->msg.ack.dst_wmd.wh_interface_cookie,
1579 hdr->msg.ack.dst_wmd.wh_object_cookie);
1580 if (md != NULL && md->md_me != NULL)
1581 CERROR("Source MD also attached to portal %d\n",
1582 md->md_me->me_portal);
1583
1584 lnet_res_unlock(cpt);
1585 return ENOENT;
1586 }
1587
1588 CDEBUG(D_NET, "%s: ACK from %s into md "LPX64"\n",
1589 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1590 hdr->msg.ack.dst_wmd.wh_object_cookie);
1591
1592 lnet_msg_attach_md(msg, md, 0, 0);
1593
1594 lnet_res_unlock(cpt);
1595
1596 lnet_build_msg_event(msg, LNET_EVENT_ACK);
1597
1598 lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
1599 return 0;
1600}
1601
1602static int
1603lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
1604{
1605 int rc = 0;
1606
1607 if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
1608 lnet_msg2bufpool(msg)->rbp_credits <= 0) {
1609 if (ni->ni_lnd->lnd_eager_recv == NULL) {
1610 msg->msg_rx_ready_delay = 1;
1611 } else {
1612 lnet_net_unlock(msg->msg_rx_cpt);
1613 rc = lnet_ni_eager_recv(ni, msg);
1614 lnet_net_lock(msg->msg_rx_cpt);
1615 }
1616 }
1617
1618 if (rc == 0)
1619 rc = lnet_post_routed_recv_locked(msg, 0);
1620 return rc;
1621}
1622
1623char *
1624lnet_msgtyp2str(int type)
1625{
1626 switch (type) {
1627 case LNET_MSG_ACK:
1628 return "ACK";
1629 case LNET_MSG_PUT:
1630 return "PUT";
1631 case LNET_MSG_GET:
1632 return "GET";
1633 case LNET_MSG_REPLY:
1634 return "REPLY";
1635 case LNET_MSG_HELLO:
1636 return "HELLO";
1637 default:
1638 return "<UNKNOWN>";
1639 }
1640}
1641EXPORT_SYMBOL(lnet_msgtyp2str);
1642
1643void
1644lnet_print_hdr(lnet_hdr_t *hdr)
1645{
1646 lnet_process_id_t src = {0};
1647 lnet_process_id_t dst = {0};
1648 char *type_str = lnet_msgtyp2str(hdr->type);
1649
1650 src.nid = hdr->src_nid;
1651 src.pid = hdr->src_pid;
1652
1653 dst.nid = hdr->dest_nid;
1654 dst.pid = hdr->dest_pid;
1655
1656 CWARN("P3 Header at %p of type %s\n", hdr, type_str);
1657 CWARN(" From %s\n", libcfs_id2str(src));
1658 CWARN(" To %s\n", libcfs_id2str(dst));
1659
1660 switch (hdr->type) {
1661 default:
1662 break;
1663
1664 case LNET_MSG_PUT:
1665 CWARN(" Ptl index %d, ack md "LPX64"."LPX64", "
1666 "match bits "LPU64"\n",
1667 hdr->msg.put.ptl_index,
1668 hdr->msg.put.ack_wmd.wh_interface_cookie,
1669 hdr->msg.put.ack_wmd.wh_object_cookie,
1670 hdr->msg.put.match_bits);
1671 CWARN(" Length %d, offset %d, hdr data "LPX64"\n",
1672 hdr->payload_length, hdr->msg.put.offset,
1673 hdr->msg.put.hdr_data);
1674 break;
1675
1676 case LNET_MSG_GET:
1677 CWARN(" Ptl index %d, return md "LPX64"."LPX64", "
1678 "match bits "LPU64"\n", hdr->msg.get.ptl_index,
1679 hdr->msg.get.return_wmd.wh_interface_cookie,
1680 hdr->msg.get.return_wmd.wh_object_cookie,
1681 hdr->msg.get.match_bits);
1682 CWARN(" Length %d, src offset %d\n",
1683 hdr->msg.get.sink_length,
1684 hdr->msg.get.src_offset);
1685 break;
1686
1687 case LNET_MSG_ACK:
1688 CWARN(" dst md "LPX64"."LPX64", "
1689 "manipulated length %d\n",
1690 hdr->msg.ack.dst_wmd.wh_interface_cookie,
1691 hdr->msg.ack.dst_wmd.wh_object_cookie,
1692 hdr->msg.ack.mlength);
1693 break;
1694
1695 case LNET_MSG_REPLY:
1696 CWARN(" dst md "LPX64"."LPX64", "
1697 "length %d\n",
1698 hdr->msg.reply.dst_wmd.wh_interface_cookie,
1699 hdr->msg.reply.dst_wmd.wh_object_cookie,
1700 hdr->payload_length);
1701 }
1702
1703}
1704
1705int
1706lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
1707 void *private, int rdma_req)
1708{
1709 int rc = 0;
1710 int cpt;
1711 int for_me;
1712 struct lnet_msg *msg;
1713 lnet_pid_t dest_pid;
1714 lnet_nid_t dest_nid;
1715 lnet_nid_t src_nid;
1716 __u32 payload_length;
1717 __u32 type;
1718
1719 LASSERT(!in_interrupt());
1720
1721 type = le32_to_cpu(hdr->type);
1722 src_nid = le64_to_cpu(hdr->src_nid);
1723 dest_nid = le64_to_cpu(hdr->dest_nid);
1724 dest_pid = le32_to_cpu(hdr->dest_pid);
1725 payload_length = le32_to_cpu(hdr->payload_length);
1726
1727 for_me = (ni->ni_nid == dest_nid);
1728 cpt = lnet_cpt_of_nid(from_nid);
1729
1730 switch (type) {
1731 case LNET_MSG_ACK:
1732 case LNET_MSG_GET:
1733 if (payload_length > 0) {
1734 CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
1735 libcfs_nid2str(from_nid),
1736 libcfs_nid2str(src_nid),
1737 lnet_msgtyp2str(type), payload_length);
1738 return -EPROTO;
1739 }
1740 break;
1741
1742 case LNET_MSG_PUT:
1743 case LNET_MSG_REPLY:
1744 if (payload_length >
1745 (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
1746 CERROR("%s, src %s: bad %s payload %d "
1747 "(%d max expected)\n",
1748 libcfs_nid2str(from_nid),
1749 libcfs_nid2str(src_nid),
1750 lnet_msgtyp2str(type),
1751 payload_length,
1752 for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
1753 return -EPROTO;
1754 }
1755 break;
1756
1757 default:
1758 CERROR("%s, src %s: Bad message type 0x%x\n",
1759 libcfs_nid2str(from_nid),
1760 libcfs_nid2str(src_nid), type);
1761 return -EPROTO;
1762 }
1763
1764 if (the_lnet.ln_routing &&
1765 ni->ni_last_alive != cfs_time_current_sec()) {
1766 lnet_ni_lock(ni);
1767
1768
1769 ni->ni_last_alive = cfs_time_current_sec();
1770 if (ni->ni_status != NULL &&
1771 ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
1772 ni->ni_status->ns_status = LNET_NI_STATUS_UP;
1773 lnet_ni_unlock(ni);
1774 }
1775
1776
1777
1778
1779
1780 if (!for_me) {
1781 if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
1782
1783 CERROR("%s, src %s: Bad dest nid %s "
1784 "(should have been sent direct)\n",
1785 libcfs_nid2str(from_nid),
1786 libcfs_nid2str(src_nid),
1787 libcfs_nid2str(dest_nid));
1788 return -EPROTO;
1789 }
1790
1791 if (lnet_islocalnid(dest_nid)) {
1792
1793
1794 CERROR("%s, src %s: Bad dest nid %s "
1795 "(it's my nid but on a different network)\n",
1796 libcfs_nid2str(from_nid),
1797 libcfs_nid2str(src_nid),
1798 libcfs_nid2str(dest_nid));
1799 return -EPROTO;
1800 }
1801
1802 if (rdma_req && type == LNET_MSG_GET) {
1803 CERROR("%s, src %s: Bad optimized GET for %s "
1804 "(final destination must be me)\n",
1805 libcfs_nid2str(from_nid),
1806 libcfs_nid2str(src_nid),
1807 libcfs_nid2str(dest_nid));
1808 return -EPROTO;
1809 }
1810
1811 if (!the_lnet.ln_routing) {
1812 CERROR("%s, src %s: Dropping message for %s "
1813 "(routing not enabled)\n",
1814 libcfs_nid2str(from_nid),
1815 libcfs_nid2str(src_nid),
1816 libcfs_nid2str(dest_nid));
1817 goto drop;
1818 }
1819 }
1820
1821
1822
1823
1824 if (!list_empty(&the_lnet.ln_test_peers) &&
1825 fail_peer(src_nid, 0)) {
1826 CERROR("%s, src %s: Dropping %s to simulate failure\n",
1827 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1828 lnet_msgtyp2str(type));
1829 goto drop;
1830 }
1831
1832 msg = lnet_msg_alloc();
1833 if (msg == NULL) {
1834 CERROR("%s, src %s: Dropping %s (out of memory)\n",
1835 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1836 lnet_msgtyp2str(type));
1837 goto drop;
1838 }
1839
	/* msg is zeroed by lnet_msg_alloc(): flags all clear, pointers NULL */
1844 msg->msg_type = type;
1845 msg->msg_private = private;
1846 msg->msg_receiving = 1;
1847 msg->msg_len = msg->msg_wanted = payload_length;
1848 msg->msg_offset = 0;
1849 msg->msg_hdr = *hdr;
1850
1851 msg->msg_from = from_nid;
1852 if (!for_me) {
1853 msg->msg_target.pid = dest_pid;
1854 msg->msg_target.nid = dest_nid;
1855 msg->msg_routing = 1;
1856
1857 } else {
		/* convert common msg_hdr fields to host byte order */
1859 msg->msg_hdr.type = type;
1860 msg->msg_hdr.src_nid = src_nid;
1861 msg->msg_hdr.src_pid = le32_to_cpu(msg->msg_hdr.src_pid);
1862 msg->msg_hdr.dest_nid = dest_nid;
1863 msg->msg_hdr.dest_pid = dest_pid;
1864 msg->msg_hdr.payload_length = payload_length;
1865 }
1866
1867 lnet_net_lock(cpt);
1868 rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
1869 if (rc != 0) {
1870 lnet_net_unlock(cpt);
1871 CERROR("%s, src %s: Dropping %s "
1872 "(error %d looking up sender)\n",
1873 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1874 lnet_msgtyp2str(type), rc);
1875 lnet_msg_free(msg);
1876 goto drop;
1877 }
1878
1879 lnet_msg_commit(msg, cpt);
1880
1881 if (!for_me) {
1882 rc = lnet_parse_forward_locked(ni, msg);
1883 lnet_net_unlock(cpt);
1884
1885 if (rc < 0)
1886 goto free_drop;
1887 if (rc == 0) {
1888 lnet_ni_recv(ni, msg->msg_private, msg, 0,
1889 0, payload_length, payload_length);
1890 }
1891 return 0;
1892 }
1893
1894 lnet_net_unlock(cpt);
1895
1896 switch (type) {
1897 case LNET_MSG_ACK:
1898 rc = lnet_parse_ack(ni, msg);
1899 break;
1900 case LNET_MSG_PUT:
1901 rc = lnet_parse_put(ni, msg);
1902 break;
1903 case LNET_MSG_GET:
1904 rc = lnet_parse_get(ni, msg, rdma_req);
1905 break;
1906 case LNET_MSG_REPLY:
1907 rc = lnet_parse_reply(ni, msg);
1908 break;
1909 default:
1910 LASSERT(0);
1911 rc = -EPROTO;
1912 goto free_drop;
1913 }
1914
1915 if (rc == 0)
1916 return 0;
1917
1918 LASSERT(rc == ENOENT);
1919
1920 free_drop:
1921 LASSERT(msg->msg_md == NULL);
1922 lnet_finalize(ni, msg, rc);
1923
1924 drop:
1925 lnet_drop_message(ni, cpt, private, payload_length);
1926 return 0;
1927}
1928EXPORT_SYMBOL(lnet_parse);
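
/*
 * Example usage (illustrative sketch of an LND receive path; the handler,
 * its arguments and my_lnd_drop_rx() are hypothetical):
 *
 *	static void
 *	my_lnd_rx_handler(lnet_ni_t *ni, void *rx, lnet_hdr_t *hdr,
 *			  lnet_nid_t from_nid)
 *	{
 *		// Hand the wire header to LNet; 'rx' is passed back to the
 *		// LND later as the 'private' cookie of lnd_recv().  The last
 *		// argument is non-zero only for optimized (RDMA) GETs.
 *		if (lnet_parse(ni, hdr, from_nid, rx, 0) < 0)
 *			my_lnd_drop_rx(rx);
 *	}
 */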
1929
1930void
1931lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
1932{
1933 while (!list_empty(head)) {
1934 lnet_process_id_t id = {0};
1935 lnet_msg_t *msg;
1936
1937 msg = list_entry(head->next, lnet_msg_t, msg_list);
1938 list_del(&msg->msg_list);
1939
1940 id.nid = msg->msg_hdr.src_nid;
1941 id.pid = msg->msg_hdr.src_pid;
1942
1943 LASSERT(msg->msg_md == NULL);
1944 LASSERT(msg->msg_rx_delayed);
1945 LASSERT(msg->msg_rxpeer != NULL);
1946 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
1947
1948 CWARN("Dropping delayed PUT from %s portal %d match "LPU64
1949 " offset %d length %d: %s\n",
1950 libcfs_id2str(id),
1951 msg->msg_hdr.msg.put.ptl_index,
1952 msg->msg_hdr.msg.put.match_bits,
1953 msg->msg_hdr.msg.put.offset,
1954 msg->msg_hdr.payload_length, reason);
1955
1956
1957
1958
1959
1960 lnet_drop_message(msg->msg_rxpeer->lp_ni,
1961 msg->msg_rxpeer->lp_cpt,
1962 msg->msg_private, msg->msg_len);
1963
1964
1965
1966
1967
1968 lnet_finalize(msg->msg_rxpeer->lp_ni, msg, -ENOENT);
1969 }
1970}
1971
1972void
1973lnet_recv_delayed_msg_list(struct list_head *head)
1974{
1975 while (!list_empty(head)) {
1976 lnet_msg_t *msg;
1977 lnet_process_id_t id;
1978
1979 msg = list_entry(head->next, lnet_msg_t, msg_list);
1980 list_del(&msg->msg_list);
1981
1982
1983
1984
1985 id.nid = msg->msg_hdr.src_nid;
1986 id.pid = msg->msg_hdr.src_pid;
1987
1988 LASSERT(msg->msg_rx_delayed);
1989 LASSERT(msg->msg_md != NULL);
1990 LASSERT(msg->msg_rxpeer != NULL);
1991 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
1992
1993 CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d "
1994 "match "LPU64" offset %d length %d.\n",
1995 libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
1996 msg->msg_hdr.msg.put.match_bits,
1997 msg->msg_hdr.msg.put.offset,
1998 msg->msg_hdr.payload_length);
1999
2000 lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
2001 }
2002}
2003
/**
 * Initiate an asynchronous PUT operation.
 *
 * \param self       NID of a local interface to send from, or LNET_NID_ANY.
 * \param mdh        handle of the MD describing the memory to be sent.
 * \param ack        LNET_ACK_REQ to request an ACK from the target,
 *                   LNET_NOACK_REQ otherwise.
 * \param target     process ID of the target process.
 * \param portal     index of the portal at the target.
 * \param match_bits match bits used to select the MD at the target.
 * \param offset     offset into the target MD.
 * \param hdr_data   64 bits of user data carried in the message header.
 *
 * \retval 0       on success; later errors are reported through events.
 * \retval -EIO    simulated failure (test peer rule matched).
 * \retval -ENOMEM if the message descriptor could not be allocated.
 * \retval -ENOENT if \a mdh does not identify a valid, active MD.
 */
2048int
2049LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
2050 lnet_process_id_t target, unsigned int portal,
2051 __u64 match_bits, unsigned int offset,
2052 __u64 hdr_data)
2053{
2054 struct lnet_msg *msg;
2055 struct lnet_libmd *md;
2056 int cpt;
2057 int rc;
2058
2059 LASSERT(the_lnet.ln_init);
2060 LASSERT(the_lnet.ln_refcount > 0);
2061
2062 if (!list_empty(&the_lnet.ln_test_peers) &&
2063 fail_peer(target.nid, 1)) {
2064 CERROR("Dropping PUT to %s: simulated failure\n",
2065 libcfs_id2str(target));
2066 return -EIO;
2067 }
2068
2069 msg = lnet_msg_alloc();
2070 if (msg == NULL) {
2071 CERROR("Dropping PUT to %s: ENOMEM on lnet_msg_t\n",
2072 libcfs_id2str(target));
2073 return -ENOMEM;
2074 }
2075 msg->msg_vmflush = !!memory_pressure_get();
2076
2077 cpt = lnet_cpt_of_cookie(mdh.cookie);
2078 lnet_res_lock(cpt);
2079
2080 md = lnet_handle2md(&mdh);
2081 if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2082 CERROR("Dropping PUT ("LPU64":%d:%s): MD (%d) invalid\n",
2083 match_bits, portal, libcfs_id2str(target),
2084 md == NULL ? -1 : md->md_threshold);
2085 if (md != NULL && md->md_me != NULL)
2086 CERROR("Source MD also attached to portal %d\n",
2087 md->md_me->me_portal);
2088 lnet_res_unlock(cpt);
2089
2090 lnet_msg_free(msg);
2091 return -ENOENT;
2092 }
2093
2094 CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
2095
2096 lnet_msg_attach_md(msg, md, 0, 0);
2097
2098 lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
2099
2100 msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
2101 msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
2102 msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
2103 msg->msg_hdr.msg.put.hdr_data = hdr_data;
2104
2105
2106 if (ack == LNET_ACK_REQ) {
2107 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2108 the_lnet.ln_interface_cookie;
2109 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2110 md->md_lh.lh_cookie;
2111 } else {
2112 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2113 LNET_WIRE_HANDLE_COOKIE_NONE;
2114 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2115 LNET_WIRE_HANDLE_COOKIE_NONE;
2116 }
2117
2118 lnet_res_unlock(cpt);
2119
2120 lnet_build_msg_event(msg, LNET_EVENT_SEND);
2121
2122 rc = lnet_send(self, msg, LNET_NID_ANY);
2123 if (rc != 0) {
2124 CNETERR("Error sending PUT to %s: %d\n",
2125 libcfs_id2str(target), rc);
2126 lnet_finalize(NULL, msg, rc);
2127 }
2128
	/* completion will be signalled by an event */
2130 return 0;
2131}
2132EXPORT_SYMBOL(LNetPut);
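
/*
 * Example usage (illustrative sketch; 'mdh' is assumed to have been created
 * beforehand, e.g. with LNetMDBind(), and the target, portal and match bits
 * are hypothetical values):
 *
 *	lnet_process_id_t target = { .nid = peer_nid, .pid = peer_pid };
 *	int rc;
 *
 *	rc = LNetPut(LNET_NID_ANY,	// let LNet pick the source NI
 *		     mdh,		// MD describing the outgoing buffer
 *		     LNET_ACK_REQ,	// ask for an ACK event
 *		     target,
 *		     my_portal,		// portal index at the target
 *		     my_match_bits,	// selects the target's MD
 *		     0,			// offset into the target's MD
 *		     0);		// opaque header data
 *	if (rc != 0)
 *		CERROR("LNetPut to %s failed: %d\n",
 *		       libcfs_id2str(target), rc);
 */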
2133
2134lnet_msg_t *
2135lnet_create_reply_msg(lnet_ni_t *ni, lnet_msg_t *getmsg)
2136{
	/* The LND can DMA the GET reply directly into the GET MD (i.e. no
	 * REPLY message on the wire).  This creates the lnet_msg_t that the
	 * LND must pass to lnet_finalize() once the sink data has arrived.
	 * CAVEAT EMPTOR: 'getmsg' is the original GET, which is freed when
	 * lnet_finalize() is called on it, so this must be called first. */
2144 struct lnet_msg *msg = lnet_msg_alloc();
2145 struct lnet_libmd *getmd = getmsg->msg_md;
2146 lnet_process_id_t peer_id = getmsg->msg_target;
2147 int cpt;
2148
2149 LASSERT(!getmsg->msg_target_is_router);
2150 LASSERT(!getmsg->msg_routing);
2151
2152 cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
2153 lnet_res_lock(cpt);
2154
2155 LASSERT(getmd->md_refcount > 0);
2156
2157 if (msg == NULL) {
2158 CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
2159 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
2160 goto drop;
2161 }
2162
2163 if (getmd->md_threshold == 0) {
2164 CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
2165 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
2166 getmd);
2167 lnet_res_unlock(cpt);
2168 goto drop;
2169 }
2170
2171 LASSERT(getmd->md_offset == 0);
2172
2173 CDEBUG(D_NET, "%s: Reply from %s md %p\n",
2174 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
2175
2176
2177 msg->msg_from = peer_id.nid;
2178 msg->msg_type = LNET_MSG_GET;
2179 msg->msg_hdr.src_nid = peer_id.nid;
2180 msg->msg_hdr.payload_length = getmd->md_length;
2181 msg->msg_receiving = 1;
2182
2183 lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
2184 lnet_res_unlock(cpt);
2185
2186 cpt = lnet_cpt_of_nid(peer_id.nid);
2187
2188 lnet_net_lock(cpt);
2189 lnet_msg_commit(msg, cpt);
2190 lnet_net_unlock(cpt);
2191
2192 lnet_build_msg_event(msg, LNET_EVENT_REPLY);
2193
2194 return msg;
2195
2196 drop:
2197 cpt = lnet_cpt_of_nid(peer_id.nid);
2198
2199 lnet_net_lock(cpt);
2200 the_lnet.ln_counters[cpt]->drop_count++;
2201 the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
2202 lnet_net_unlock(cpt);
2203
2204 if (msg != NULL)
2205 lnet_msg_free(msg);
2206
2207 return NULL;
2208}
2209EXPORT_SYMBOL(lnet_create_reply_msg);
2210
2211void
2212lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *reply, unsigned int len)
2213{
	/* Called by the LND once it knows how much reply data actually
	 * arrived for an optimized GET, so the REPLY event carries the
	 * right mlength. */
2216 LASSERT(reply != NULL);
2217 LASSERT(reply->msg_type == LNET_MSG_GET);
2218 LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);
2219
2220
2221
2222 LASSERT(len <= reply->msg_ev.mlength);
2223
2224 reply->msg_ev.mlength = len;
2225}
2226EXPORT_SYMBOL(lnet_set_reply_msg_len);
2227
/**
 * Initiate an asynchronous GET operation.
 *
 * The data fetched from the target lands in the local MD identified by
 * \a mdh, and completion is reported with an LNET_EVENT_REPLY event.
 *
 * \param self       NID of a local interface to send from, or LNET_NID_ANY.
 * \param mdh        handle of the MD that will receive the reply data.
 * \param target     process ID of the target process.
 * \param portal     index of the portal at the target.
 * \param match_bits match bits used to select the source MD at the target.
 * \param offset     offset into the source MD.
 *
 * \retval 0       on success; later errors are reported through events.
 * \retval -EIO    simulated failure (test peer rule matched).
 * \retval -ENOMEM if the message descriptor could not be allocated.
 * \retval -ENOENT if \a mdh does not identify a valid, active MD.
 */
2249int
2250LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
2251 lnet_process_id_t target, unsigned int portal,
2252 __u64 match_bits, unsigned int offset)
2253{
2254 struct lnet_msg *msg;
2255 struct lnet_libmd *md;
2256 int cpt;
2257 int rc;
2258
2259 LASSERT(the_lnet.ln_init);
2260 LASSERT(the_lnet.ln_refcount > 0);
2261
2262 if (!list_empty(&the_lnet.ln_test_peers) &&
2263 fail_peer(target.nid, 1)) {
2264 CERROR("Dropping GET to %s: simulated failure\n",
2265 libcfs_id2str(target));
2266 return -EIO;
2267 }
2268
2269 msg = lnet_msg_alloc();
2270 if (msg == NULL) {
2271 CERROR("Dropping GET to %s: ENOMEM on lnet_msg_t\n",
2272 libcfs_id2str(target));
2273 return -ENOMEM;
2274 }
2275
2276 cpt = lnet_cpt_of_cookie(mdh.cookie);
2277 lnet_res_lock(cpt);
2278
2279 md = lnet_handle2md(&mdh);
2280 if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
2281 CERROR("Dropping GET ("LPU64":%d:%s): MD (%d) invalid\n",
2282 match_bits, portal, libcfs_id2str(target),
2283 md == NULL ? -1 : md->md_threshold);
2284 if (md != NULL && md->md_me != NULL)
2285 CERROR("REPLY MD also attached to portal %d\n",
2286 md->md_me->me_portal);
2287
2288 lnet_res_unlock(cpt);
2289
2290 lnet_msg_free(msg);
2291
2292 return -ENOENT;
2293 }
2294
2295 CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
2296
2297 lnet_msg_attach_md(msg, md, 0, 0);
2298
2299 lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
2300
2301 msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
2302 msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
2303 msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
2304 msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
2305
2306
2307 msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
2308 the_lnet.ln_interface_cookie;
2309 msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
2310 md->md_lh.lh_cookie;
2311
2312 lnet_res_unlock(cpt);
2313
2314 lnet_build_msg_event(msg, LNET_EVENT_SEND);
2315
2316 rc = lnet_send(self, msg, LNET_NID_ANY);
2317 if (rc < 0) {
2318 CNETERR("Error sending GET to %s: %d\n",
2319 libcfs_id2str(target), rc);
2320 lnet_finalize(NULL, msg, rc);
2321 }
2322
	/* completion will be signalled by an event */
2324 return 0;
2325}
2326EXPORT_SYMBOL(LNetGet);
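
/*
 * Example usage (illustrative sketch; 'mdh' describes the local buffer that
 * will receive the data and is assumed to exist; other values are
 * hypothetical):
 *
 *	lnet_process_id_t target = { .nid = peer_nid, .pid = peer_pid };
 *	int rc;
 *
 *	rc = LNetGet(LNET_NID_ANY,	// let LNet pick the source NI
 *		     mdh,		// MD that receives the reply data
 *		     target,
 *		     my_portal,		// portal index at the target
 *		     my_match_bits,	// selects the target's source MD
 *		     0);		// offset into the target's MD
 *	if (rc != 0)
 *		CERROR("LNetGet from %s failed: %d\n",
 *		       libcfs_id2str(target), rc);
 *	// On success the fetched data is announced by an LNET_EVENT_REPLY.
 */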

/**
 * Calculate the "distance" from this node to \a dstnid.
 *
 * \param dstnid  NID of the destination.
 * \param srcnidp if non-NULL, set to the NID of the local interface that
 *                would be used to reach \a dstnid.
 * \param orderp  if non-NULL, set to the order of the local NI or remote
 *                net that matched.
 *
 * \retval 0             \a dstnid is a local NID (unless local_nid_dist_zero
 *                       is cleared, in which case 1 is returned).
 * \retval 1             \a dstnid is on a directly attached network.
 * \retval hops + 1      \a dstnid is reached through a router.
 * \retval -EHOSTUNREACH \a dstnid cannot be reached.
 */
2342int
2343LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
2344{
2345 struct list_head *e;
2346 struct lnet_ni *ni;
2347 lnet_remotenet_t *rnet;
2348 __u32 dstnet = LNET_NIDNET(dstnid);
2349 int hops;
2350 int cpt;
2351 __u32 order = 2;
2352 struct list_head *rn_list;
2353
	/* A local NID is normally reported at distance 0; when
	 * local_nid_dist_zero is cleared it is reported at distance 1
	 * instead.  *orderp is 0 only for the loopback NI. */
2359 LASSERT(the_lnet.ln_init);
2360 LASSERT(the_lnet.ln_refcount > 0);
2361
2362 cpt = lnet_net_lock_current();
2363
2364 list_for_each(e, &the_lnet.ln_nis) {
2365 ni = list_entry(e, lnet_ni_t, ni_list);
2366
2367 if (ni->ni_nid == dstnid) {
2368 if (srcnidp != NULL)
2369 *srcnidp = dstnid;
2370 if (orderp != NULL) {
2371 if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
2372 *orderp = 0;
2373 else
2374 *orderp = 1;
2375 }
2376 lnet_net_unlock(cpt);
2377
2378 return local_nid_dist_zero ? 0 : 1;
2379 }
2380
2381 if (LNET_NIDNET(ni->ni_nid) == dstnet) {
2382 if (srcnidp != NULL)
2383 *srcnidp = ni->ni_nid;
2384 if (orderp != NULL)
2385 *orderp = order;
2386 lnet_net_unlock(cpt);
2387 return 1;
2388 }
2389
2390 order++;
2391 }
2392
2393 rn_list = lnet_net2rnethash(dstnet);
2394 list_for_each(e, rn_list) {
2395 rnet = list_entry(e, lnet_remotenet_t, lrn_list);
2396
2397 if (rnet->lrn_net == dstnet) {
2398 lnet_route_t *route;
2399 lnet_route_t *shortest = NULL;
2400
2401 LASSERT(!list_empty(&rnet->lrn_routes));
2402
2403 list_for_each_entry(route, &rnet->lrn_routes,
2404 lr_list) {
2405 if (shortest == NULL ||
2406 route->lr_hops < shortest->lr_hops)
2407 shortest = route;
2408 }
2409
2410 LASSERT(shortest != NULL);
2411 hops = shortest->lr_hops;
2412 if (srcnidp != NULL)
2413 *srcnidp = shortest->lr_gateway->lp_ni->ni_nid;
2414 if (orderp != NULL)
2415 *orderp = order;
2416 lnet_net_unlock(cpt);
2417 return hops + 1;
2418 }
2419 order++;
2420 }
2421
2422 lnet_net_unlock(cpt);
2423 return -EHOSTUNREACH;
2424}
2425EXPORT_SYMBOL(LNetDist);
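
/*
 * Example usage (illustrative sketch; 'nid' is a hypothetical destination):
 *
 *	lnet_nid_t src_nid;
 *	__u32 order;
 *	int dist;
 *
 *	dist = LNetDist(nid, &src_nid, &order);
 *	if (dist < 0)
 *		CERROR("%s unreachable: %d\n", libcfs_nid2str(nid), dist);
 *	else
 *		CDEBUG(D_NET, "%s at distance %d via local NI %s\n",
 *		       libcfs_nid2str(nid), dist, libcfs_nid2str(src_nid));
 */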

/**
 * Reserved for future use: currently a no-op that always returns 0.
 */
2444int
2445LNetSetAsync(lnet_process_id_t id, int nasync)
2446{
2447 return 0;
2448}
2449EXPORT_SYMBOL(LNetSetAsync);
2450