1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41#define DEBUG_SUBSYSTEM S_LNET
42
43#include "../../include/linux/lnet/lib-lnet.h"
44
/* Module parameter exposed read-only (0444) via sysfs.
 * NOTE(review): described only as "Reserved"; its actual consumer is not
 * visible in this chunk - confirm before removing. */
static int local_nid_dist_zero = 1;
module_param(local_nid_dist_zero, int, 0444);
MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
48
49int
50lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
51{
52 lnet_test_peer_t *tp;
53 struct list_head *el;
54 struct list_head *next;
55 struct list_head cull;
56
57 LASSERT(the_lnet.ln_init);
58
59
60 if (threshold != 0) {
61
62 LIBCFS_ALLOC(tp, sizeof(*tp));
63 if (tp == NULL)
64 return -ENOMEM;
65
66 tp->tp_nid = nid;
67 tp->tp_threshold = threshold;
68
69 lnet_net_lock(0);
70 list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
71 lnet_net_unlock(0);
72 return 0;
73 }
74
75
76 INIT_LIST_HEAD(&cull);
77
78 lnet_net_lock(0);
79
80 list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
81 tp = list_entry(el, lnet_test_peer_t, tp_list);
82
83 if (tp->tp_threshold == 0 ||
84 nid == LNET_NID_ANY ||
85 tp->tp_nid == nid) {
86 list_del(&tp->tp_list);
87 list_add(&tp->tp_list, &cull);
88 }
89 }
90
91 lnet_net_unlock(0);
92
93 while (!list_empty(&cull)) {
94 tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
95
96 list_del(&tp->tp_list);
97 LIBCFS_FREE(tp, sizeof(*tp));
98 }
99 return 0;
100}
101
102static int
103fail_peer(lnet_nid_t nid, int outgoing)
104{
105 lnet_test_peer_t *tp;
106 struct list_head *el;
107 struct list_head *next;
108 struct list_head cull;
109 int fail = 0;
110
111 INIT_LIST_HEAD(&cull);
112
113
114 lnet_net_lock(0);
115
116 list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
117 tp = list_entry(el, lnet_test_peer_t, tp_list);
118
119 if (tp->tp_threshold == 0) {
120
121 if (outgoing) {
122
123
124
125 list_del(&tp->tp_list);
126 list_add(&tp->tp_list, &cull);
127 }
128 continue;
129 }
130
131 if (tp->tp_nid == LNET_NID_ANY ||
132 nid == tp->tp_nid) {
133 fail = 1;
134
135 if (tp->tp_threshold != LNET_MD_THRESH_INF) {
136 tp->tp_threshold--;
137 if (outgoing &&
138 tp->tp_threshold == 0) {
139
140 list_del(&tp->tp_list);
141 list_add(&tp->tp_list, &cull);
142 }
143 }
144 break;
145 }
146 }
147
148 lnet_net_unlock(0);
149
150 while (!list_empty(&cull)) {
151 tp = list_entry(cull.next, lnet_test_peer_t, tp_list);
152 list_del(&tp->tp_list);
153
154 LIBCFS_FREE(tp, sizeof(*tp));
155 }
156
157 return fail;
158}
159
160unsigned int
161lnet_iov_nob(unsigned int niov, struct kvec *iov)
162{
163 unsigned int nob = 0;
164
165 while (niov-- > 0)
166 nob += (iov++)->iov_len;
167
168 return nob;
169}
170EXPORT_SYMBOL(lnet_iov_nob);
171
void
lnet_copy_iov2iov(unsigned int ndiov, struct kvec *diov, unsigned int doffset,
		  unsigned int nsiov, struct kvec *siov, unsigned int soffset,
		  unsigned int nob)
{
	/* Copy 'nob' bytes from the source iovec array (starting at byte
	 * offset 'soffset') to the destination iovec array (starting at
	 * byte offset 'doffset').  Fragment boundaries need not line up;
	 * both arrays are walked in parallel.
	 * NB: the caller must ensure each side describes at least
	 * offset + nob bytes - the LASSERTs fire otherwise. */
	unsigned int this_nob;

	if (nob == 0)
		return;

	/* skip complete frags before 'doffset' */
	LASSERT(ndiov > 0);
	while (doffset >= diov->iov_len) {
		doffset -= diov->iov_len;
		diov++;
		ndiov--;
		LASSERT(ndiov > 0);
	}

	/* skip complete frags before 'soffset' */
	LASSERT(nsiov > 0);
	while (soffset >= siov->iov_len) {
		soffset -= siov->iov_len;
		siov++;
		nsiov--;
		LASSERT(nsiov > 0);
	}

	do {
		LASSERT(ndiov > 0);
		LASSERT(nsiov > 0);
		/* largest run that stays inside the current fragment on
		 * both sides */
		this_nob = min(diov->iov_len - doffset,
			       siov->iov_len - soffset);
		this_nob = min(this_nob, nob);

		memcpy((char *)diov->iov_base + doffset,
		       (char *)siov->iov_base + soffset, this_nob);
		nob -= this_nob;

		/* advance within the current dest frag, or step to next */
		if (diov->iov_len > doffset + this_nob) {
			doffset += this_nob;
		} else {
			diov++;
			ndiov--;
			doffset = 0;
		}

		/* advance within the current source frag, or step to next */
		if (siov->iov_len > soffset + this_nob) {
			soffset += this_nob;
		} else {
			siov++;
			nsiov--;
			soffset = 0;
		}
	} while (nob > 0);
}
EXPORT_SYMBOL(lnet_copy_iov2iov);
230
int
lnet_extract_iov(int dst_niov, struct kvec *dst,
		 int src_niov, struct kvec *src,
		 unsigned int offset, unsigned int len)
{
	/* Initialise 'dst' to describe the same bytes as 'src', but
	 * starting 'offset' bytes in and extending for at most 'len'
	 * bytes.  Returns the number of 'dst' entries used.
	 * NB: no data is copied; dst entries alias src memory. */
	unsigned int frag_len;
	unsigned int niov;

	if (len == 0)			/* no data => 0 frags */
		return 0;

	/* skip whole frags before 'offset' */
	LASSERT(src_niov > 0);
	while (offset >= src->iov_len) {
		offset -= src->iov_len;
		src_niov--;
		src++;
		LASSERT(src_niov > 0);
	}

	niov = 1;
	for (;;) {
		LASSERT(src_niov > 0);
		LASSERT((int)niov <= dst_niov);

		frag_len = src->iov_len - offset;
		dst->iov_base = ((char *)src->iov_base) + offset;

		if (len <= frag_len) {
			/* final (possibly partial) fragment */
			dst->iov_len = len;
			return niov;
		}

		dst->iov_len = frag_len;

		len -= frag_len;
		dst++;
		src++;
		niov++;
		src_niov--;
		offset = 0;	/* only the first frag carries an offset */
	}
}
EXPORT_SYMBOL(lnet_extract_iov);
277
278
279unsigned int
280lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
281{
282 unsigned int nob = 0;
283
284 while (niov-- > 0)
285 nob += (kiov++)->kiov_len;
286
287 return nob;
288}
289EXPORT_SYMBOL(lnet_kiov_nob);
290
void
lnet_copy_kiov2kiov(unsigned int ndiov, lnet_kiov_t *diov, unsigned int doffset,
		    unsigned int nsiov, lnet_kiov_t *siov, unsigned int soffset,
		    unsigned int nob)
{
	/* Copy 'nob' bytes between two page-based fragment arrays,
	 * kmap()ing one source and one destination page at a time.
	 * Must not be called in interrupt context (see LASSERT). */
	unsigned int this_nob;
	char *daddr = NULL;
	char *saddr = NULL;

	if (nob == 0)
		return;

	LASSERT(!in_interrupt());	/* kmap below */

	/* skip complete frags before 'doffset' */
	LASSERT(ndiov > 0);
	while (doffset >= diov->kiov_len) {
		doffset -= diov->kiov_len;
		diov++;
		ndiov--;
		LASSERT(ndiov > 0);
	}

	/* skip complete frags before 'soffset' */
	LASSERT(nsiov > 0);
	while (soffset >= siov->kiov_len) {
		soffset -= siov->kiov_len;
		siov++;
		nsiov--;
		LASSERT(nsiov > 0);
	}

	do {
		LASSERT(ndiov > 0);
		LASSERT(nsiov > 0);
		/* largest run inside the current frag on both sides */
		this_nob = min(diov->kiov_len - doffset,
			       siov->kiov_len - soffset);
		this_nob = min(this_nob, nob);

		/* map the current pages lazily; NB two pages are kmapped
		 * simultaneously here */
		if (daddr == NULL)
			daddr = ((char *)kmap(diov->kiov_page)) +
				diov->kiov_offset + doffset;
		if (saddr == NULL)
			saddr = ((char *)kmap(siov->kiov_page)) +
				siov->kiov_offset + soffset;

		/* NOTE(review): source and destination are assumed not to
		 * overlap (memcpy, not memmove) - confirm callers */
		memcpy(daddr, saddr, this_nob);
		nob -= this_nob;

		/* advance within the dest page, or unmap it and move on */
		if (diov->kiov_len > doffset + this_nob) {
			daddr += this_nob;
			doffset += this_nob;
		} else {
			kunmap(diov->kiov_page);
			daddr = NULL;
			diov++;
			ndiov--;
			doffset = 0;
		}

		/* advance within the source page, or unmap it and move on */
		if (siov->kiov_len > soffset + this_nob) {
			saddr += this_nob;
			soffset += this_nob;
		} else {
			kunmap(siov->kiov_page);
			saddr = NULL;
			siov++;
			nsiov--;
			soffset = 0;
		}
	} while (nob > 0);

	/* unmap anything still mapped */
	if (daddr != NULL)
		kunmap(diov->kiov_page);
	if (saddr != NULL)
		kunmap(siov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_kiov2kiov);
372
void
lnet_copy_kiov2iov(unsigned int niov, struct kvec *iov, unsigned int iovoffset,
		   unsigned int nkiov, lnet_kiov_t *kiov,
		   unsigned int kiovoffset, unsigned int nob)
{
	/* Copy 'nob' bytes from a page-based (kiov) source into a
	 * virtually-addressed (iov) destination, kmap()ing one source
	 * page at a time. */
	unsigned int this_nob;
	char *addr = NULL;

	if (nob == 0)
		return;

	LASSERT(!in_interrupt());	/* kmap below */

	/* skip complete frags before 'iovoffset' */
	LASSERT(niov > 0);
	while (iovoffset >= iov->iov_len) {
		iovoffset -= iov->iov_len;
		iov++;
		niov--;
		LASSERT(niov > 0);
	}

	/* skip complete frags before 'kiovoffset' */
	LASSERT(nkiov > 0);
	while (kiovoffset >= kiov->kiov_len) {
		kiovoffset -= kiov->kiov_len;
		kiov++;
		nkiov--;
		LASSERT(nkiov > 0);
	}

	do {
		LASSERT(niov > 0);
		LASSERT(nkiov > 0);
		this_nob = min(iov->iov_len - iovoffset,
			       (__kernel_size_t) kiov->kiov_len - kiovoffset);
		this_nob = min(this_nob, nob);

		/* map the current source page lazily */
		if (addr == NULL)
			addr = ((char *)kmap(kiov->kiov_page)) +
				kiov->kiov_offset + kiovoffset;

		memcpy((char *)iov->iov_base + iovoffset, addr, this_nob);
		nob -= this_nob;

		/* advance within the current dest frag, or step to next */
		if (iov->iov_len > iovoffset + this_nob) {
			iovoffset += this_nob;
		} else {
			iov++;
			niov--;
			iovoffset = 0;
		}

		/* stay on this source page, or unmap it and move on */
		if (kiov->kiov_len > kiovoffset + this_nob) {
			addr += this_nob;
			kiovoffset += this_nob;
		} else {
			kunmap(kiov->kiov_page);
			addr = NULL;
			kiov++;
			nkiov--;
			kiovoffset = 0;
		}

	} while (nob > 0);

	/* unmap anything still mapped */
	if (addr != NULL)
		kunmap(kiov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_kiov2iov);
442
void
lnet_copy_iov2kiov(unsigned int nkiov, lnet_kiov_t *kiov,
		   unsigned int kiovoffset, unsigned int niov,
		   struct kvec *iov, unsigned int iovoffset,
		   unsigned int nob)
{
	/* Copy 'nob' bytes from a virtually-addressed (iov) source into a
	 * page-based (kiov) destination, kmap()ing one destination page
	 * at a time. */
	unsigned int this_nob;
	char *addr = NULL;

	if (nob == 0)
		return;

	LASSERT(!in_interrupt());	/* kmap below */

	/* skip complete frags before 'kiovoffset' */
	LASSERT(nkiov > 0);
	while (kiovoffset >= kiov->kiov_len) {
		kiovoffset -= kiov->kiov_len;
		kiov++;
		nkiov--;
		LASSERT(nkiov > 0);
	}

	/* skip complete frags before 'iovoffset' */
	LASSERT(niov > 0);
	while (iovoffset >= iov->iov_len) {
		iovoffset -= iov->iov_len;
		iov++;
		niov--;
		LASSERT(niov > 0);
	}

	do {
		LASSERT(nkiov > 0);
		LASSERT(niov > 0);
		this_nob = min((__kernel_size_t) kiov->kiov_len - kiovoffset,
			       iov->iov_len - iovoffset);
		this_nob = min(this_nob, nob);

		/* map the current destination page lazily */
		if (addr == NULL)
			addr = ((char *)kmap(kiov->kiov_page)) +
				kiov->kiov_offset + kiovoffset;

		memcpy(addr, (char *)iov->iov_base + iovoffset, this_nob);
		nob -= this_nob;

		/* stay on this dest page, or unmap it and move on */
		if (kiov->kiov_len > kiovoffset + this_nob) {
			addr += this_nob;
			kiovoffset += this_nob;
		} else {
			kunmap(kiov->kiov_page);
			addr = NULL;
			kiov++;
			nkiov--;
			kiovoffset = 0;
		}

		/* advance within the current source frag, or step to next */
		if (iov->iov_len > iovoffset + this_nob) {
			iovoffset += this_nob;
		} else {
			iov++;
			niov--;
			iovoffset = 0;
		}
	} while (nob > 0);

	/* unmap anything still mapped */
	if (addr != NULL)
		kunmap(kiov->kiov_page);
}
EXPORT_SYMBOL(lnet_copy_iov2kiov);
512
int
lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
		  int src_niov, lnet_kiov_t *src,
		  unsigned int offset, unsigned int len)
{
	/* Initialise 'dst' to describe the same pages as 'src', but
	 * starting 'offset' bytes in and extending for at most 'len'
	 * bytes.  Returns the number of 'dst' entries used.
	 * NB: no page data moves; dst entries alias src pages. */
	unsigned int frag_len;
	unsigned int niov;

	if (len == 0)			/* no data => 0 frags */
		return 0;

	/* skip whole frags before 'offset' */
	LASSERT(src_niov > 0);
	while (offset >= src->kiov_len) {
		offset -= src->kiov_len;
		src_niov--;
		src++;
		LASSERT(src_niov > 0);
	}

	niov = 1;
	for (;;) {
		LASSERT(src_niov > 0);
		LASSERT((int)niov <= dst_niov);

		frag_len = src->kiov_len - offset;
		dst->kiov_page = src->kiov_page;
		dst->kiov_offset = src->kiov_offset + offset;

		if (len <= frag_len) {
			/* final (possibly partial) fragment */
			dst->kiov_len = len;
			/* a fragment must stay within its page */
			LASSERT(dst->kiov_offset + dst->kiov_len
					<= PAGE_CACHE_SIZE);
			return niov;
		}

		dst->kiov_len = frag_len;
		LASSERT(dst->kiov_offset + dst->kiov_len <= PAGE_CACHE_SIZE);

		len -= frag_len;
		dst++;
		src++;
		niov++;
		src_niov--;
		offset = 0;	/* only the first frag carries an offset */
	}
}
EXPORT_SYMBOL(lnet_extract_kiov);
563
static void
lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
	     unsigned int offset, unsigned int mlen, unsigned int rlen)
{
	/* Hand an incoming message to the NI's LND for reception.
	 * 'msg' may be NULL, in which case the LND receives and discards
	 * 'rlen' bytes (used when dropping).  On LND failure the message
	 * is finalized immediately with the error. */
	unsigned int niov = 0;
	struct kvec *iov = NULL;
	lnet_kiov_t *kiov = NULL;
	int rc;

	LASSERT(!in_interrupt());
	LASSERT(mlen == 0 || msg != NULL);

	if (msg != NULL) {
		LASSERT(msg->msg_receiving);
		LASSERT(!msg->msg_sending);
		LASSERT(rlen == msg->msg_len);
		LASSERT(mlen <= msg->msg_len);
		LASSERT(msg->msg_offset == offset);
		LASSERT(msg->msg_wanted == mlen);

		msg->msg_receiving = 0;

		if (mlen != 0) {
			niov = msg->msg_niov;
			iov = msg->msg_iov;
			kiov = msg->msg_kiov;

			LASSERT(niov > 0);
			/* exactly one of iov (virtual) / kiov (paged) */
			LASSERT((iov == NULL) != (kiov == NULL));
		}
	}

	rc = (ni->ni_lnd->lnd_recv)(ni, private, msg, delayed,
				    niov, iov, kiov, offset, mlen, rlen);
	if (rc < 0)
		lnet_finalize(ni, msg, rc);
}
601
602static void
603lnet_setpayloadbuffer(lnet_msg_t *msg)
604{
605 lnet_libmd_t *md = msg->msg_md;
606
607 LASSERT(msg->msg_len > 0);
608 LASSERT(!msg->msg_routing);
609 LASSERT(md != NULL);
610 LASSERT(msg->msg_niov == 0);
611 LASSERT(msg->msg_iov == NULL);
612 LASSERT(msg->msg_kiov == NULL);
613
614 msg->msg_niov = md->md_niov;
615 if ((md->md_options & LNET_MD_KIOV) != 0)
616 msg->msg_kiov = md->md_iov.kiov;
617 else
618 msg->msg_iov = md->md_iov.iov;
619}
620
void
lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
	       unsigned int offset, unsigned int len)
{
	/* Initialise an outgoing message of 'type' to 'target': record
	 * the payload window, attach the MD's buffers (when there is a
	 * payload) and build the wire header in little-endian order.
	 * NB: msg_len must be set before lnet_setpayloadbuffer() runs
	 * (it asserts msg_len > 0). */
	msg->msg_type = type;
	msg->msg_target = target;
	msg->msg_len = len;
	msg->msg_offset = offset;

	if (len != 0)
		lnet_setpayloadbuffer(msg);

	memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr));
	msg->msg_hdr.type = cpu_to_le32(type);
	msg->msg_hdr.dest_nid = cpu_to_le64(target.nid);
	msg->msg_hdr.dest_pid = cpu_to_le32(target.pid);
	/* src_nid is filled in later, once the route is chosen
	 * (see lnet_send()) */
	msg->msg_hdr.src_pid = cpu_to_le32(the_lnet.ln_pid);
	msg->msg_hdr.payload_length = cpu_to_le32(len);
}
641
642static void
643lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
644{
645 void *priv = msg->msg_private;
646 int rc;
647
648 LASSERT(!in_interrupt());
649 LASSERT(LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
650 (msg->msg_txcredit && msg->msg_peertxcredit));
651
652 rc = (ni->ni_lnd->lnd_send)(ni, priv, msg);
653 if (rc < 0)
654 lnet_finalize(ni, msg, rc);
655}
656
static int
lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
{
	/* Ask the LND (via its optional lnd_eager_recv hook) to commit
	 * resources for a receive that is about to be delayed.  The hook
	 * may replace msg->msg_private.  Returns 0, or a negative errno
	 * on failure. */
	int rc;

	LASSERT(!msg->msg_sending);
	LASSERT(msg->msg_receiving);
	LASSERT(!msg->msg_rx_ready_delay);
	LASSERT(ni->ni_lnd->lnd_eager_recv != NULL);

	/* only called once per message */
	msg->msg_rx_ready_delay = 1;
	rc = (ni->ni_lnd->lnd_eager_recv)(ni, msg->msg_private, msg,
					  &msg->msg_private);
	if (rc != 0) {
		CERROR("recv from %s / send to %s aborted: eager_recv failed %d\n",
		       libcfs_nid2str(msg->msg_rxpeer->lp_nid),
		       libcfs_id2str(msg->msg_target), rc);
		LASSERT(rc < 0);	/* hook must return a -ve errno */
	}

	return rc;
}
679
680
static void
lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
{
	/* Ask the LND for 'lp's last-known aliveness time.  Called (and
	 * returns) with lnet_net_lock(lp->lp_cpt) held, but drops it
	 * across the LND callback. */
	unsigned long last_alive = 0;

	LASSERT(lnet_peer_aliveness_enabled(lp));
	LASSERT(ni->ni_lnd->lnd_query != NULL);

	lnet_net_unlock(lp->lp_cpt);
	(ni->ni_lnd->lnd_query)(ni, lp->lp_nid, &last_alive);
	lnet_net_lock(lp->lp_cpt);

	lp->lp_last_query = cfs_time_current();

	/* 0 means the LND had nothing to report */
	if (last_alive != 0)
		lp->lp_last_alive = last_alive;
}
698
699
/* NB: returns 1 (alive) or 0 (dead); may notify the router checker of an
 * alive transition as a side effect. */
static inline int
lnet_peer_is_alive(lnet_peer_t *lp, unsigned long now)
{
	int alive;
	unsigned long deadline;

	LASSERT(lnet_peer_aliveness_enabled(lp));

	/* an externally-reported death (lp_timestamp) at least as recent
	 * as our last aliveness evidence wins - but not the initial
	 * assumed-dead state (lp_alive_count == 0) */
	if (!lp->lp_alive && lp->lp_alive_count > 0 &&
	    cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
		return 0;

	/* alive if last evidence is within the NI's peer timeout */
	deadline = cfs_time_add(lp->lp_last_alive,
				cfs_time_seconds(lp->lp_ni->ni_peertimeout));
	alive = cfs_time_after(deadline, now);

	/* refresh a stale lp_alive, except for routers still in their
	 * initial assumed-dead state (lp_alive_count == 0) */
	if (alive && !lp->lp_alive &&
	    !(lnet_isrouter(lp) && lp->lp_alive_count == 0))
		lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);

	return alive;
}
729
730
731
732
/* Decide whether 'lp' is alive right now, querying the LND when the
 * cached verdict is stale.  Returns 1 when alive, 0 when dead, -ENODEV
 * when aliveness is not tracked for this peer.  Called with the
 * per-CPT net lock held. */
static int
lnet_peer_alive_locked(lnet_peer_t *lp)
{
	unsigned long now = cfs_time_current();

	if (!lnet_peer_aliveness_enabled(lp))
		return -ENODEV;

	if (lnet_peer_is_alive(lp, now))
		return 1;

	/* Peer appears dead; throttle LND queries to at most one per
	 * lnet_queryinterval seconds */
	if (lp->lp_last_query != 0) {
		static const int lnet_queryinterval = 1;

		unsigned long next_query =
			   cfs_time_add(lp->lp_last_query,
					cfs_time_seconds(lnet_queryinterval));

		if (time_before(now, next_query)) {
			if (lp->lp_alive)
				CWARN("Unexpected aliveness of peer %s: %d < %d (%d/%d)\n",
				      libcfs_nid2str(lp->lp_nid),
				      (int)now, (int)next_query,
				      lnet_queryinterval,
				      lp->lp_ni->ni_peertimeout);
			return 0;
		}
	}

	/* query the LND for the latest aliveness news */
	lnet_ni_query_locked(lp->lp_ni, lp);

	if (lnet_peer_is_alive(lp, now))
		return 1;

	lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
	return 0;
}
773
774
775
776
777
778
779
780
781
782
783
784
/* Reserve the peer and NI tx credits needed to send 'msg'.  Called with
 * lnet_net_lock(msg->msg_tx_cpt) held.  Returns:
 *	0		msg is ready to send (and was sent, if do_send)
 *	EAGAIN		msg queued pending tx credits	(POSITIVE!)
 *	EHOSTUNREACH	msg dropped: peer believed dead	(POSITIVE!)
 *	ECANCELED	msg dropped: MD already unlinked(POSITIVE!)
 * NB the positive-errno convention is relied on by callers
 * (see lnet_send()). */
static int
lnet_post_send_locked(lnet_msg_t *msg, int do_send)
{
	lnet_peer_t *lp = msg->msg_txpeer;
	lnet_ni_t *ni = lp->lp_ni;
	int cpt = msg->msg_tx_cpt;
	struct lnet_tx_queue *tq = ni->ni_tx_queues[cpt];

	/* non-lnet_send() callers only post delayed messages */
	LASSERT(!do_send || msg->msg_tx_delayed);
	LASSERT(!msg->msg_receiving);
	LASSERT(msg->msg_tx_committed);

	/* messages to userspace PIDs skip the aliveness check */
	if ((msg->msg_target.pid & LNET_PID_USERFLAG) == 0 &&
	    lnet_peer_alive_locked(lp) == 0) {
		the_lnet.ln_counters[cpt]->drop_count++;
		the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
		lnet_net_unlock(cpt);

		CNETERR("Dropping message for %s: peer not alive\n",
			libcfs_id2str(msg->msg_target));
		if (do_send)
			lnet_finalize(ni, msg, -EHOSTUNREACH);

		lnet_net_lock(cpt);
		return EHOSTUNREACH;
	}

	/* MD unlinked while this message was queued/pending */
	if (msg->msg_md != NULL &&
	    (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED) != 0) {
		lnet_net_unlock(cpt);

		CNETERR("Aborting message for %s: LNetM[DE]Unlink() already called on the MD/ME.\n",
			libcfs_id2str(msg->msg_target));
		if (do_send)
			lnet_finalize(ni, msg, -ECANCELED);

		lnet_net_lock(cpt);
		return ECANCELED;
	}

	/* take a peer tx credit, queueing if none available */
	if (!msg->msg_peertxcredit) {
		LASSERT((lp->lp_txcredits < 0) ==
			!list_empty(&lp->lp_txq));

		msg->msg_peertxcredit = 1;
		lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
		lp->lp_txcredits--;

		if (lp->lp_txcredits < lp->lp_mintxcredits)
			lp->lp_mintxcredits = lp->lp_txcredits;

		if (lp->lp_txcredits < 0) {
			msg->msg_tx_delayed = 1;
			list_add_tail(&msg->msg_list, &lp->lp_txq);
			return EAGAIN;
		}
	}

	/* take an NI tx credit, queueing if none available */
	if (!msg->msg_txcredit) {
		LASSERT((tq->tq_credits < 0) ==
			!list_empty(&tq->tq_delayed));

		msg->msg_txcredit = 1;
		tq->tq_credits--;

		if (tq->tq_credits < tq->tq_credits_min)
			tq->tq_credits_min = tq->tq_credits;

		if (tq->tq_credits < 0) {
			msg->msg_tx_delayed = 1;
			list_add_tail(&msg->msg_list, &tq->tq_delayed);
			return EAGAIN;
		}
	}

	if (do_send) {
		/* drop the net lock across the actual LND send */
		lnet_net_unlock(cpt);
		lnet_ni_send(ni, msg);
		lnet_net_lock(cpt);
	}
	return 0;
}
869
870
871static lnet_rtrbufpool_t *
872lnet_msg2bufpool(lnet_msg_t *msg)
873{
874 lnet_rtrbufpool_t *rbp;
875 int cpt;
876
877 LASSERT(msg->msg_rx_committed);
878
879 cpt = msg->msg_rx_cpt;
880 rbp = &the_lnet.ln_rtrpools[cpt][0];
881
882 LASSERT(msg->msg_len <= LNET_MTU);
883 while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_CACHE_SIZE) {
884 rbp++;
885 LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
886 }
887
888 return rbp;
889}
890
static int
lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
{
	/* Reserve the peer router credit and a router buffer needed to
	 * receive a message being forwarded.  Called with the rx net
	 * lock held.  Returns 0 when the receive can proceed (and has
	 * been started, if do_recv), or the POSITIVE value EAGAIN when
	 * the message was queued pending credits/buffers. */
	lnet_peer_t *lp = msg->msg_rxpeer;
	lnet_rtrbufpool_t *rbp;
	lnet_rtrbuf_t *rb;

	/* a routed message has no buffers attached yet */
	LASSERT(msg->msg_iov == NULL);
	LASSERT(msg->msg_kiov == NULL);
	LASSERT(msg->msg_niov == 0);
	LASSERT(msg->msg_routing);
	LASSERT(msg->msg_receiving);
	LASSERT(!msg->msg_sending);

	/* non-lnet_parse callers only receive delayed messages */
	LASSERT(!do_recv || msg->msg_rx_delayed);

	/* take a peer router credit, queueing if none available */
	if (!msg->msg_peerrtrcredit) {
		LASSERT((lp->lp_rtrcredits < 0) ==
			!list_empty(&lp->lp_rtrq));

		msg->msg_peerrtrcredit = 1;
		lp->lp_rtrcredits--;
		if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
			lp->lp_minrtrcredits = lp->lp_rtrcredits;

		if (lp->lp_rtrcredits < 0) {
			/* must have checked eager_recv before here */
			LASSERT(msg->msg_rx_ready_delay);
			msg->msg_rx_delayed = 1;
			list_add_tail(&msg->msg_list, &lp->lp_rtrq);
			return EAGAIN;
		}
	}

	rbp = lnet_msg2bufpool(msg);

	/* take a buffer-pool credit, queueing if none available */
	if (!msg->msg_rtrcredit) {
		LASSERT((rbp->rbp_credits < 0) ==
			!list_empty(&rbp->rbp_msgs));

		msg->msg_rtrcredit = 1;
		rbp->rbp_credits--;
		if (rbp->rbp_credits < rbp->rbp_mincredits)
			rbp->rbp_mincredits = rbp->rbp_credits;

		if (rbp->rbp_credits < 0) {
			/* must have checked eager_recv before here */
			LASSERT(msg->msg_rx_ready_delay);
			msg->msg_rx_delayed = 1;
			list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
			return EAGAIN;
		}
	}

	/* attach a router buffer to carry the payload */
	LASSERT(!list_empty(&rbp->rbp_bufs));
	rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
	list_del(&rb->rb_list);

	msg->msg_niov = rbp->rbp_npages;
	msg->msg_kiov = &rb->rb_kiov[0];

	if (do_recv) {
		int cpt = msg->msg_rx_cpt;

		/* drop the net lock across the actual LND receive */
		lnet_net_unlock(cpt);
		lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
			     0, msg->msg_len, msg->msg_len);
		lnet_net_lock(cpt);
	}
	return 0;
}
966
void
lnet_return_tx_credits_locked(lnet_msg_t *msg)
{
	/* Release the NI and peer tx credits held by 'msg', handing each
	 * freed credit to the first queued message waiting for it (which
	 * is then sent immediately).  Finally drops msg's ref on its tx
	 * peer.  Called with lnet_net_lock(msg->msg_tx_cpt) held. */
	lnet_peer_t *txpeer = msg->msg_txpeer;
	lnet_msg_t *msg2;

	if (msg->msg_txcredit) {
		struct lnet_ni *ni = txpeer->lp_ni;
		struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];

		/* give back NI txcredits */
		msg->msg_txcredit = 0;

		LASSERT((tq->tq_credits < 0) ==
			!list_empty(&tq->tq_delayed));

		tq->tq_credits++;
		if (tq->tq_credits <= 0) {
			/* someone is blocked: pass the credit on */
			msg2 = list_entry(tq->tq_delayed.next,
					  lnet_msg_t, msg_list);
			list_del(&msg2->msg_list);

			LASSERT(msg2->msg_txpeer->lp_ni == ni);
			LASSERT(msg2->msg_tx_delayed);

			(void) lnet_post_send_locked(msg2, 1);
		}
	}

	if (msg->msg_peertxcredit) {
		/* give back peer txcredits */
		msg->msg_peertxcredit = 0;

		LASSERT((txpeer->lp_txcredits < 0) ==
			!list_empty(&txpeer->lp_txq));

		txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
		LASSERT(txpeer->lp_txqnob >= 0);

		txpeer->lp_txcredits++;
		if (txpeer->lp_txcredits <= 0) {
			/* someone is blocked: pass the credit on */
			msg2 = list_entry(txpeer->lp_txq.next,
					  lnet_msg_t, msg_list);
			list_del(&msg2->msg_list);

			LASSERT(msg2->msg_txpeer == txpeer);
			LASSERT(msg2->msg_tx_delayed);

			(void) lnet_post_send_locked(msg2, 1);
		}
	}

	if (txpeer != NULL) {
		msg->msg_txpeer = NULL;
		lnet_peer_decref_locked(txpeer);
	}
}
1024
void
lnet_return_rx_credits_locked(lnet_msg_t *msg)
{
	/* Release the router buffer and rx credits held by a forwarded
	 * message, handing each freed credit/buffer to the first queued
	 * message waiting for it.  Finally drops msg's ref on its rx
	 * peer.  Called with lnet_net_lock(msg->msg_rx_cpt) held. */
	lnet_peer_t *rxpeer = msg->msg_rxpeer;
	lnet_msg_t *msg2;

	if (msg->msg_rtrcredit) {
		/* give back the router buffer and its pool credit */
		lnet_rtrbuf_t *rb;
		lnet_rtrbufpool_t *rbp;

		/* NB: the buffer was stashed in msg_kiov when the credit
		 * was taken (lnet_post_routed_recv_locked) */
		LASSERT(msg->msg_kiov != NULL);

		rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
		rbp = rb->rb_pool;
		LASSERT(rbp == lnet_msg2bufpool(msg));

		msg->msg_kiov = NULL;
		msg->msg_rtrcredit = 0;

		LASSERT((rbp->rbp_credits < 0) ==
			!list_empty(&rbp->rbp_msgs));
		LASSERT((rbp->rbp_credits > 0) ==
			!list_empty(&rbp->rbp_bufs));

		list_add(&rb->rb_list, &rbp->rbp_bufs);
		rbp->rbp_credits++;
		if (rbp->rbp_credits <= 0) {
			/* someone is blocked waiting for a buffer */
			msg2 = list_entry(rbp->rbp_msgs.next,
					  lnet_msg_t, msg_list);
			list_del(&msg2->msg_list);

			(void) lnet_post_routed_recv_locked(msg2, 1);
		}
	}

	if (msg->msg_peerrtrcredit) {
		/* give back peer router credits */
		msg->msg_peerrtrcredit = 0;

		LASSERT((rxpeer->lp_rtrcredits < 0) ==
			!list_empty(&rxpeer->lp_rtrq));

		rxpeer->lp_rtrcredits++;
		if (rxpeer->lp_rtrcredits <= 0) {
			/* someone is blocked waiting for a credit */
			msg2 = list_entry(rxpeer->lp_rtrq.next,
					  lnet_msg_t, msg_list);
			list_del(&msg2->msg_list);

			(void) lnet_post_routed_recv_locked(msg2, 1);
		}
	}
	if (rxpeer != NULL) {
		msg->msg_rxpeer = NULL;
		lnet_peer_decref_locked(rxpeer);
	}
}
1085
1086static int
1087lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
1088{
1089 lnet_peer_t *p1 = r1->lr_gateway;
1090 lnet_peer_t *p2 = r2->lr_gateway;
1091
1092 if (r1->lr_priority < r2->lr_priority)
1093 return 1;
1094
1095 if (r1->lr_priority > r2->lr_priority)
1096 return -1;
1097
1098 if (r1->lr_hops < r2->lr_hops)
1099 return 1;
1100
1101 if (r1->lr_hops > r2->lr_hops)
1102 return -1;
1103
1104 if (p1->lp_txqnob < p2->lp_txqnob)
1105 return 1;
1106
1107 if (p1->lp_txqnob > p2->lp_txqnob)
1108 return -1;
1109
1110 if (p1->lp_txcredits > p2->lp_txcredits)
1111 return 1;
1112
1113 if (p1->lp_txcredits < p2->lp_txcredits)
1114 return -1;
1115
1116 if (r1->lr_seq - r2->lr_seq <= 0)
1117 return 1;
1118
1119 return -1;
1120}
1121
static lnet_peer_t *
lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
{
	/* Pick the best usable gateway for 'target'.  If 'rtr_nid' names
	 * a usable gateway it wins unconditionally, so a retry under a
	 * different CPT lock (see lnet_send()) keeps its original choice.
	 * Returns the chosen gateway peer, or NULL when no route or no
	 * usable gateway exists. */
	lnet_remotenet_t *rnet;
	lnet_route_t *rtr;
	lnet_route_t *rtr_best;
	lnet_route_t *rtr_last;
	struct lnet_peer *lp_best;
	struct lnet_peer *lp;
	int rc;

	rnet = lnet_find_net_locked(LNET_NIDNET(target));
	if (rnet == NULL)
		return NULL;

	lp_best = NULL;
	rtr_best = rtr_last = NULL;
	list_for_each_entry(rtr, &rnet->lrn_routes, lr_list) {
		lp = rtr->lr_gateway;

		/* gateway must be alive and, when its NI status is known,
		 * have no down NIs on this route */
		if (!lp->lp_alive ||
		    ((lp->lp_ping_feats & LNET_PING_FEAT_NI_STATUS) != 0 &&
		     rtr->lr_downis != 0))
			continue;

		/* restrict to gateways reachable via 'ni', if given */
		if (ni != NULL && lp->lp_ni != ni)
			continue;

		if (lp->lp_nid == rtr_nid)	/* pre-determined router */
			return lp;

		if (lp_best == NULL) {
			rtr_best = rtr_last = rtr;
			lp_best = lp;
			continue;
		}

		/* track the highest sequence number seen so far */
		if (rtr_last->lr_seq - rtr->lr_seq < 0)
			rtr_last = rtr;

		rc = lnet_compare_routes(rtr, rtr_best);
		if (rc < 0)
			continue;

		rtr_best = rtr;
		lp_best = lp;
	}

	/* bump the winner's sequence past the latest seen, so repeated
	 * selections round-robin over equally-good routes */
	if (rtr_best != NULL)
		rtr_best->lr_seq = rtr_last->lr_seq + 1;
	return lp_best;
}
1181
int
lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
{
	/* Commit 'msg' for sending: resolve the source NI and the
	 * next-hop peer (a gateway when the destination is not on a
	 * local net), then post it via lnet_post_send_locked().
	 * 'src_nid' may be LNET_NID_ANY (pick one); 'rtr_nid' may
	 * nominate a preferred router (LNET_NID_ANY if not).
	 * Returns 0 on success - including "queued pending credits" -
	 * or a negative errno. */
	lnet_nid_t dst_nid = msg->msg_target.nid;
	struct lnet_ni *src_ni;
	struct lnet_ni *local_ni;
	struct lnet_peer *lp;
	int cpt;
	int cpt2;
	int rc;

	LASSERT(msg->msg_txpeer == NULL);
	LASSERT(!msg->msg_sending);
	LASSERT(!msg->msg_target_is_router);
	LASSERT(!msg->msg_receiving);

	msg->msg_sending = 1;

	LASSERT(!msg->msg_tx_committed);
	/* serialize under the CPT of the NID we will actually talk to */
	cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid);
 again:
	lnet_net_lock(cpt);

	if (the_lnet.ln_shutdown) {
		lnet_net_unlock(cpt);
		return -ESHUTDOWN;
	}

	if (src_nid == LNET_NID_ANY) {
		src_ni = NULL;
	} else {
		src_ni = lnet_nid2ni_locked(src_nid, cpt);
		if (src_ni == NULL) {
			lnet_net_unlock(cpt);
			LCONSOLE_WARN("Can't send to %s: src %s is not a local nid\n",
				      libcfs_nid2str(dst_nid),
				      libcfs_nid2str(src_nid));
			return -EINVAL;
		}
		LASSERT(!msg->msg_routing);
	}

	/* is the destination on one of my local networks? */
	local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid), cpt);

	if (local_ni != NULL) {
		if (src_ni == NULL) {
			src_ni = local_ni;
			src_nid = src_ni->ni_nid;
		} else if (src_ni == local_ni) {
			lnet_ni_decref_locked(local_ni, cpt);
		} else {
			/* nominated source NI is on a different local net
			 * from the destination: cannot send */
			lnet_ni_decref_locked(local_ni, cpt);
			lnet_ni_decref_locked(src_ni, cpt);
			lnet_net_unlock(cpt);
			LCONSOLE_WARN("No route to %s via from %s\n",
				      libcfs_nid2str(dst_nid),
				      libcfs_nid2str(src_nid));
			return -EINVAL;
		}

		LASSERT(src_nid != LNET_NID_ANY);
		lnet_msg_commit(msg, cpt);

		if (!msg->msg_routing)
			msg->msg_hdr.src_nid = cpu_to_le64(src_nid);

		if (src_ni == the_lnet.ln_loni) {
			/* loopback: no credit accounting, send directly */
			lnet_net_unlock(cpt);
			lnet_ni_send(src_ni, msg);

			lnet_net_lock(cpt);
			lnet_ni_decref_locked(src_ni, cpt);
			lnet_net_unlock(cpt);
			return 0;
		}

		rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
		/* drop my ref on src_ni; 'lp' pins it (LASSERT below) */
		lnet_ni_decref_locked(src_ni, cpt);
		if (rc != 0) {
			lnet_net_unlock(cpt);
			LCONSOLE_WARN("Error %d finding peer %s\n", rc,
				      libcfs_nid2str(dst_nid));
			/* shutting down or out of memory */
			return rc;
		}
		LASSERT(lp->lp_ni == src_ni);
	} else {
		/* remote destination: send via a gateway */
		lp = lnet_find_route_locked(src_ni, dst_nid, rtr_nid);
		if (lp == NULL) {
			if (src_ni != NULL)
				lnet_ni_decref_locked(src_ni, cpt);
			lnet_net_unlock(cpt);

			LCONSOLE_WARN("No route to %s via %s (all routers down)\n",
				      libcfs_id2str(msg->msg_target),
				      libcfs_nid2str(src_nid));
			return -EHOSTUNREACH;
		}

		/* the chosen gateway may hash to a different CPT than the
		 * one we locked; if so restart under the right CPT lock,
		 * nominating this gateway so the choice is stable */
		if (rtr_nid != lp->lp_nid) {
			cpt2 = lnet_cpt_of_nid_locked(lp->lp_nid);
			if (cpt2 != cpt) {
				if (src_ni != NULL)
					lnet_ni_decref_locked(src_ni, cpt);
				lnet_net_unlock(cpt);

				rtr_nid = lp->lp_nid;
				cpt = cpt2;
				goto again;
			}
		}

		CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
		       libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
		       lnet_msgtyp2str(msg->msg_type), msg->msg_len);

		if (src_ni == NULL) {
			src_ni = lp->lp_ni;
			src_nid = src_ni->ni_nid;
		} else {
			LASSERT(src_ni == lp->lp_ni);
			lnet_ni_decref_locked(src_ni, cpt);
		}

		lnet_peer_addref_locked(lp);

		LASSERT(src_nid != LNET_NID_ANY);
		lnet_msg_commit(msg, cpt);

		if (!msg->msg_routing) {
			/* I'm the source: record the real source NID */
			msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
		}

		/* the wire target becomes the gateway itself */
		msg->msg_target_is_router = 1;
		msg->msg_target.nid = lp->lp_nid;
		msg->msg_target.pid = LUSTRE_SRV_LNET_PID;
	}

	/* 'lp' is our best choice of peer */
	LASSERT(!msg->msg_peertxcredit);
	LASSERT(!msg->msg_txcredit);
	LASSERT(msg->msg_txpeer == NULL);

	msg->msg_txpeer = lp;	/* msg takes my ref on lp */

	rc = lnet_post_send_locked(msg, 0);
	lnet_net_unlock(cpt);

	/* NB: lnet_post_send_locked() returns POSITIVE errnos */
	if (rc == EHOSTUNREACH || rc == ECANCELED)
		return -rc;

	if (rc == 0)	/* rc == EAGAIN means queued pending credits */
		lnet_ni_send(src_ni, msg);

	return 0;
}
1352
static void
lnet_drop_message(lnet_ni_t *ni, int cpt, void *private, unsigned int nob)
{
	/* Account a dropped incoming message in the per-CPT counters,
	 * then have the LND receive and discard its 'nob' payload bytes
	 * (msg == NULL means "receive into oblivion"). */
	lnet_net_lock(cpt);
	the_lnet.ln_counters[cpt]->drop_count++;
	the_lnet.ln_counters[cpt]->drop_length += nob;
	lnet_net_unlock(cpt);

	lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
}
1363
static void
lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
{
	/* Complete reception of a PUT that has matched an MD: attach the
	 * MD's buffers, raise the PUT event, and hand the message to the
	 * LND.  Only called after a successful match, so msg->msg_md is
	 * valid here. */
	lnet_hdr_t *hdr = &msg->msg_hdr;

	if (msg->msg_wanted != 0)
		lnet_setpayloadbuffer(msg);

	lnet_build_msg_event(msg, LNET_EVENT_PUT);

	/* an ACK is owed if the sender supplied an ack_wmd and the MD
	 * does not disable ACKs */
	msg->msg_ack = (!lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
			(msg->msg_md->md_options & LNET_MD_ACK_DISABLE) == 0);

	lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
		     msg->msg_offset, msg->msg_wanted, hdr->payload_length);
}
1382
static int
lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
{
	/* Handle an incoming PUT: match it against a local MD and start
	 * reception, or queue it pending a match.  Returns 0 on success
	 * or queued, or the POSITIVE value ENOENT when dropped. */
	lnet_hdr_t *hdr = &msg->msg_hdr;
	struct lnet_match_info info;
	int rc;

	/* convert put fields to host byte order */
	hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
	hdr->msg.put.ptl_index = le32_to_cpu(hdr->msg.put.ptl_index);
	hdr->msg.put.offset = le32_to_cpu(hdr->msg.put.offset);

	info.mi_id.nid = hdr->src_nid;
	info.mi_id.pid = hdr->src_pid;
	info.mi_opc = LNET_MD_OP_PUT;
	info.mi_portal = hdr->msg.put.ptl_index;
	info.mi_rlength = hdr->payload_length;
	info.mi_roffset = hdr->msg.put.offset;
	info.mi_mbits = hdr->msg.put.match_bits;

	/* LNDs without lnd_eager_recv can delay the receive directly */
	msg->msg_rx_ready_delay = ni->ni_lnd->lnd_eager_recv == NULL;

 again:
	rc = lnet_ptl_match_md(&info, msg);
	switch (rc) {
	default:
		LBUG();

	case LNET_MATCHMD_OK:
		lnet_recv_put(ni, msg);
		return 0;

	case LNET_MATCHMD_NONE:
		if (msg->msg_rx_delayed)
			/* already queued on the delayed match list */
			return 0;

		rc = lnet_ni_eager_recv(ni, msg);
		if (rc == 0)
			goto again;
		/* fall through: drop when eager_recv failed */

	case LNET_MATCHMD_DROP:
		CNETERR("Dropping PUT from %s portal %d match %llu offset %d length %d: %d\n",
			libcfs_id2str(info.mi_id), info.mi_portal,
			info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);

		return ENOENT;	/* +ve: OK but no match */
	}
}
1432
static int
lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
{
	/* Handle an incoming GET: match it against a local MD, raise the
	 * GET event and send the REPLY carrying the requested data.
	 * When 'rdma_get' is set the LND moves the reply data itself, so
	 * the message is simply handed back to the LND.  Returns 0 on
	 * success, or the POSITIVE value ENOENT when there is no match. */
	struct lnet_match_info info;
	lnet_hdr_t *hdr = &msg->msg_hdr;
	lnet_handle_wire_t reply_wmd;
	int rc;

	/* convert get fields to host byte order */
	hdr->msg.get.match_bits = le64_to_cpu(hdr->msg.get.match_bits);
	hdr->msg.get.ptl_index = le32_to_cpu(hdr->msg.get.ptl_index);
	hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
	hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);

	info.mi_id.nid = hdr->src_nid;
	info.mi_id.pid = hdr->src_pid;
	info.mi_opc = LNET_MD_OP_GET;
	info.mi_portal = hdr->msg.get.ptl_index;
	info.mi_rlength = hdr->msg.get.sink_length;
	info.mi_roffset = hdr->msg.get.src_offset;
	info.mi_mbits = hdr->msg.get.match_bits;

	rc = lnet_ptl_match_md(&info, msg);
	if (rc == LNET_MATCHMD_DROP) {
		CNETERR("Dropping GET from %s portal %d match %llu offset %d length %d\n",
			libcfs_id2str(info.mi_id), info.mi_portal,
			info.mi_mbits, info.mi_roffset, info.mi_rlength);
		return ENOENT;	/* +ve: OK but no match */
	}

	LASSERT(rc == LNET_MATCHMD_OK);

	lnet_build_msg_event(msg, LNET_EVENT_GET);

	/* stash the return handle before lnet_prep_send() rebuilds the
	 * header */
	reply_wmd = hdr->msg.get.return_wmd;

	lnet_prep_send(msg, LNET_MSG_REPLY, info.mi_id,
		       msg->msg_offset, msg->msg_wanted);

	msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;

	if (rdma_get) {
		/* the LND completes the REPLY from its GET descriptor */
		lnet_ni_recv(ni, msg->msg_private, msg, 0,
			     msg->msg_offset, msg->msg_len, msg->msg_len);
		return 0;
	}

	/* discard the GET wire message, then send the REPLY ourselves */
	lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
	msg->msg_receiving = 0;

	rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
	if (rc < 0) {
		/* didn't get as far as lnet_ni_send() */
		CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
		       libcfs_nid2str(ni->ni_nid),
		       libcfs_id2str(info.mi_id), rc);

		lnet_finalize(ni, msg, rc);
	}

	return 0;
}
1496
/* Deliver an incoming REPLY into the local MD it addresses.
 *
 * The REPLY carries the wire handle of its destination MD; look it up,
 * verify it is still active and large enough (or allows truncation),
 * attach it to the message and start receiving the payload.  Returns 0
 * on success, or positive ENOENT to make the caller drop the message
 * (lnet_parse asserts rc == ENOENT on failure).
 */
static int
lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
{
	void *private = msg->msg_private;
	lnet_hdr_t *hdr = &msg->msg_hdr;
	lnet_process_id_t src = {0};
	lnet_libmd_t *md;
	int rlength;
	int mlength;
	int cpt;

	cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
	lnet_res_lock(cpt);

	src.nid = hdr->src_nid;
	src.pid = hdr->src_pid;

	/* look up the destination MD by its wire cookie */
	md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
	if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
		CNETERR("%s: Dropping REPLY from %s for %s MD %#llx.%#llx\n",
			libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
			(md == NULL) ? "invalid" : "inactive",
			hdr->msg.reply.dst_wmd.wh_interface_cookie,
			hdr->msg.reply.dst_wmd.wh_object_cookie);
		if (md != NULL && md->md_me != NULL)
			CERROR("REPLY MD also attached to portal %d\n",
			       md->md_me->me_portal);

		lnet_res_unlock(cpt);
		return ENOENT;	/* +ve: caller drops the message */
	}

	LASSERT(md->md_offset == 0);

	rlength = hdr->payload_length;
	/* receive no more than the MD can hold */
	mlength = min_t(uint, rlength, md->md_length);

	if (mlength < rlength &&
	    (md->md_options & LNET_MD_TRUNCATE) == 0) {
		/* too big and the MD does not allow truncation */
		CNETERR("%s: Dropping REPLY from %s length %d for MD %#llx would overflow (%d)\n",
			libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
			rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
			mlength);
		lnet_res_unlock(cpt);
		return ENOENT;	/* +ve: caller drops the message */
	}

	CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md %#llx\n",
	       libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
	       mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);

	lnet_msg_attach_md(msg, md, 0, mlength);

	if (mlength != 0)
		lnet_setpayloadbuffer(msg);

	lnet_res_unlock(cpt);

	lnet_build_msg_event(msg, LNET_EVENT_REPLY);

	lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
	return 0;
}
1561
/* Deliver an incoming ACK to the local MD that requested it.
 *
 * The ACK names the source MD of the original PUT by wire handle; attach
 * it (zero-length payload) and raise an LNET_EVENT_ACK.  Returns 0 on
 * success, or positive ENOENT so the caller drops the message.
 */
static int
lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
{
	lnet_hdr_t *hdr = &msg->msg_hdr;
	lnet_process_id_t src = {0};
	lnet_libmd_t *md;
	int cpt;

	src.nid = hdr->src_nid;
	src.pid = hdr->src_pid;

	/* Convert ack fields to host byte order */
	hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
	hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);

	cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
	lnet_res_lock(cpt);

	/* look up the MD that requested this ACK */
	md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
	if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
		/* silent drop at CDEBUG level: a stale ACK is not an error */
		CDEBUG(D_NET,
		       "%s: Dropping ACK from %s to %s MD %#llx.%#llx\n",
		       libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
		       (md == NULL) ? "invalid" : "inactive",
		       hdr->msg.ack.dst_wmd.wh_interface_cookie,
		       hdr->msg.ack.dst_wmd.wh_object_cookie);
		if (md != NULL && md->md_me != NULL)
			CERROR("Source MD also attached to portal %d\n",
			       md->md_me->me_portal);

		lnet_res_unlock(cpt);
		return ENOENT;	/* +ve: caller drops the message */
	}

	CDEBUG(D_NET, "%s: ACK from %s into md %#llx\n",
	       libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
	       hdr->msg.ack.dst_wmd.wh_object_cookie);

	lnet_msg_attach_md(msg, md, 0, 0);

	lnet_res_unlock(cpt);

	lnet_build_msg_event(msg, LNET_EVENT_ACK);

	/* ACKs carry no payload */
	lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
	return 0;
}
1611
/* Queue a message for forwarding; called with the net lock held on
 * msg->msg_rx_cpt.
 *
 * If the receive would stall on router or buffer credits, either mark
 * the message for delayed receive (when the LND has no eager_recv) or
 * temporarily drop the net lock to run the LND's eager receive before
 * posting.  Returns 0 or the first error encountered.
 */
static int
lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
{
	int rc = 0;

	if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
	    lnet_msg2bufpool(msg)->rbp_credits <= 0) {
		if (ni->ni_lnd->lnd_eager_recv == NULL) {
			msg->msg_rx_ready_delay = 1;
		} else {
			/* NOTE(review): net lock is dropped around the eager
			 * receive — presumably because it may block; confirm
			 * against the LND contract */
			lnet_net_unlock(msg->msg_rx_cpt);
			rc = lnet_ni_eager_recv(ni, msg);
			lnet_net_lock(msg->msg_rx_cpt);
		}
	}

	if (rc == 0)
		rc = lnet_post_routed_recv_locked(msg, 0);
	return rc;
}
1632
1633char *
1634lnet_msgtyp2str(int type)
1635{
1636 switch (type) {
1637 case LNET_MSG_ACK:
1638 return "ACK";
1639 case LNET_MSG_PUT:
1640 return "PUT";
1641 case LNET_MSG_GET:
1642 return "GET";
1643 case LNET_MSG_REPLY:
1644 return "REPLY";
1645 case LNET_MSG_HELLO:
1646 return "HELLO";
1647 default:
1648 return "<UNKNOWN>";
1649 }
1650}
1651EXPORT_SYMBOL(lnet_msgtyp2str);
1652
1653void
1654lnet_print_hdr(lnet_hdr_t *hdr)
1655{
1656 lnet_process_id_t src = {0};
1657 lnet_process_id_t dst = {0};
1658 char *type_str = lnet_msgtyp2str(hdr->type);
1659
1660 src.nid = hdr->src_nid;
1661 src.pid = hdr->src_pid;
1662
1663 dst.nid = hdr->dest_nid;
1664 dst.pid = hdr->dest_pid;
1665
1666 CWARN("P3 Header at %p of type %s\n", hdr, type_str);
1667 CWARN(" From %s\n", libcfs_id2str(src));
1668 CWARN(" To %s\n", libcfs_id2str(dst));
1669
1670 switch (hdr->type) {
1671 default:
1672 break;
1673
1674 case LNET_MSG_PUT:
1675 CWARN(" Ptl index %d, ack md %#llx.%#llx, match bits %llu\n",
1676 hdr->msg.put.ptl_index,
1677 hdr->msg.put.ack_wmd.wh_interface_cookie,
1678 hdr->msg.put.ack_wmd.wh_object_cookie,
1679 hdr->msg.put.match_bits);
1680 CWARN(" Length %d, offset %d, hdr data %#llx\n",
1681 hdr->payload_length, hdr->msg.put.offset,
1682 hdr->msg.put.hdr_data);
1683 break;
1684
1685 case LNET_MSG_GET:
1686 CWARN(" Ptl index %d, return md %#llx.%#llx, match bits %llu\n",
1687 hdr->msg.get.ptl_index,
1688 hdr->msg.get.return_wmd.wh_interface_cookie,
1689 hdr->msg.get.return_wmd.wh_object_cookie,
1690 hdr->msg.get.match_bits);
1691 CWARN(" Length %d, src offset %d\n",
1692 hdr->msg.get.sink_length,
1693 hdr->msg.get.src_offset);
1694 break;
1695
1696 case LNET_MSG_ACK:
1697 CWARN(" dst md %#llx.%#llx, manipulated length %d\n",
1698 hdr->msg.ack.dst_wmd.wh_interface_cookie,
1699 hdr->msg.ack.dst_wmd.wh_object_cookie,
1700 hdr->msg.ack.mlength);
1701 break;
1702
1703 case LNET_MSG_REPLY:
1704 CWARN(" dst md %#llx.%#llx, length %d\n",
1705 hdr->msg.reply.dst_wmd.wh_interface_cookie,
1706 hdr->msg.reply.dst_wmd.wh_object_cookie,
1707 hdr->payload_length);
1708 }
1709
1710}
1711
/* Entry point for LNDs delivering a newly-arrived message.
 *
 * \a hdr is still in wire (little-endian) byte order.  \a from_nid is the
 * NID the message physically arrived from, which may differ from
 * hdr->src_nid when the message was routed.  \a rdma_req is set when the
 * LND can perform the data movement itself (optimized GET).
 *
 * Validates the header, then either queues the message for forwarding
 * (when routing is enabled and it is not addressed to this NI) or
 * dispatches it to the per-type parser.  Returns 0 — possibly after
 * dropping the message internally — or -EPROTO on a malformed header.
 */
int
lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
	   void *private, int rdma_req)
{
	int rc = 0;
	int cpt;
	int for_me;
	struct lnet_msg *msg;
	lnet_pid_t dest_pid;
	lnet_nid_t dest_nid;
	lnet_nid_t src_nid;
	__u32 payload_length;
	__u32 type;

	LASSERT(!in_interrupt());

	/* convert common header fields to host byte order */
	type = le32_to_cpu(hdr->type);
	src_nid = le64_to_cpu(hdr->src_nid);
	dest_nid = le64_to_cpu(hdr->dest_nid);
	dest_pid = le32_to_cpu(hdr->dest_pid);
	payload_length = le32_to_cpu(hdr->payload_length);

	for_me = (ni->ni_nid == dest_nid);
	cpt = lnet_cpt_of_nid(from_nid);

	/* sanity-check the payload length against the message type */
	switch (type) {
	case LNET_MSG_ACK:
	case LNET_MSG_GET:
		if (payload_length > 0) {
			CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
			       libcfs_nid2str(from_nid),
			       libcfs_nid2str(src_nid),
			       lnet_msgtyp2str(type), payload_length);
			return -EPROTO;
		}
		break;

	case LNET_MSG_PUT:
	case LNET_MSG_REPLY:
		/* forwarded messages are limited to one MTU */
		if (payload_length >
		    (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
			CERROR("%s, src %s: bad %s payload %d (%d max expected)\n",
			       libcfs_nid2str(from_nid),
			       libcfs_nid2str(src_nid),
			       lnet_msgtyp2str(type),
			       payload_length,
			       for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
			return -EPROTO;
		}
		break;

	default:
		CERROR("%s, src %s: Bad message type 0x%x\n",
		       libcfs_nid2str(from_nid),
		       libcfs_nid2str(src_nid), type);
		return -EPROTO;
	}

	/* incoming traffic proves this NI alive: refresh its status */
	if (the_lnet.ln_routing &&
	    ni->ni_last_alive != get_seconds()) {
		lnet_ni_lock(ni);

		ni->ni_last_alive = get_seconds();
		if (ni->ni_status != NULL &&
		    ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
			ni->ni_status->ns_status = LNET_NI_STATUS_UP;
		lnet_ni_unlock(ni);
	}

	/* Message not for me: check it can legitimately be forwarded;
	 * anything suspicious is treated as a protocol error. */
	if (!for_me) {
		if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
			/* same network as this NI: sender should have
			 * delivered it directly */
			CERROR("%s, src %s: Bad dest nid %s (should have been sent direct)\n",
			       libcfs_nid2str(from_nid),
			       libcfs_nid2str(src_nid),
			       libcfs_nid2str(dest_nid));
			return -EPROTO;
		}

		if (lnet_islocalnid(dest_nid)) {
			/* one of my NIDs, but arrived on the wrong network */
			CERROR("%s, src %s: Bad dest nid %s (it's my nid but on a different network)\n",
			       libcfs_nid2str(from_nid),
			       libcfs_nid2str(src_nid),
			       libcfs_nid2str(dest_nid));
			return -EPROTO;
		}

		if (rdma_req && type == LNET_MSG_GET) {
			/* optimized GETs cannot be forwarded */
			CERROR("%s, src %s: Bad optimized GET for %s (final destination must be me)\n",
			       libcfs_nid2str(from_nid),
			       libcfs_nid2str(src_nid),
			       libcfs_nid2str(dest_nid));
			return -EPROTO;
		}

		if (!the_lnet.ln_routing) {
			CERROR("%s, src %s: Dropping message for %s (routing not enabled)\n",
			       libcfs_nid2str(from_nid),
			       libcfs_nid2str(src_nid),
			       libcfs_nid2str(dest_nid));
			goto drop;
		}
	}

	/* simulated failures for testing: drop messages from peers on the
	 * fail list */
	if (!list_empty(&the_lnet.ln_test_peers) &&
	    fail_peer(src_nid, 0)) {
		CERROR("%s, src %s: Dropping %s to simulate failure\n",
		       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
		       lnet_msgtyp2str(type));
		goto drop;
	}

	msg = lnet_msg_alloc();
	if (msg == NULL) {
		CERROR("%s, src %s: Dropping %s (out of memory)\n",
		       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
		       lnet_msgtyp2str(type));
		goto drop;
	}

	/* msg zeroed by lnet_msg_alloc(); set only what this path needs —
	 * TODO confirm zeroing against lnet_msg_alloc() */
	msg->msg_type = type;
	msg->msg_private = private;
	msg->msg_receiving = 1;
	msg->msg_len = msg->msg_wanted = payload_length;
	msg->msg_offset = 0;
	msg->msg_hdr = *hdr;

	msg->msg_from = from_nid;
	if (!for_me) {
		/* forwarded: keep the header in wire byte order */
		msg->msg_target.pid = dest_pid;
		msg->msg_target.nid = dest_nid;
		msg->msg_routing = 1;

	} else {
		/* local delivery: store the common header fields in host
		 * byte order (per-type fields are converted by the
		 * type-specific parsers below) */
		msg->msg_hdr.type = type;
		msg->msg_hdr.src_nid = src_nid;
		msg->msg_hdr.src_pid = le32_to_cpu(msg->msg_hdr.src_pid);
		msg->msg_hdr.dest_nid = dest_nid;
		msg->msg_hdr.dest_pid = dest_pid;
		msg->msg_hdr.payload_length = payload_length;
	}

	lnet_net_lock(cpt);
	rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
	if (rc != 0) {
		lnet_net_unlock(cpt);
		CERROR("%s, src %s: Dropping %s (error %d looking up sender)\n",
		       libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
		       lnet_msgtyp2str(type), rc);
		lnet_msg_free(msg);
		goto drop;
	}

	if (lnet_isrouter(msg->msg_rxpeer)) {
		/* traffic from a router proves it alive */
		lnet_peer_set_alive(msg->msg_rxpeer);
		if (avoid_asym_router_failure &&
		    LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
			/* the message crossed another network on the way
			 * here, so the router's interface on the source
			 * network must be up too */
			lnet_router_ni_update_locked(msg->msg_rxpeer,
						     LNET_NIDNET(src_nid));
		}
	}

	lnet_msg_commit(msg, cpt);

	if (!for_me) {
		rc = lnet_parse_forward_locked(ni, msg);
		lnet_net_unlock(cpt);

		if (rc < 0)
			goto free_drop;
		if (rc == 0) {
			lnet_ni_recv(ni, msg->msg_private, msg, 0,
				     0, payload_length, payload_length);
		}
		return 0;
	}

	lnet_net_unlock(cpt);

	/* dispatch to the per-type parser */
	switch (type) {
	case LNET_MSG_ACK:
		rc = lnet_parse_ack(ni, msg);
		break;
	case LNET_MSG_PUT:
		rc = lnet_parse_put(ni, msg);
		break;
	case LNET_MSG_GET:
		rc = lnet_parse_get(ni, msg, rdma_req);
		break;
	case LNET_MSG_REPLY:
		rc = lnet_parse_reply(ni, msg);
		break;
	default:
		/* unreachable: type was validated above */
		LASSERT(0);
		rc = -EPROTO;
		goto free_drop;
	}

	if (rc == 0)
		return 0;

	/* the parsers report failure as positive ENOENT (drop) */
	LASSERT(rc == ENOENT);

 free_drop:
	LASSERT(msg->msg_md == NULL);
	lnet_finalize(ni, msg, rc);

 drop:
	lnet_drop_message(ni, cpt, private, payload_length);
	return 0;
}
EXPORT_SYMBOL(lnet_parse);
1943
/* Drop every delayed PUT on \a head: account each drop and finalize the
 * message with -ENOENT.  \a reason is included in the console warning.
 */
void
lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
{
	while (!list_empty(head)) {
		lnet_process_id_t id = {0};
		lnet_msg_t *msg;

		msg = list_entry(head->next, lnet_msg_t, msg_list);
		list_del(&msg->msg_list);

		id.nid = msg->msg_hdr.src_nid;
		id.pid = msg->msg_hdr.src_pid;

		/* only unmatched PUTs are ever delayed */
		LASSERT(msg->msg_md == NULL);
		LASSERT(msg->msg_rx_delayed);
		LASSERT(msg->msg_rxpeer != NULL);
		LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);

		CWARN("Dropping delayed PUT from %s portal %d match %llu offset %d length %d: %s\n",
		      libcfs_id2str(id),
		      msg->msg_hdr.msg.put.ptl_index,
		      msg->msg_hdr.msg.put.match_bits,
		      msg->msg_hdr.msg.put.offset,
		      msg->msg_hdr.payload_length, reason);

		/* account the drop against the peer's NI/CPT before
		 * finalizing */
		lnet_drop_message(msg->msg_rxpeer->lp_ni,
				  msg->msg_rxpeer->lp_cpt,
				  msg->msg_private, msg->msg_len);

		/* NOTE(review): lnet_finalize presumably releases msg and
		 * its msg_rxpeer reference — msg must not be touched after
		 * this call; confirm against lnet_finalize */
		lnet_finalize(msg->msg_rxpeer->lp_ni, msg, -ENOENT);
	}
}
1984
1985void
1986lnet_recv_delayed_msg_list(struct list_head *head)
1987{
1988 while (!list_empty(head)) {
1989 lnet_msg_t *msg;
1990 lnet_process_id_t id;
1991
1992 msg = list_entry(head->next, lnet_msg_t, msg_list);
1993 list_del(&msg->msg_list);
1994
1995
1996
1997
1998 id.nid = msg->msg_hdr.src_nid;
1999 id.pid = msg->msg_hdr.src_pid;
2000
2001 LASSERT(msg->msg_rx_delayed);
2002 LASSERT(msg->msg_md != NULL);
2003 LASSERT(msg->msg_rxpeer != NULL);
2004 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
2005
2006 CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d match %llu offset %d length %d.\n",
2007 libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
2008 msg->msg_hdr.msg.put.match_bits,
2009 msg->msg_hdr.msg.put.offset,
2010 msg->msg_hdr.payload_length);
2011
2012 lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
2013 }
2014}
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
/**
 * LNetPut() - Initiate an asynchronous PUT operation.
 *
 * @self:       source NID to send from, or LNET_NID_ANY to let LNet choose
 * @mdh:        handle of the MD describing the memory to send
 * @ack:        LNET_ACK_REQ to request an ACK from the target, otherwise
 *              no ACK is requested (the ack wire handle is set to NONE)
 * @target:     process id of the destination
 * @portal:     index of the target portal
 * @match_bits: match bits for the target's match entries
 * @offset:     offset into the target memory descriptor
 * @hdr_data:   64 bits of out-of-band user data carried in the header
 *
 * Completion (and any failure after the send is started) is reported via
 * events on the MD's event queue; a SEND event is built before the send.
 *
 * Return: 0 on successful initiation, -EIO for a simulated failure,
 *         -ENOMEM if a message descriptor cannot be allocated, or
 *         -ENOENT if @mdh is invalid, inactive, or still attached to a
 *         match entry.
 */
int
LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
	lnet_process_id_t target, unsigned int portal,
	__u64 match_bits, unsigned int offset,
	__u64 hdr_data)
{
	struct lnet_msg *msg;
	struct lnet_libmd *md;
	int cpt;
	int rc;

	LASSERT(the_lnet.ln_init);
	LASSERT(the_lnet.ln_refcount > 0);

	/* simulated failures for testing */
	if (!list_empty(&the_lnet.ln_test_peers) &&
	    fail_peer(target.nid, 1)) {
		CERROR("Dropping PUT to %s: simulated failure\n",
		       libcfs_id2str(target));
		return -EIO;
	}

	msg = lnet_msg_alloc();
	if (msg == NULL) {
		CERROR("Dropping PUT to %s: ENOMEM on lnet_msg_t\n",
		       libcfs_id2str(target));
		return -ENOMEM;
	}
	/* record whether we are sending under memory pressure */
	msg->msg_vmflush = !!memory_pressure_get();

	cpt = lnet_cpt_of_cookie(mdh.cookie);
	lnet_res_lock(cpt);

	md = lnet_handle2md(&mdh);
	if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
		CERROR("Dropping PUT (%llu:%d:%s): MD (%d) invalid\n",
		       match_bits, portal, libcfs_id2str(target),
		       md == NULL ? -1 : md->md_threshold);
		if (md != NULL && md->md_me != NULL)
			CERROR("Source MD also attached to portal %d\n",
			       md->md_me->me_portal);
		lnet_res_unlock(cpt);

		lnet_msg_free(msg);
		return -ENOENT;
	}

	CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));

	lnet_msg_attach_md(msg, md, 0, 0);

	lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);

	/* per-type header fields go out in wire byte order */
	msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
	msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
	msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
	msg->msg_hdr.msg.put.hdr_data = hdr_data;

	/* NB handles only looked up by creator (no flips) */
	if (ack == LNET_ACK_REQ) {
		msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
			the_lnet.ln_interface_cookie;
		msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
			md->md_lh.lh_cookie;
	} else {
		msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
			LNET_WIRE_HANDLE_COOKIE_NONE;
		msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
			LNET_WIRE_HANDLE_COOKIE_NONE;
	}

	lnet_res_unlock(cpt);

	lnet_build_msg_event(msg, LNET_EVENT_SEND);

	rc = lnet_send(self, msg, LNET_NID_ANY);
	if (rc != 0) {
		CNETERR("Error sending PUT to %s: %d\n",
			libcfs_id2str(target), rc);
		lnet_finalize(NULL, msg, rc);
	}

	/* completion will be signalled by an event */
	return 0;
}
EXPORT_SYMBOL(LNetPut);
2145
2146lnet_msg_t *
2147lnet_create_reply_msg(lnet_ni_t *ni, lnet_msg_t *getmsg)
2148{
2149
2150
2151
2152
2153
2154
2155
2156 struct lnet_msg *msg = lnet_msg_alloc();
2157 struct lnet_libmd *getmd = getmsg->msg_md;
2158 lnet_process_id_t peer_id = getmsg->msg_target;
2159 int cpt;
2160
2161 LASSERT(!getmsg->msg_target_is_router);
2162 LASSERT(!getmsg->msg_routing);
2163
2164 cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
2165 lnet_res_lock(cpt);
2166
2167 LASSERT(getmd->md_refcount > 0);
2168
2169 if (msg == NULL) {
2170 CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
2171 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
2172 goto drop;
2173 }
2174
2175 if (getmd->md_threshold == 0) {
2176 CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
2177 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
2178 getmd);
2179 lnet_res_unlock(cpt);
2180 goto drop;
2181 }
2182
2183 LASSERT(getmd->md_offset == 0);
2184
2185 CDEBUG(D_NET, "%s: Reply from %s md %p\n",
2186 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
2187
2188
2189 msg->msg_from = peer_id.nid;
2190 msg->msg_type = LNET_MSG_GET;
2191 msg->msg_hdr.src_nid = peer_id.nid;
2192 msg->msg_hdr.payload_length = getmd->md_length;
2193 msg->msg_receiving = 1;
2194
2195 lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
2196 lnet_res_unlock(cpt);
2197
2198 cpt = lnet_cpt_of_nid(peer_id.nid);
2199
2200 lnet_net_lock(cpt);
2201 lnet_msg_commit(msg, cpt);
2202 lnet_net_unlock(cpt);
2203
2204 lnet_build_msg_event(msg, LNET_EVENT_REPLY);
2205
2206 return msg;
2207
2208 drop:
2209 cpt = lnet_cpt_of_nid(peer_id.nid);
2210
2211 lnet_net_lock(cpt);
2212 the_lnet.ln_counters[cpt]->drop_count++;
2213 the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
2214 lnet_net_unlock(cpt);
2215
2216 if (msg != NULL)
2217 lnet_msg_free(msg);
2218
2219 return NULL;
2220}
2221EXPORT_SYMBOL(lnet_create_reply_msg);
2222
/**
 * lnet_set_reply_msg_len() - Set the reply length of an optimized GET.
 *
 * LNDs call this on a message obtained from lnet_create_reply_msg() when
 * the peer returned fewer bytes than requested, so the eventual REPLY
 * event reports the actual manipulated length.  @ni is unused here but
 * kept for API symmetry with the other LND-facing calls.
 */
void
lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *reply, unsigned int len)
{
	/* must be a reply shim built by lnet_create_reply_msg() */
	LASSERT(reply != NULL);
	LASSERT(reply->msg_type == LNET_MSG_GET);
	LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);

	/* the length can only shrink from what was already recorded */
	LASSERT(len <= reply->msg_ev.mlength);

	reply->msg_ev.mlength = len;
}
EXPORT_SYMBOL(lnet_set_reply_msg_len);
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
/**
 * LNetGet() - Initiate an asynchronous GET operation.
 *
 * @self:       source NID to send from, or LNET_NID_ANY to let LNet choose
 * @mdh:        handle of the MD that will sink the reply data
 * @target:     process id of the destination
 * @portal:     index of the target portal
 * @match_bits: match bits for the target's match entries
 * @offset:     offset into the target memory descriptor
 *
 * The requested length is the sink MD's length.  Completion is reported
 * via REPLY events on the MD's event queue; a SEND event is built before
 * the send.
 *
 * Return: 0 on successful initiation, -EIO for a simulated failure,
 *         -ENOMEM if a message descriptor cannot be allocated, or
 *         -ENOENT if @mdh is invalid, inactive, or still attached to a
 *         match entry.
 */
int
LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
	lnet_process_id_t target, unsigned int portal,
	__u64 match_bits, unsigned int offset)
{
	struct lnet_msg *msg;
	struct lnet_libmd *md;
	int cpt;
	int rc;

	LASSERT(the_lnet.ln_init);
	LASSERT(the_lnet.ln_refcount > 0);

	/* simulated failures for testing */
	if (!list_empty(&the_lnet.ln_test_peers) &&
	    fail_peer(target.nid, 1)) {
		CERROR("Dropping GET to %s: simulated failure\n",
		       libcfs_id2str(target));
		return -EIO;
	}

	msg = lnet_msg_alloc();
	if (msg == NULL) {
		CERROR("Dropping GET to %s: ENOMEM on lnet_msg_t\n",
		       libcfs_id2str(target));
		return -ENOMEM;
	}

	cpt = lnet_cpt_of_cookie(mdh.cookie);
	lnet_res_lock(cpt);

	md = lnet_handle2md(&mdh);
	if (md == NULL || md->md_threshold == 0 || md->md_me != NULL) {
		CERROR("Dropping GET (%llu:%d:%s): MD (%d) invalid\n",
		       match_bits, portal, libcfs_id2str(target),
		       md == NULL ? -1 : md->md_threshold);
		if (md != NULL && md->md_me != NULL)
			CERROR("REPLY MD also attached to portal %d\n",
			       md->md_me->me_portal);

		lnet_res_unlock(cpt);

		lnet_msg_free(msg);
		return -ENOENT;
	}

	CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));

	lnet_msg_attach_md(msg, md, 0, 0);

	lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);

	/* per-type header fields go out in wire byte order */
	msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
	msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
	msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
	msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);

	/* NB handles only looked up by creator (no flips) */
	msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
		the_lnet.ln_interface_cookie;
	msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
		md->md_lh.lh_cookie;

	lnet_res_unlock(cpt);

	lnet_build_msg_event(msg, LNET_EVENT_SEND);

	rc = lnet_send(self, msg, LNET_NID_ANY);
	if (rc < 0) {
		CNETERR("Error sending GET to %s: %d\n",
			libcfs_id2str(target), rc);
		lnet_finalize(NULL, msg, rc);
	}

	/* completion will be signalled by an event */
	return 0;
}
EXPORT_SYMBOL(LNetGet);
2338
2339
2340
2341
2342
2343
2344
2345
2346
2347
2348
2349
2350
2351
2352
/**
 * LNetDist() - Report the "distance" to a NID.
 *
 * @dstnid:  the NID to measure the distance to
 * @srcnidp: if non-NULL, set to the local NID a message would be sent from
 * @orderp:  if non-NULL, set to the scan-order index of the interface or
 *           route used (0 is reserved for the loopback NID, 1 for this
 *           node's other NIDs; local interfaces and remote nets count up
 *           from 2 in scan order)
 *
 * Return: 0 if @dstnid is one of this node's NIDs (or 1 when the
 *         local_nid_dist_zero module parameter is cleared), 1 if @dstnid
 *         is on a directly-connected network, hops+1 via the shortest
 *         known route to its network, or -EHOSTUNREACH if no interface
 *         or route reaches it.
 */
int
LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
{
	struct list_head *e;
	struct lnet_ni *ni;
	lnet_remotenet_t *rnet;
	__u32 dstnet = LNET_NIDNET(dstnid);
	int hops;
	int cpt;
	__u32 order = 2;	/* 0 and 1 are reserved (see above) */
	struct list_head *rn_list;

	LASSERT(the_lnet.ln_init);
	LASSERT(the_lnet.ln_refcount > 0);

	cpt = lnet_net_lock_current();

	/* first scan local interfaces */
	list_for_each(e, &the_lnet.ln_nis) {
		ni = list_entry(e, lnet_ni_t, ni_list);

		if (ni->ni_nid == dstnid) {
			/* it's one of my own NIDs */
			if (srcnidp != NULL)
				*srcnidp = dstnid;
			if (orderp != NULL) {
				if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
					*orderp = 0;
				else
					*orderp = 1;
			}
			lnet_net_unlock(cpt);

			return local_nid_dist_zero ? 0 : 1;
		}

		if (LNET_NIDNET(ni->ni_nid) == dstnet) {
			/* directly connected network */
			if (srcnidp != NULL)
				*srcnidp = ni->ni_nid;
			if (orderp != NULL)
				*orderp = order;
			lnet_net_unlock(cpt);
			return 1;
		}

		order++;
	}

	/* then scan known remote networks for a route */
	rn_list = lnet_net2rnethash(dstnet);
	list_for_each(e, rn_list) {
		rnet = list_entry(e, lnet_remotenet_t, lrn_list);

		if (rnet->lrn_net == dstnet) {
			lnet_route_t *route;
			lnet_route_t *shortest = NULL;

			LASSERT(!list_empty(&rnet->lrn_routes));

			/* pick the route with the fewest hops */
			list_for_each_entry(route, &rnet->lrn_routes,
					    lr_list) {
				if (shortest == NULL ||
				    route->lr_hops < shortest->lr_hops)
					shortest = route;
			}

			LASSERT(shortest != NULL);
			hops = shortest->lr_hops;
			if (srcnidp != NULL)
				*srcnidp = shortest->lr_gateway->lp_ni->ni_nid;
			if (orderp != NULL)
				*orderp = order;
			lnet_net_unlock(cpt);
			return hops + 1;
		}
		order++;
	}

	lnet_net_unlock(cpt);
	return -EHOSTUNREACH;
}
EXPORT_SYMBOL(LNetDist);
2437