1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37#define DEBUG_SUBSYSTEM S_LNET
38
39#include "../../include/linux/lnet/lib-lnet.h"
40#include <linux/nsproxy.h>
41#include <net/net_namespace.h>
42
43static int local_nid_dist_zero = 1;
44module_param(local_nid_dist_zero, int, 0444);
45MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
46
47int
48lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
49{
50 lnet_test_peer_t *tp;
51 lnet_test_peer_t *temp;
52 struct list_head *el;
53 struct list_head *next;
54 struct list_head cull;
55
56
57 if (threshold) {
58
59 LIBCFS_ALLOC(tp, sizeof(*tp));
60 if (!tp)
61 return -ENOMEM;
62
63 tp->tp_nid = nid;
64 tp->tp_threshold = threshold;
65
66 lnet_net_lock(0);
67 list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
68 lnet_net_unlock(0);
69 return 0;
70 }
71
72
73 INIT_LIST_HEAD(&cull);
74
75 lnet_net_lock(0);
76
77 list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
78 tp = list_entry(el, lnet_test_peer_t, tp_list);
79
80 if (!tp->tp_threshold ||
81 nid == LNET_NID_ANY ||
82 tp->tp_nid == nid) {
83 list_del(&tp->tp_list);
84 list_add(&tp->tp_list, &cull);
85 }
86 }
87
88 lnet_net_unlock(0);
89
90 list_for_each_entry_safe(tp, temp, &cull, tp_list) {
91 list_del(&tp->tp_list);
92 LIBCFS_FREE(tp, sizeof(*tp));
93 }
94 return 0;
95}
96
97static int
98fail_peer(lnet_nid_t nid, int outgoing)
99{
100 lnet_test_peer_t *tp;
101 lnet_test_peer_t *temp;
102 struct list_head *el;
103 struct list_head *next;
104 struct list_head cull;
105 int fail = 0;
106
107 INIT_LIST_HEAD(&cull);
108
109
110 lnet_net_lock(0);
111
112 list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
113 tp = list_entry(el, lnet_test_peer_t, tp_list);
114
115 if (!tp->tp_threshold) {
116
117 if (outgoing) {
118
119
120
121
122
123 list_del(&tp->tp_list);
124 list_add(&tp->tp_list, &cull);
125 }
126 continue;
127 }
128
129 if (tp->tp_nid == LNET_NID_ANY ||
130 nid == tp->tp_nid) {
131 fail = 1;
132
133 if (tp->tp_threshold != LNET_MD_THRESH_INF) {
134 tp->tp_threshold--;
135 if (outgoing &&
136 !tp->tp_threshold) {
137
138 list_del(&tp->tp_list);
139 list_add(&tp->tp_list, &cull);
140 }
141 }
142 break;
143 }
144 }
145
146 lnet_net_unlock(0);
147
148 list_for_each_entry_safe(tp, temp, &cull, tp_list) {
149 list_del(&tp->tp_list);
150
151 LIBCFS_FREE(tp, sizeof(*tp));
152 }
153
154 return fail;
155}
156
157unsigned int
158lnet_iov_nob(unsigned int niov, struct kvec *iov)
159{
160 unsigned int nob = 0;
161
162 LASSERT(!niov || iov);
163 while (niov-- > 0)
164 nob += (iov++)->iov_len;
165
166 return nob;
167}
168EXPORT_SYMBOL(lnet_iov_nob);
169
170void
171lnet_copy_iov2iter(struct iov_iter *to,
172 unsigned int nsiov, const struct kvec *siov,
173 unsigned int soffset, unsigned int nob)
174{
175
176 const char *s;
177 size_t left;
178
179 if (!nob)
180 return;
181
182
183 LASSERT(nsiov > 0);
184 while (soffset >= siov->iov_len) {
185 soffset -= siov->iov_len;
186 siov++;
187 nsiov--;
188 LASSERT(nsiov > 0);
189 }
190
191 s = (char *)siov->iov_base + soffset;
192 left = siov->iov_len - soffset;
193 do {
194 size_t n, copy = left;
195
196 LASSERT(nsiov > 0);
197
198 if (copy > nob)
199 copy = nob;
200 n = copy_to_iter(s, copy, to);
201 if (n != copy)
202 return;
203 nob -= n;
204
205 siov++;
206 s = (char *)siov->iov_base;
207 left = siov->iov_len;
208 nsiov--;
209 } while (nob > 0);
210}
211EXPORT_SYMBOL(lnet_copy_iov2iter);
212
213void
214lnet_copy_kiov2iter(struct iov_iter *to,
215 unsigned int nsiov, const lnet_kiov_t *siov,
216 unsigned int soffset, unsigned int nob)
217{
218 if (!nob)
219 return;
220
221 LASSERT(!in_interrupt());
222
223 LASSERT(nsiov > 0);
224 while (soffset >= siov->bv_len) {
225 soffset -= siov->bv_len;
226 siov++;
227 nsiov--;
228 LASSERT(nsiov > 0);
229 }
230
231 do {
232 size_t copy = siov->bv_len - soffset, n;
233
234 LASSERT(nsiov > 0);
235
236 if (copy > nob)
237 copy = nob;
238 n = copy_page_to_iter(siov->bv_page,
239 siov->bv_offset + soffset,
240 copy, to);
241 if (n != copy)
242 return;
243 nob -= n;
244 siov++;
245 nsiov--;
246 soffset = 0;
247 } while (nob > 0);
248}
249EXPORT_SYMBOL(lnet_copy_kiov2iter);
250
251int
252lnet_extract_iov(int dst_niov, struct kvec *dst,
253 int src_niov, const struct kvec *src,
254 unsigned int offset, unsigned int len)
255{
256
257
258
259
260
261 unsigned int frag_len;
262 unsigned int niov;
263
264 if (!len)
265 return 0;
266
267 LASSERT(src_niov > 0);
268 while (offset >= src->iov_len) {
269 offset -= src->iov_len;
270 src_niov--;
271 src++;
272 LASSERT(src_niov > 0);
273 }
274
275 niov = 1;
276 for (;;) {
277 LASSERT(src_niov > 0);
278 LASSERT((int)niov <= dst_niov);
279
280 frag_len = src->iov_len - offset;
281 dst->iov_base = ((char *)src->iov_base) + offset;
282
283 if (len <= frag_len) {
284 dst->iov_len = len;
285 return niov;
286 }
287
288 dst->iov_len = frag_len;
289
290 len -= frag_len;
291 dst++;
292 src++;
293 niov++;
294 src_niov--;
295 offset = 0;
296 }
297}
298EXPORT_SYMBOL(lnet_extract_iov);
299
300unsigned int
301lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
302{
303 unsigned int nob = 0;
304
305 LASSERT(!niov || kiov);
306 while (niov-- > 0)
307 nob += (kiov++)->bv_len;
308
309 return nob;
310}
311EXPORT_SYMBOL(lnet_kiov_nob);
312
313int
314lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
315 int src_niov, const lnet_kiov_t *src,
316 unsigned int offset, unsigned int len)
317{
318
319
320
321
322
323 unsigned int frag_len;
324 unsigned int niov;
325
326 if (!len)
327 return 0;
328
329 LASSERT(src_niov > 0);
330 while (offset >= src->bv_len) {
331 offset -= src->bv_len;
332 src_niov--;
333 src++;
334 LASSERT(src_niov > 0);
335 }
336
337 niov = 1;
338 for (;;) {
339 LASSERT(src_niov > 0);
340 LASSERT((int)niov <= dst_niov);
341
342 frag_len = src->bv_len - offset;
343 dst->bv_page = src->bv_page;
344 dst->bv_offset = src->bv_offset + offset;
345
346 if (len <= frag_len) {
347 dst->bv_len = len;
348 LASSERT(dst->bv_offset + dst->bv_len
349 <= PAGE_SIZE);
350 return niov;
351 }
352
353 dst->bv_len = frag_len;
354 LASSERT(dst->bv_offset + dst->bv_len <= PAGE_SIZE);
355
356 len -= frag_len;
357 dst++;
358 src++;
359 niov++;
360 src_niov--;
361 offset = 0;
362 }
363}
364EXPORT_SYMBOL(lnet_extract_kiov);
365
366void
367lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
368 unsigned int offset, unsigned int mlen, unsigned int rlen)
369{
370 unsigned int niov = 0;
371 struct kvec *iov = NULL;
372 lnet_kiov_t *kiov = NULL;
373 struct iov_iter to;
374 int rc;
375
376 LASSERT(!in_interrupt());
377 LASSERT(!mlen || msg);
378
379 if (msg) {
380 LASSERT(msg->msg_receiving);
381 LASSERT(!msg->msg_sending);
382 LASSERT(rlen == msg->msg_len);
383 LASSERT(mlen <= msg->msg_len);
384 LASSERT(msg->msg_offset == offset);
385 LASSERT(msg->msg_wanted == mlen);
386
387 msg->msg_receiving = 0;
388
389 if (mlen) {
390 niov = msg->msg_niov;
391 iov = msg->msg_iov;
392 kiov = msg->msg_kiov;
393
394 LASSERT(niov > 0);
395 LASSERT(!iov != !kiov);
396 }
397 }
398
399 if (iov) {
400 iov_iter_kvec(&to, ITER_KVEC | READ, iov, niov, mlen + offset);
401 iov_iter_advance(&to, offset);
402 } else {
403 iov_iter_bvec(&to, ITER_BVEC | READ, kiov, niov, mlen + offset);
404 iov_iter_advance(&to, offset);
405 }
406 rc = ni->ni_lnd->lnd_recv(ni, private, msg, delayed, &to, rlen);
407 if (rc < 0)
408 lnet_finalize(ni, msg, rc);
409}
410
411static void
412lnet_setpayloadbuffer(lnet_msg_t *msg)
413{
414 lnet_libmd_t *md = msg->msg_md;
415
416 LASSERT(msg->msg_len > 0);
417 LASSERT(!msg->msg_routing);
418 LASSERT(md);
419 LASSERT(!msg->msg_niov);
420 LASSERT(!msg->msg_iov);
421 LASSERT(!msg->msg_kiov);
422
423 msg->msg_niov = md->md_niov;
424 if (md->md_options & LNET_MD_KIOV)
425 msg->msg_kiov = md->md_iov.kiov;
426 else
427 msg->msg_iov = md->md_iov.iov;
428}
429
430void
431lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
432 unsigned int offset, unsigned int len)
433{
434 msg->msg_type = type;
435 msg->msg_target = target;
436 msg->msg_len = len;
437 msg->msg_offset = offset;
438
439 if (len)
440 lnet_setpayloadbuffer(msg);
441
442 memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr));
443 msg->msg_hdr.type = cpu_to_le32(type);
444 msg->msg_hdr.dest_nid = cpu_to_le64(target.nid);
445 msg->msg_hdr.dest_pid = cpu_to_le32(target.pid);
446
447 msg->msg_hdr.src_pid = cpu_to_le32(the_lnet.ln_pid);
448 msg->msg_hdr.payload_length = cpu_to_le32(len);
449}
450
451static void
452lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
453{
454 void *priv = msg->msg_private;
455 int rc;
456
457 LASSERT(!in_interrupt());
458 LASSERT(LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
459 (msg->msg_txcredit && msg->msg_peertxcredit));
460
461 rc = ni->ni_lnd->lnd_send(ni, priv, msg);
462 if (rc < 0)
463 lnet_finalize(ni, msg, rc);
464}
465
466static int
467lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
468{
469 int rc;
470
471 LASSERT(!msg->msg_sending);
472 LASSERT(msg->msg_receiving);
473 LASSERT(!msg->msg_rx_ready_delay);
474 LASSERT(ni->ni_lnd->lnd_eager_recv);
475
476 msg->msg_rx_ready_delay = 1;
477 rc = ni->ni_lnd->lnd_eager_recv(ni, msg->msg_private, msg,
478 &msg->msg_private);
479 if (rc) {
480 CERROR("recv from %s / send to %s aborted: eager_recv failed %d\n",
481 libcfs_nid2str(msg->msg_rxpeer->lp_nid),
482 libcfs_id2str(msg->msg_target), rc);
483 LASSERT(rc < 0);
484 }
485
486 return rc;
487}
488
489
490static void
491lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
492{
493 unsigned long last_alive = 0;
494
495 LASSERT(lnet_peer_aliveness_enabled(lp));
496 LASSERT(ni->ni_lnd->lnd_query);
497
498 lnet_net_unlock(lp->lp_cpt);
499 ni->ni_lnd->lnd_query(ni, lp->lp_nid, &last_alive);
500 lnet_net_lock(lp->lp_cpt);
501
502 lp->lp_last_query = cfs_time_current();
503
504 if (last_alive)
505 lp->lp_last_alive = last_alive;
506}
507
508
509static inline int
510lnet_peer_is_alive(lnet_peer_t *lp, unsigned long now)
511{
512 int alive;
513 unsigned long deadline;
514
515 LASSERT(lnet_peer_aliveness_enabled(lp));
516
517
518
519
520 if (!lp->lp_alive && lp->lp_alive_count > 0 &&
521 cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
522 return 0;
523
524 deadline = cfs_time_add(lp->lp_last_alive,
525 cfs_time_seconds(lp->lp_ni->ni_peertimeout));
526 alive = cfs_time_after(deadline, now);
527
528
529
530
531
532 if (alive && !lp->lp_alive &&
533 !(lnet_isrouter(lp) && !lp->lp_alive_count))
534 lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);
535
536 return alive;
537}
538
539
540
541
542
543static int
544lnet_peer_alive_locked(lnet_peer_t *lp)
545{
546 unsigned long now = cfs_time_current();
547
548 if (!lnet_peer_aliveness_enabled(lp))
549 return -ENODEV;
550
551 if (lnet_peer_is_alive(lp, now))
552 return 1;
553
554
555
556
557
558 if (lp->lp_last_query) {
559 static const int lnet_queryinterval = 1;
560
561 unsigned long next_query =
562 cfs_time_add(lp->lp_last_query,
563 cfs_time_seconds(lnet_queryinterval));
564
565 if (time_before(now, next_query)) {
566 if (lp->lp_alive)
567 CWARN("Unexpected aliveness of peer %s: %d < %d (%d/%d)\n",
568 libcfs_nid2str(lp->lp_nid),
569 (int)now, (int)next_query,
570 lnet_queryinterval,
571 lp->lp_ni->ni_peertimeout);
572 return 0;
573 }
574 }
575
576
577 lnet_ni_query_locked(lp->lp_ni, lp);
578
579 if (lnet_peer_is_alive(lp, now))
580 return 1;
581
582 lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
583 return 0;
584}
585
586
587
588
589
590
591
592
593
594
595
596
597static int
598lnet_post_send_locked(lnet_msg_t *msg, int do_send)
599{
600 lnet_peer_t *lp = msg->msg_txpeer;
601 lnet_ni_t *ni = lp->lp_ni;
602 int cpt = msg->msg_tx_cpt;
603 struct lnet_tx_queue *tq = ni->ni_tx_queues[cpt];
604
605
606 LASSERT(!do_send || msg->msg_tx_delayed);
607 LASSERT(!msg->msg_receiving);
608 LASSERT(msg->msg_tx_committed);
609
610
611 if (!(msg->msg_target.pid & LNET_PID_USERFLAG) &&
612 !lnet_peer_alive_locked(lp)) {
613 the_lnet.ln_counters[cpt]->drop_count++;
614 the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
615 lnet_net_unlock(cpt);
616
617 CNETERR("Dropping message for %s: peer not alive\n",
618 libcfs_id2str(msg->msg_target));
619 if (do_send)
620 lnet_finalize(ni, msg, -EHOSTUNREACH);
621
622 lnet_net_lock(cpt);
623 return -EHOSTUNREACH;
624 }
625
626 if (msg->msg_md &&
627 (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED)) {
628 lnet_net_unlock(cpt);
629
630 CNETERR("Aborting message for %s: LNetM[DE]Unlink() already called on the MD/ME.\n",
631 libcfs_id2str(msg->msg_target));
632 if (do_send)
633 lnet_finalize(ni, msg, -ECANCELED);
634
635 lnet_net_lock(cpt);
636 return -ECANCELED;
637 }
638
639 if (!msg->msg_peertxcredit) {
640 LASSERT((lp->lp_txcredits < 0) ==
641 !list_empty(&lp->lp_txq));
642
643 msg->msg_peertxcredit = 1;
644 lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
645 lp->lp_txcredits--;
646
647 if (lp->lp_txcredits < lp->lp_mintxcredits)
648 lp->lp_mintxcredits = lp->lp_txcredits;
649
650 if (lp->lp_txcredits < 0) {
651 msg->msg_tx_delayed = 1;
652 list_add_tail(&msg->msg_list, &lp->lp_txq);
653 return LNET_CREDIT_WAIT;
654 }
655 }
656
657 if (!msg->msg_txcredit) {
658 LASSERT((tq->tq_credits < 0) ==
659 !list_empty(&tq->tq_delayed));
660
661 msg->msg_txcredit = 1;
662 tq->tq_credits--;
663
664 if (tq->tq_credits < tq->tq_credits_min)
665 tq->tq_credits_min = tq->tq_credits;
666
667 if (tq->tq_credits < 0) {
668 msg->msg_tx_delayed = 1;
669 list_add_tail(&msg->msg_list, &tq->tq_delayed);
670 return LNET_CREDIT_WAIT;
671 }
672 }
673
674 if (do_send) {
675 lnet_net_unlock(cpt);
676 lnet_ni_send(ni, msg);
677 lnet_net_lock(cpt);
678 }
679 return LNET_CREDIT_OK;
680}
681
682static lnet_rtrbufpool_t *
683lnet_msg2bufpool(lnet_msg_t *msg)
684{
685 lnet_rtrbufpool_t *rbp;
686 int cpt;
687
688 LASSERT(msg->msg_rx_committed);
689
690 cpt = msg->msg_rx_cpt;
691 rbp = &the_lnet.ln_rtrpools[cpt][0];
692
693 LASSERT(msg->msg_len <= LNET_MTU);
694 while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_SIZE) {
695 rbp++;
696 LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
697 }
698
699 return rbp;
700}
701
702static int
703lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
704{
705
706
707
708
709
710
711 lnet_peer_t *lp = msg->msg_rxpeer;
712 lnet_rtrbufpool_t *rbp;
713 lnet_rtrbuf_t *rb;
714
715 LASSERT(!msg->msg_iov);
716 LASSERT(!msg->msg_kiov);
717 LASSERT(!msg->msg_niov);
718 LASSERT(msg->msg_routing);
719 LASSERT(msg->msg_receiving);
720 LASSERT(!msg->msg_sending);
721
722
723 LASSERT(!do_recv || msg->msg_rx_delayed);
724
725 if (!msg->msg_peerrtrcredit) {
726 LASSERT((lp->lp_rtrcredits < 0) ==
727 !list_empty(&lp->lp_rtrq));
728
729 msg->msg_peerrtrcredit = 1;
730 lp->lp_rtrcredits--;
731 if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
732 lp->lp_minrtrcredits = lp->lp_rtrcredits;
733
734 if (lp->lp_rtrcredits < 0) {
735
736 LASSERT(msg->msg_rx_ready_delay);
737 msg->msg_rx_delayed = 1;
738 list_add_tail(&msg->msg_list, &lp->lp_rtrq);
739 return LNET_CREDIT_WAIT;
740 }
741 }
742
743 rbp = lnet_msg2bufpool(msg);
744
745 if (!msg->msg_rtrcredit) {
746 msg->msg_rtrcredit = 1;
747 rbp->rbp_credits--;
748 if (rbp->rbp_credits < rbp->rbp_mincredits)
749 rbp->rbp_mincredits = rbp->rbp_credits;
750
751 if (rbp->rbp_credits < 0) {
752
753 LASSERT(msg->msg_rx_ready_delay);
754 msg->msg_rx_delayed = 1;
755 list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
756 return LNET_CREDIT_WAIT;
757 }
758 }
759
760 LASSERT(!list_empty(&rbp->rbp_bufs));
761 rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
762 list_del(&rb->rb_list);
763
764 msg->msg_niov = rbp->rbp_npages;
765 msg->msg_kiov = &rb->rb_kiov[0];
766
767 if (do_recv) {
768 int cpt = msg->msg_rx_cpt;
769
770 lnet_net_unlock(cpt);
771 lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
772 0, msg->msg_len, msg->msg_len);
773 lnet_net_lock(cpt);
774 }
775 return LNET_CREDIT_OK;
776}
777
778void
779lnet_return_tx_credits_locked(lnet_msg_t *msg)
780{
781 lnet_peer_t *txpeer = msg->msg_txpeer;
782 lnet_msg_t *msg2;
783
784 if (msg->msg_txcredit) {
785 struct lnet_ni *ni = txpeer->lp_ni;
786 struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];
787
788
789 msg->msg_txcredit = 0;
790
791 LASSERT((tq->tq_credits < 0) ==
792 !list_empty(&tq->tq_delayed));
793
794 tq->tq_credits++;
795 if (tq->tq_credits <= 0) {
796 msg2 = list_entry(tq->tq_delayed.next,
797 lnet_msg_t, msg_list);
798 list_del(&msg2->msg_list);
799
800 LASSERT(msg2->msg_txpeer->lp_ni == ni);
801 LASSERT(msg2->msg_tx_delayed);
802
803 (void)lnet_post_send_locked(msg2, 1);
804 }
805 }
806
807 if (msg->msg_peertxcredit) {
808
809 msg->msg_peertxcredit = 0;
810
811 LASSERT((txpeer->lp_txcredits < 0) ==
812 !list_empty(&txpeer->lp_txq));
813
814 txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
815 LASSERT(txpeer->lp_txqnob >= 0);
816
817 txpeer->lp_txcredits++;
818 if (txpeer->lp_txcredits <= 0) {
819 msg2 = list_entry(txpeer->lp_txq.next,
820 lnet_msg_t, msg_list);
821 list_del(&msg2->msg_list);
822
823 LASSERT(msg2->msg_txpeer == txpeer);
824 LASSERT(msg2->msg_tx_delayed);
825
826 (void)lnet_post_send_locked(msg2, 1);
827 }
828 }
829
830 if (txpeer) {
831 msg->msg_txpeer = NULL;
832 lnet_peer_decref_locked(txpeer);
833 }
834}
835
836void
837lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp)
838{
839 lnet_msg_t *msg;
840
841 if (list_empty(&rbp->rbp_msgs))
842 return;
843 msg = list_entry(rbp->rbp_msgs.next,
844 lnet_msg_t, msg_list);
845 list_del(&msg->msg_list);
846
847 (void)lnet_post_routed_recv_locked(msg, 1);
848}
849
850void
851lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
852{
853 struct list_head drop;
854 lnet_msg_t *msg;
855 lnet_msg_t *tmp;
856
857 INIT_LIST_HEAD(&drop);
858
859 list_splice_init(list, &drop);
860
861 lnet_net_unlock(cpt);
862
863 list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
864 lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL,
865 0, 0, 0, msg->msg_hdr.payload_length);
866 list_del_init(&msg->msg_list);
867 lnet_finalize(NULL, msg, -ECANCELED);
868 }
869
870 lnet_net_lock(cpt);
871}
872
873void
874lnet_return_rx_credits_locked(lnet_msg_t *msg)
875{
876 lnet_peer_t *rxpeer = msg->msg_rxpeer;
877 lnet_msg_t *msg2;
878
879 if (msg->msg_rtrcredit) {
880
881 lnet_rtrbuf_t *rb;
882 lnet_rtrbufpool_t *rbp;
883
884
885
886
887
888
889 LASSERT(msg->msg_kiov);
890
891 rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
892 rbp = rb->rb_pool;
893
894 msg->msg_kiov = NULL;
895 msg->msg_rtrcredit = 0;
896
897 LASSERT(rbp == lnet_msg2bufpool(msg));
898
899 LASSERT((rbp->rbp_credits > 0) ==
900 !list_empty(&rbp->rbp_bufs));
901
902
903
904
905
906 if (!the_lnet.ln_routing) {
907 lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
908 goto routing_off;
909 }
910
911
912
913
914
915
916 if (unlikely(rbp->rbp_credits >= rbp->rbp_req_nbuffers)) {
917
918 lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
919 rbp->rbp_nbuffers--;
920 } else {
921 list_add(&rb->rb_list, &rbp->rbp_bufs);
922 rbp->rbp_credits++;
923 if (rbp->rbp_credits <= 0)
924 lnet_schedule_blocked_locked(rbp);
925 }
926 }
927
928routing_off:
929 if (msg->msg_peerrtrcredit) {
930
931 msg->msg_peerrtrcredit = 0;
932
933 LASSERT((rxpeer->lp_rtrcredits < 0) ==
934 !list_empty(&rxpeer->lp_rtrq));
935
936 rxpeer->lp_rtrcredits++;
937
938
939
940
941 if (!the_lnet.ln_routing) {
942 lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq,
943 msg->msg_rx_cpt);
944 } else if (rxpeer->lp_rtrcredits <= 0) {
945 msg2 = list_entry(rxpeer->lp_rtrq.next,
946 lnet_msg_t, msg_list);
947 list_del(&msg2->msg_list);
948
949 (void)lnet_post_routed_recv_locked(msg2, 1);
950 }
951 }
952 if (rxpeer) {
953 msg->msg_rxpeer = NULL;
954 lnet_peer_decref_locked(rxpeer);
955 }
956}
957
958static int
959lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
960{
961 lnet_peer_t *p1 = r1->lr_gateway;
962 lnet_peer_t *p2 = r2->lr_gateway;
963 int r1_hops = (r1->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r1->lr_hops;
964 int r2_hops = (r2->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r2->lr_hops;
965
966 if (r1->lr_priority < r2->lr_priority)
967 return 1;
968
969 if (r1->lr_priority > r2->lr_priority)
970 return -ERANGE;
971
972 if (r1_hops < r2_hops)
973 return 1;
974
975 if (r1_hops > r2_hops)
976 return -ERANGE;
977
978 if (p1->lp_txqnob < p2->lp_txqnob)
979 return 1;
980
981 if (p1->lp_txqnob > p2->lp_txqnob)
982 return -ERANGE;
983
984 if (p1->lp_txcredits > p2->lp_txcredits)
985 return 1;
986
987 if (p1->lp_txcredits < p2->lp_txcredits)
988 return -ERANGE;
989
990 if (r1->lr_seq - r2->lr_seq <= 0)
991 return 1;
992
993 return -ERANGE;
994}
995
996static lnet_peer_t *
997lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
998{
999 lnet_remotenet_t *rnet;
1000 lnet_route_t *route;
1001 lnet_route_t *best_route;
1002 lnet_route_t *last_route;
1003 struct lnet_peer *lp_best;
1004 struct lnet_peer *lp;
1005 int rc;
1006
1007
1008
1009
1010
1011 rnet = lnet_find_net_locked(LNET_NIDNET(target));
1012 if (!rnet)
1013 return NULL;
1014
1015 lp_best = NULL;
1016 best_route = NULL;
1017 last_route = NULL;
1018 list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
1019 lp = route->lr_gateway;
1020
1021 if (!lnet_is_route_alive(route))
1022 continue;
1023
1024 if (ni && lp->lp_ni != ni)
1025 continue;
1026
1027 if (lp->lp_nid == rtr_nid)
1028 return lp;
1029
1030 if (!lp_best) {
1031 best_route = route;
1032 last_route = route;
1033 lp_best = lp;
1034 continue;
1035 }
1036
1037
1038 if (last_route->lr_seq - route->lr_seq < 0)
1039 last_route = route;
1040
1041 rc = lnet_compare_routes(route, best_route);
1042 if (rc < 0)
1043 continue;
1044
1045 best_route = route;
1046 lp_best = lp;
1047 }
1048
1049
1050
1051
1052
1053
1054 if (best_route)
1055 best_route->lr_seq = last_route->lr_seq + 1;
1056 return lp_best;
1057}
1058
1059int
1060lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
1061{
1062 lnet_nid_t dst_nid = msg->msg_target.nid;
1063 struct lnet_ni *src_ni;
1064 struct lnet_ni *local_ni;
1065 struct lnet_peer *lp;
1066 int cpt;
1067 int cpt2;
1068 int rc;
1069
1070
1071
1072
1073
1074
1075
1076 LASSERT(!msg->msg_txpeer);
1077 LASSERT(!msg->msg_sending);
1078 LASSERT(!msg->msg_target_is_router);
1079 LASSERT(!msg->msg_receiving);
1080
1081 msg->msg_sending = 1;
1082
1083 LASSERT(!msg->msg_tx_committed);
1084 cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid);
1085 again:
1086 lnet_net_lock(cpt);
1087
1088 if (the_lnet.ln_shutdown) {
1089 lnet_net_unlock(cpt);
1090 return -ESHUTDOWN;
1091 }
1092
1093 if (src_nid == LNET_NID_ANY) {
1094 src_ni = NULL;
1095 } else {
1096 src_ni = lnet_nid2ni_locked(src_nid, cpt);
1097 if (!src_ni) {
1098 lnet_net_unlock(cpt);
1099 LCONSOLE_WARN("Can't send to %s: src %s is not a local nid\n",
1100 libcfs_nid2str(dst_nid),
1101 libcfs_nid2str(src_nid));
1102 return -EINVAL;
1103 }
1104 LASSERT(!msg->msg_routing);
1105 }
1106
1107
1108 local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid), cpt);
1109
1110 if (local_ni) {
1111 if (!src_ni) {
1112 src_ni = local_ni;
1113 src_nid = src_ni->ni_nid;
1114 } else if (src_ni == local_ni) {
1115 lnet_ni_decref_locked(local_ni, cpt);
1116 } else {
1117 lnet_ni_decref_locked(local_ni, cpt);
1118 lnet_ni_decref_locked(src_ni, cpt);
1119 lnet_net_unlock(cpt);
1120 LCONSOLE_WARN("No route to %s via from %s\n",
1121 libcfs_nid2str(dst_nid),
1122 libcfs_nid2str(src_nid));
1123 return -EINVAL;
1124 }
1125
1126 LASSERT(src_nid != LNET_NID_ANY);
1127 lnet_msg_commit(msg, cpt);
1128
1129 if (!msg->msg_routing)
1130 msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1131
1132 if (src_ni == the_lnet.ln_loni) {
1133
1134 lnet_net_unlock(cpt);
1135 lnet_ni_send(src_ni, msg);
1136
1137 lnet_net_lock(cpt);
1138 lnet_ni_decref_locked(src_ni, cpt);
1139 lnet_net_unlock(cpt);
1140 return 0;
1141 }
1142
1143 rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
1144
1145 lnet_ni_decref_locked(src_ni, cpt);
1146 if (rc) {
1147 lnet_net_unlock(cpt);
1148 LCONSOLE_WARN("Error %d finding peer %s\n", rc,
1149 libcfs_nid2str(dst_nid));
1150
1151 return rc;
1152 }
1153 LASSERT(lp->lp_ni == src_ni);
1154 } else {
1155
1156 lp = lnet_find_route_locked(src_ni, dst_nid, rtr_nid);
1157 if (!lp) {
1158 if (src_ni)
1159 lnet_ni_decref_locked(src_ni, cpt);
1160 lnet_net_unlock(cpt);
1161
1162 LCONSOLE_WARN("No route to %s via %s (all routers down)\n",
1163 libcfs_id2str(msg->msg_target),
1164 libcfs_nid2str(src_nid));
1165 return -EHOSTUNREACH;
1166 }
1167
1168
1169
1170
1171
1172
1173
1174 if (rtr_nid != lp->lp_nid) {
1175 cpt2 = lnet_cpt_of_nid_locked(lp->lp_nid);
1176 if (cpt2 != cpt) {
1177 if (src_ni)
1178 lnet_ni_decref_locked(src_ni, cpt);
1179 lnet_net_unlock(cpt);
1180
1181 rtr_nid = lp->lp_nid;
1182 cpt = cpt2;
1183 goto again;
1184 }
1185 }
1186
1187 CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
1188 libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
1189 lnet_msgtyp2str(msg->msg_type), msg->msg_len);
1190
1191 if (!src_ni) {
1192 src_ni = lp->lp_ni;
1193 src_nid = src_ni->ni_nid;
1194 } else {
1195 LASSERT(src_ni == lp->lp_ni);
1196 lnet_ni_decref_locked(src_ni, cpt);
1197 }
1198
1199 lnet_peer_addref_locked(lp);
1200
1201 LASSERT(src_nid != LNET_NID_ANY);
1202 lnet_msg_commit(msg, cpt);
1203
1204 if (!msg->msg_routing) {
1205
1206 msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1207 }
1208
1209 msg->msg_target_is_router = 1;
1210 msg->msg_target.nid = lp->lp_nid;
1211 msg->msg_target.pid = LNET_PID_LUSTRE;
1212 }
1213
1214
1215
1216 LASSERT(!msg->msg_peertxcredit);
1217 LASSERT(!msg->msg_txcredit);
1218 LASSERT(!msg->msg_txpeer);
1219
1220 msg->msg_txpeer = lp;
1221
1222 rc = lnet_post_send_locked(msg, 0);
1223 lnet_net_unlock(cpt);
1224
1225 if (rc < 0)
1226 return rc;
1227
1228 if (rc == LNET_CREDIT_OK)
1229 lnet_ni_send(src_ni, msg);
1230
1231 return 0;
1232}
1233
1234void
1235lnet_drop_message(lnet_ni_t *ni, int cpt, void *private, unsigned int nob)
1236{
1237 lnet_net_lock(cpt);
1238 the_lnet.ln_counters[cpt]->drop_count++;
1239 the_lnet.ln_counters[cpt]->drop_length += nob;
1240 lnet_net_unlock(cpt);
1241
1242 lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
1243}
1244
1245static void
1246lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
1247{
1248 lnet_hdr_t *hdr = &msg->msg_hdr;
1249
1250 if (msg->msg_wanted)
1251 lnet_setpayloadbuffer(msg);
1252
1253 lnet_build_msg_event(msg, LNET_EVENT_PUT);
1254
1255
1256
1257
1258
1259 msg->msg_ack = !lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
1260 !(msg->msg_md->md_options & LNET_MD_ACK_DISABLE);
1261
1262 lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
1263 msg->msg_offset, msg->msg_wanted, hdr->payload_length);
1264}
1265
1266static int
1267lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
1268{
1269 lnet_hdr_t *hdr = &msg->msg_hdr;
1270 struct lnet_match_info info;
1271 bool ready_delay;
1272 int rc;
1273
1274
1275 hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
1276 hdr->msg.put.ptl_index = le32_to_cpu(hdr->msg.put.ptl_index);
1277 hdr->msg.put.offset = le32_to_cpu(hdr->msg.put.offset);
1278
1279 info.mi_id.nid = hdr->src_nid;
1280 info.mi_id.pid = hdr->src_pid;
1281 info.mi_opc = LNET_MD_OP_PUT;
1282 info.mi_portal = hdr->msg.put.ptl_index;
1283 info.mi_rlength = hdr->payload_length;
1284 info.mi_roffset = hdr->msg.put.offset;
1285 info.mi_mbits = hdr->msg.put.match_bits;
1286
1287 msg->msg_rx_ready_delay = !ni->ni_lnd->lnd_eager_recv;
1288 ready_delay = msg->msg_rx_ready_delay;
1289
1290 again:
1291 rc = lnet_ptl_match_md(&info, msg);
1292 switch (rc) {
1293 default:
1294 LBUG();
1295
1296 case LNET_MATCHMD_OK:
1297 lnet_recv_put(ni, msg);
1298 return 0;
1299
1300 case LNET_MATCHMD_NONE:
1301
1302
1303
1304
1305 if (ready_delay)
1306 return 0;
1307
1308 rc = lnet_ni_eager_recv(ni, msg);
1309 if (!rc) {
1310 ready_delay = true;
1311 goto again;
1312 }
1313
1314
1315 case LNET_MATCHMD_DROP:
1316 CNETERR("Dropping PUT from %s portal %d match %llu offset %d length %d: %d\n",
1317 libcfs_id2str(info.mi_id), info.mi_portal,
1318 info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);
1319
1320 return -ENOENT;
1321 }
1322}
1323
1324static int
1325lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
1326{
1327 struct lnet_match_info info;
1328 lnet_hdr_t *hdr = &msg->msg_hdr;
1329 lnet_handle_wire_t reply_wmd;
1330 int rc;
1331
1332
1333 hdr->msg.get.match_bits = le64_to_cpu(hdr->msg.get.match_bits);
1334 hdr->msg.get.ptl_index = le32_to_cpu(hdr->msg.get.ptl_index);
1335 hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
1336 hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);
1337
1338 info.mi_id.nid = hdr->src_nid;
1339 info.mi_id.pid = hdr->src_pid;
1340 info.mi_opc = LNET_MD_OP_GET;
1341 info.mi_portal = hdr->msg.get.ptl_index;
1342 info.mi_rlength = hdr->msg.get.sink_length;
1343 info.mi_roffset = hdr->msg.get.src_offset;
1344 info.mi_mbits = hdr->msg.get.match_bits;
1345
1346 rc = lnet_ptl_match_md(&info, msg);
1347 if (rc == LNET_MATCHMD_DROP) {
1348 CNETERR("Dropping GET from %s portal %d match %llu offset %d length %d\n",
1349 libcfs_id2str(info.mi_id), info.mi_portal,
1350 info.mi_mbits, info.mi_roffset, info.mi_rlength);
1351 return -ENOENT;
1352 }
1353
1354 LASSERT(rc == LNET_MATCHMD_OK);
1355
1356 lnet_build_msg_event(msg, LNET_EVENT_GET);
1357
1358 reply_wmd = hdr->msg.get.return_wmd;
1359
1360 lnet_prep_send(msg, LNET_MSG_REPLY, info.mi_id,
1361 msg->msg_offset, msg->msg_wanted);
1362
1363 msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
1364
1365 if (rdma_get) {
1366
1367 lnet_ni_recv(ni, msg->msg_private, msg, 0,
1368 msg->msg_offset, msg->msg_len, msg->msg_len);
1369 return 0;
1370 }
1371
1372 lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
1373 msg->msg_receiving = 0;
1374
1375 rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
1376 if (rc < 0) {
1377
1378 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
1379 libcfs_nid2str(ni->ni_nid),
1380 libcfs_id2str(info.mi_id), rc);
1381
1382 lnet_finalize(ni, msg, rc);
1383 }
1384
1385 return 0;
1386}
1387
1388static int
1389lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
1390{
1391 void *private = msg->msg_private;
1392 lnet_hdr_t *hdr = &msg->msg_hdr;
1393 lnet_process_id_t src = {0};
1394 lnet_libmd_t *md;
1395 int rlength;
1396 int mlength;
1397 int cpt;
1398
1399 cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
1400 lnet_res_lock(cpt);
1401
1402 src.nid = hdr->src_nid;
1403 src.pid = hdr->src_pid;
1404
1405
1406 md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
1407 if (!md || !md->md_threshold || md->md_me) {
1408 CNETERR("%s: Dropping REPLY from %s for %s MD %#llx.%#llx\n",
1409 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1410 !md ? "invalid" : "inactive",
1411 hdr->msg.reply.dst_wmd.wh_interface_cookie,
1412 hdr->msg.reply.dst_wmd.wh_object_cookie);
1413 if (md && md->md_me)
1414 CERROR("REPLY MD also attached to portal %d\n",
1415 md->md_me->me_portal);
1416
1417 lnet_res_unlock(cpt);
1418 return -ENOENT;
1419 }
1420
1421 LASSERT(!md->md_offset);
1422
1423 rlength = hdr->payload_length;
1424 mlength = min_t(uint, rlength, md->md_length);
1425
1426 if (mlength < rlength &&
1427 !(md->md_options & LNET_MD_TRUNCATE)) {
1428 CNETERR("%s: Dropping REPLY from %s length %d for MD %#llx would overflow (%d)\n",
1429 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1430 rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
1431 mlength);
1432 lnet_res_unlock(cpt);
1433 return -ENOENT;
1434 }
1435
1436 CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md %#llx\n",
1437 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1438 mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
1439
1440 lnet_msg_attach_md(msg, md, 0, mlength);
1441
1442 if (mlength)
1443 lnet_setpayloadbuffer(msg);
1444
1445 lnet_res_unlock(cpt);
1446
1447 lnet_build_msg_event(msg, LNET_EVENT_REPLY);
1448
1449 lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
1450 return 0;
1451}
1452
1453static int
1454lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
1455{
1456 lnet_hdr_t *hdr = &msg->msg_hdr;
1457 lnet_process_id_t src = {0};
1458 lnet_libmd_t *md;
1459 int cpt;
1460
1461 src.nid = hdr->src_nid;
1462 src.pid = hdr->src_pid;
1463
1464
1465 hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
1466 hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
1467
1468 cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
1469 lnet_res_lock(cpt);
1470
1471
1472 md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
1473 if (!md || !md->md_threshold || md->md_me) {
1474
1475 CDEBUG(D_NET,
1476 "%s: Dropping ACK from %s to %s MD %#llx.%#llx\n",
1477 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1478 !md ? "invalid" : "inactive",
1479 hdr->msg.ack.dst_wmd.wh_interface_cookie,
1480 hdr->msg.ack.dst_wmd.wh_object_cookie);
1481 if (md && md->md_me)
1482 CERROR("Source MD also attached to portal %d\n",
1483 md->md_me->me_portal);
1484
1485 lnet_res_unlock(cpt);
1486 return -ENOENT;
1487 }
1488
1489 CDEBUG(D_NET, "%s: ACK from %s into md %#llx\n",
1490 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1491 hdr->msg.ack.dst_wmd.wh_object_cookie);
1492
1493 lnet_msg_attach_md(msg, md, 0, 0);
1494
1495 lnet_res_unlock(cpt);
1496
1497 lnet_build_msg_event(msg, LNET_EVENT_ACK);
1498
1499 lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
1500 return 0;
1501}
1502
1503
1504
1505
1506
1507
1508int
1509lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
1510{
1511 int rc = 0;
1512
1513 if (!the_lnet.ln_routing)
1514 return -ECANCELED;
1515
1516 if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
1517 lnet_msg2bufpool(msg)->rbp_credits <= 0) {
1518 if (!ni->ni_lnd->lnd_eager_recv) {
1519 msg->msg_rx_ready_delay = 1;
1520 } else {
1521 lnet_net_unlock(msg->msg_rx_cpt);
1522 rc = lnet_ni_eager_recv(ni, msg);
1523 lnet_net_lock(msg->msg_rx_cpt);
1524 }
1525 }
1526
1527 if (!rc)
1528 rc = lnet_post_routed_recv_locked(msg, 0);
1529 return rc;
1530}
1531
1532int
1533lnet_parse_local(lnet_ni_t *ni, lnet_msg_t *msg)
1534{
1535 int rc;
1536
1537 switch (msg->msg_type) {
1538 case LNET_MSG_ACK:
1539 rc = lnet_parse_ack(ni, msg);
1540 break;
1541 case LNET_MSG_PUT:
1542 rc = lnet_parse_put(ni, msg);
1543 break;
1544 case LNET_MSG_GET:
1545 rc = lnet_parse_get(ni, msg, msg->msg_rdma_get);
1546 break;
1547 case LNET_MSG_REPLY:
1548 rc = lnet_parse_reply(ni, msg);
1549 break;
1550 default:
1551 LASSERT(0);
1552 return -EPROTO;
1553 }
1554
1555 LASSERT(!rc || rc == -ENOENT);
1556 return rc;
1557}
1558
1559char *
1560lnet_msgtyp2str(int type)
1561{
1562 switch (type) {
1563 case LNET_MSG_ACK:
1564 return "ACK";
1565 case LNET_MSG_PUT:
1566 return "PUT";
1567 case LNET_MSG_GET:
1568 return "GET";
1569 case LNET_MSG_REPLY:
1570 return "REPLY";
1571 case LNET_MSG_HELLO:
1572 return "HELLO";
1573 default:
1574 return "<UNKNOWN>";
1575 }
1576}
1577
1578void
1579lnet_print_hdr(lnet_hdr_t *hdr)
1580{
1581 lnet_process_id_t src = {0};
1582 lnet_process_id_t dst = {0};
1583 char *type_str = lnet_msgtyp2str(hdr->type);
1584
1585 src.nid = hdr->src_nid;
1586 src.pid = hdr->src_pid;
1587
1588 dst.nid = hdr->dest_nid;
1589 dst.pid = hdr->dest_pid;
1590
1591 CWARN("P3 Header at %p of type %s\n", hdr, type_str);
1592 CWARN(" From %s\n", libcfs_id2str(src));
1593 CWARN(" To %s\n", libcfs_id2str(dst));
1594
1595 switch (hdr->type) {
1596 default:
1597 break;
1598
1599 case LNET_MSG_PUT:
1600 CWARN(" Ptl index %d, ack md %#llx.%#llx, match bits %llu\n",
1601 hdr->msg.put.ptl_index,
1602 hdr->msg.put.ack_wmd.wh_interface_cookie,
1603 hdr->msg.put.ack_wmd.wh_object_cookie,
1604 hdr->msg.put.match_bits);
1605 CWARN(" Length %d, offset %d, hdr data %#llx\n",
1606 hdr->payload_length, hdr->msg.put.offset,
1607 hdr->msg.put.hdr_data);
1608 break;
1609
1610 case LNET_MSG_GET:
1611 CWARN(" Ptl index %d, return md %#llx.%#llx, match bits %llu\n",
1612 hdr->msg.get.ptl_index,
1613 hdr->msg.get.return_wmd.wh_interface_cookie,
1614 hdr->msg.get.return_wmd.wh_object_cookie,
1615 hdr->msg.get.match_bits);
1616 CWARN(" Length %d, src offset %d\n",
1617 hdr->msg.get.sink_length,
1618 hdr->msg.get.src_offset);
1619 break;
1620
1621 case LNET_MSG_ACK:
1622 CWARN(" dst md %#llx.%#llx, manipulated length %d\n",
1623 hdr->msg.ack.dst_wmd.wh_interface_cookie,
1624 hdr->msg.ack.dst_wmd.wh_object_cookie,
1625 hdr->msg.ack.mlength);
1626 break;
1627
1628 case LNET_MSG_REPLY:
1629 CWARN(" dst md %#llx.%#llx, length %d\n",
1630 hdr->msg.reply.dst_wmd.wh_interface_cookie,
1631 hdr->msg.reply.dst_wmd.wh_object_cookie,
1632 hdr->payload_length);
1633 }
1634}
1635
1636int
1637lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
1638 void *private, int rdma_req)
1639{
1640 int rc = 0;
1641 int cpt;
1642 int for_me;
1643 struct lnet_msg *msg;
1644 lnet_pid_t dest_pid;
1645 lnet_nid_t dest_nid;
1646 lnet_nid_t src_nid;
1647 __u32 payload_length;
1648 __u32 type;
1649
1650 LASSERT(!in_interrupt());
1651
1652 type = le32_to_cpu(hdr->type);
1653 src_nid = le64_to_cpu(hdr->src_nid);
1654 dest_nid = le64_to_cpu(hdr->dest_nid);
1655 dest_pid = le32_to_cpu(hdr->dest_pid);
1656 payload_length = le32_to_cpu(hdr->payload_length);
1657
1658 for_me = (ni->ni_nid == dest_nid);
1659 cpt = lnet_cpt_of_nid(from_nid);
1660
1661 switch (type) {
1662 case LNET_MSG_ACK:
1663 case LNET_MSG_GET:
1664 if (payload_length > 0) {
1665 CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
1666 libcfs_nid2str(from_nid),
1667 libcfs_nid2str(src_nid),
1668 lnet_msgtyp2str(type), payload_length);
1669 return -EPROTO;
1670 }
1671 break;
1672
1673 case LNET_MSG_PUT:
1674 case LNET_MSG_REPLY:
1675 if (payload_length >
1676 (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
1677 CERROR("%s, src %s: bad %s payload %d (%d max expected)\n",
1678 libcfs_nid2str(from_nid),
1679 libcfs_nid2str(src_nid),
1680 lnet_msgtyp2str(type),
1681 payload_length,
1682 for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
1683 return -EPROTO;
1684 }
1685 break;
1686
1687 default:
1688 CERROR("%s, src %s: Bad message type 0x%x\n",
1689 libcfs_nid2str(from_nid),
1690 libcfs_nid2str(src_nid), type);
1691 return -EPROTO;
1692 }
1693
1694 if (the_lnet.ln_routing &&
1695 ni->ni_last_alive != ktime_get_real_seconds()) {
1696
1697 lnet_ni_lock(ni);
1698 ni->ni_last_alive = ktime_get_real_seconds();
1699 if (ni->ni_status &&
1700 ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
1701 ni->ni_status->ns_status = LNET_NI_STATUS_UP;
1702 lnet_ni_unlock(ni);
1703 }
1704
1705
1706
1707
1708
1709
1710 if (!for_me) {
1711 if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
1712
1713 CERROR("%s, src %s: Bad dest nid %s (should have been sent direct)\n",
1714 libcfs_nid2str(from_nid),
1715 libcfs_nid2str(src_nid),
1716 libcfs_nid2str(dest_nid));
1717 return -EPROTO;
1718 }
1719
1720 if (lnet_islocalnid(dest_nid)) {
1721
1722
1723
1724
1725 CERROR("%s, src %s: Bad dest nid %s (it's my nid but on a different network)\n",
1726 libcfs_nid2str(from_nid),
1727 libcfs_nid2str(src_nid),
1728 libcfs_nid2str(dest_nid));
1729 return -EPROTO;
1730 }
1731
1732 if (rdma_req && type == LNET_MSG_GET) {
1733 CERROR("%s, src %s: Bad optimized GET for %s (final destination must be me)\n",
1734 libcfs_nid2str(from_nid),
1735 libcfs_nid2str(src_nid),
1736 libcfs_nid2str(dest_nid));
1737 return -EPROTO;
1738 }
1739
1740 if (!the_lnet.ln_routing) {
1741 CERROR("%s, src %s: Dropping message for %s (routing not enabled)\n",
1742 libcfs_nid2str(from_nid),
1743 libcfs_nid2str(src_nid),
1744 libcfs_nid2str(dest_nid));
1745 goto drop;
1746 }
1747 }
1748
1749
1750
1751
1752
1753 if (!list_empty(&the_lnet.ln_test_peers) &&
1754 fail_peer(src_nid, 0)) {
1755 CERROR("%s, src %s: Dropping %s to simulate failure\n",
1756 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1757 lnet_msgtyp2str(type));
1758 goto drop;
1759 }
1760
1761 if (!list_empty(&the_lnet.ln_drop_rules) &&
1762 lnet_drop_rule_match(hdr)) {
1763 CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate silent message loss\n",
1764 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1765 libcfs_nid2str(dest_nid), lnet_msgtyp2str(type));
1766 goto drop;
1767 }
1768
1769 msg = lnet_msg_alloc();
1770 if (!msg) {
1771 CERROR("%s, src %s: Dropping %s (out of memory)\n",
1772 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1773 lnet_msgtyp2str(type));
1774 goto drop;
1775 }
1776
1777
1778
1779
1780 msg->msg_type = type;
1781 msg->msg_private = private;
1782 msg->msg_receiving = 1;
1783 msg->msg_rdma_get = rdma_req;
1784 msg->msg_wanted = payload_length;
1785 msg->msg_len = payload_length;
1786 msg->msg_offset = 0;
1787 msg->msg_hdr = *hdr;
1788
1789 msg->msg_from = from_nid;
1790 if (!for_me) {
1791 msg->msg_target.pid = dest_pid;
1792 msg->msg_target.nid = dest_nid;
1793 msg->msg_routing = 1;
1794
1795 } else {
1796
1797 msg->msg_hdr.type = type;
1798 msg->msg_hdr.src_nid = src_nid;
1799 msg->msg_hdr.src_pid = le32_to_cpu(msg->msg_hdr.src_pid);
1800 msg->msg_hdr.dest_nid = dest_nid;
1801 msg->msg_hdr.dest_pid = dest_pid;
1802 msg->msg_hdr.payload_length = payload_length;
1803 }
1804
1805 lnet_net_lock(cpt);
1806 rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
1807 if (rc) {
1808 lnet_net_unlock(cpt);
1809 CERROR("%s, src %s: Dropping %s (error %d looking up sender)\n",
1810 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1811 lnet_msgtyp2str(type), rc);
1812 lnet_msg_free(msg);
1813 if (rc == -ESHUTDOWN)
1814
1815 return 0;
1816 goto drop;
1817 }
1818
1819 if (lnet_isrouter(msg->msg_rxpeer)) {
1820 lnet_peer_set_alive(msg->msg_rxpeer);
1821 if (avoid_asym_router_failure &&
1822 LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
1823
1824
1825
1826
1827 lnet_router_ni_update_locked(msg->msg_rxpeer,
1828 LNET_NIDNET(src_nid));
1829 }
1830 }
1831
1832 lnet_msg_commit(msg, cpt);
1833
1834
1835 if (unlikely(!list_empty(&the_lnet.ln_delay_rules) &&
1836 lnet_delay_rule_match_locked(hdr, msg))) {
1837 lnet_net_unlock(cpt);
1838 return 0;
1839 }
1840
1841 if (!for_me) {
1842 rc = lnet_parse_forward_locked(ni, msg);
1843 lnet_net_unlock(cpt);
1844
1845 if (rc < 0)
1846 goto free_drop;
1847
1848 if (rc == LNET_CREDIT_OK) {
1849 lnet_ni_recv(ni, msg->msg_private, msg, 0,
1850 0, payload_length, payload_length);
1851 }
1852 return 0;
1853 }
1854
1855 lnet_net_unlock(cpt);
1856
1857 rc = lnet_parse_local(ni, msg);
1858 if (rc)
1859 goto free_drop;
1860 return 0;
1861
1862 free_drop:
1863 LASSERT(!msg->msg_md);
1864 lnet_finalize(ni, msg, rc);
1865
1866 drop:
1867 lnet_drop_message(ni, cpt, private, payload_length);
1868 return 0;
1869}
1870EXPORT_SYMBOL(lnet_parse);
1871
1872void
1873lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
1874{
1875 while (!list_empty(head)) {
1876 lnet_process_id_t id = {0};
1877 lnet_msg_t *msg;
1878
1879 msg = list_entry(head->next, lnet_msg_t, msg_list);
1880 list_del(&msg->msg_list);
1881
1882 id.nid = msg->msg_hdr.src_nid;
1883 id.pid = msg->msg_hdr.src_pid;
1884
1885 LASSERT(!msg->msg_md);
1886 LASSERT(msg->msg_rx_delayed);
1887 LASSERT(msg->msg_rxpeer);
1888 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
1889
1890 CWARN("Dropping delayed PUT from %s portal %d match %llu offset %d length %d: %s\n",
1891 libcfs_id2str(id),
1892 msg->msg_hdr.msg.put.ptl_index,
1893 msg->msg_hdr.msg.put.match_bits,
1894 msg->msg_hdr.msg.put.offset,
1895 msg->msg_hdr.payload_length, reason);
1896
1897
1898
1899
1900
1901
1902 lnet_drop_message(msg->msg_rxpeer->lp_ni,
1903 msg->msg_rxpeer->lp_cpt,
1904 msg->msg_private, msg->msg_len);
1905
1906
1907
1908
1909
1910 lnet_finalize(msg->msg_rxpeer->lp_ni, msg, -ENOENT);
1911 }
1912}
1913
1914void
1915lnet_recv_delayed_msg_list(struct list_head *head)
1916{
1917 while (!list_empty(head)) {
1918 lnet_msg_t *msg;
1919 lnet_process_id_t id;
1920
1921 msg = list_entry(head->next, lnet_msg_t, msg_list);
1922 list_del(&msg->msg_list);
1923
1924
1925
1926
1927
1928 id.nid = msg->msg_hdr.src_nid;
1929 id.pid = msg->msg_hdr.src_pid;
1930
1931 LASSERT(msg->msg_rx_delayed);
1932 LASSERT(msg->msg_md);
1933 LASSERT(msg->msg_rxpeer);
1934 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
1935
1936 CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d match %llu offset %d length %d.\n",
1937 libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
1938 msg->msg_hdr.msg.put.match_bits,
1939 msg->msg_hdr.msg.put.offset,
1940 msg->msg_hdr.payload_length);
1941
1942 lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
1943 }
1944}
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990int
1991LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
1992 lnet_process_id_t target, unsigned int portal,
1993 __u64 match_bits, unsigned int offset,
1994 __u64 hdr_data)
1995{
1996 struct lnet_msg *msg;
1997 struct lnet_libmd *md;
1998 int cpt;
1999 int rc;
2000
2001 LASSERT(the_lnet.ln_refcount > 0);
2002
2003 if (!list_empty(&the_lnet.ln_test_peers) &&
2004 fail_peer(target.nid, 1)) {
2005 CERROR("Dropping PUT to %s: simulated failure\n",
2006 libcfs_id2str(target));
2007 return -EIO;
2008 }
2009
2010 msg = lnet_msg_alloc();
2011 if (!msg) {
2012 CERROR("Dropping PUT to %s: ENOMEM on lnet_msg_t\n",
2013 libcfs_id2str(target));
2014 return -ENOMEM;
2015 }
2016 msg->msg_vmflush = !!memory_pressure_get();
2017
2018 cpt = lnet_cpt_of_cookie(mdh.cookie);
2019 lnet_res_lock(cpt);
2020
2021 md = lnet_handle2md(&mdh);
2022 if (!md || !md->md_threshold || md->md_me) {
2023 CERROR("Dropping PUT (%llu:%d:%s): MD (%d) invalid\n",
2024 match_bits, portal, libcfs_id2str(target),
2025 !md ? -1 : md->md_threshold);
2026 if (md && md->md_me)
2027 CERROR("Source MD also attached to portal %d\n",
2028 md->md_me->me_portal);
2029 lnet_res_unlock(cpt);
2030
2031 lnet_msg_free(msg);
2032 return -ENOENT;
2033 }
2034
2035 CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
2036
2037 lnet_msg_attach_md(msg, md, 0, 0);
2038
2039 lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
2040
2041 msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
2042 msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
2043 msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
2044 msg->msg_hdr.msg.put.hdr_data = hdr_data;
2045
2046
2047 if (ack == LNET_ACK_REQ) {
2048 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2049 the_lnet.ln_interface_cookie;
2050 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2051 md->md_lh.lh_cookie;
2052 } else {
2053 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2054 LNET_WIRE_HANDLE_COOKIE_NONE;
2055 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2056 LNET_WIRE_HANDLE_COOKIE_NONE;
2057 }
2058
2059 lnet_res_unlock(cpt);
2060
2061 lnet_build_msg_event(msg, LNET_EVENT_SEND);
2062
2063 rc = lnet_send(self, msg, LNET_NID_ANY);
2064 if (rc) {
2065 CNETERR("Error sending PUT to %s: %d\n",
2066 libcfs_id2str(target), rc);
2067 lnet_finalize(NULL, msg, rc);
2068 }
2069
2070
2071 return 0;
2072}
2073EXPORT_SYMBOL(LNetPut);
2074
2075lnet_msg_t *
2076lnet_create_reply_msg(lnet_ni_t *ni, lnet_msg_t *getmsg)
2077{
2078
2079
2080
2081
2082
2083
2084
2085
2086 struct lnet_msg *msg = lnet_msg_alloc();
2087 struct lnet_libmd *getmd = getmsg->msg_md;
2088 lnet_process_id_t peer_id = getmsg->msg_target;
2089 int cpt;
2090
2091 LASSERT(!getmsg->msg_target_is_router);
2092 LASSERT(!getmsg->msg_routing);
2093
2094 if (!msg) {
2095 CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
2096 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
2097 goto drop;
2098 }
2099
2100 cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
2101 lnet_res_lock(cpt);
2102
2103 LASSERT(getmd->md_refcount > 0);
2104
2105 if (!getmd->md_threshold) {
2106 CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
2107 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
2108 getmd);
2109 lnet_res_unlock(cpt);
2110 goto drop;
2111 }
2112
2113 LASSERT(!getmd->md_offset);
2114
2115 CDEBUG(D_NET, "%s: Reply from %s md %p\n",
2116 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
2117
2118
2119 msg->msg_from = peer_id.nid;
2120 msg->msg_type = LNET_MSG_GET;
2121 msg->msg_hdr.src_nid = peer_id.nid;
2122 msg->msg_hdr.payload_length = getmd->md_length;
2123 msg->msg_receiving = 1;
2124
2125 lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
2126 lnet_res_unlock(cpt);
2127
2128 cpt = lnet_cpt_of_nid(peer_id.nid);
2129
2130 lnet_net_lock(cpt);
2131 lnet_msg_commit(msg, cpt);
2132 lnet_net_unlock(cpt);
2133
2134 lnet_build_msg_event(msg, LNET_EVENT_REPLY);
2135
2136 return msg;
2137
2138 drop:
2139 cpt = lnet_cpt_of_nid(peer_id.nid);
2140
2141 lnet_net_lock(cpt);
2142 the_lnet.ln_counters[cpt]->drop_count++;
2143 the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
2144 lnet_net_unlock(cpt);
2145
2146 if (msg)
2147 lnet_msg_free(msg);
2148
2149 return NULL;
2150}
2151EXPORT_SYMBOL(lnet_create_reply_msg);
2152
2153void
2154lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *reply, unsigned int len)
2155{
2156
2157
2158
2159
2160 LASSERT(reply);
2161 LASSERT(reply->msg_type == LNET_MSG_GET);
2162 LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);
2163
2164
2165
2166
2167
2168 LASSERT(len <= reply->msg_ev.mlength);
2169
2170 reply->msg_ev.mlength = len;
2171}
2172EXPORT_SYMBOL(lnet_set_reply_msg_len);
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195int
2196LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
2197 lnet_process_id_t target, unsigned int portal,
2198 __u64 match_bits, unsigned int offset)
2199{
2200 struct lnet_msg *msg;
2201 struct lnet_libmd *md;
2202 int cpt;
2203 int rc;
2204
2205 LASSERT(the_lnet.ln_refcount > 0);
2206
2207 if (!list_empty(&the_lnet.ln_test_peers) &&
2208 fail_peer(target.nid, 1)) {
2209 CERROR("Dropping GET to %s: simulated failure\n",
2210 libcfs_id2str(target));
2211 return -EIO;
2212 }
2213
2214 msg = lnet_msg_alloc();
2215 if (!msg) {
2216 CERROR("Dropping GET to %s: ENOMEM on lnet_msg_t\n",
2217 libcfs_id2str(target));
2218 return -ENOMEM;
2219 }
2220
2221 cpt = lnet_cpt_of_cookie(mdh.cookie);
2222 lnet_res_lock(cpt);
2223
2224 md = lnet_handle2md(&mdh);
2225 if (!md || !md->md_threshold || md->md_me) {
2226 CERROR("Dropping GET (%llu:%d:%s): MD (%d) invalid\n",
2227 match_bits, portal, libcfs_id2str(target),
2228 !md ? -1 : md->md_threshold);
2229 if (md && md->md_me)
2230 CERROR("REPLY MD also attached to portal %d\n",
2231 md->md_me->me_portal);
2232
2233 lnet_res_unlock(cpt);
2234
2235 lnet_msg_free(msg);
2236 return -ENOENT;
2237 }
2238
2239 CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
2240
2241 lnet_msg_attach_md(msg, md, 0, 0);
2242
2243 lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
2244
2245 msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
2246 msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
2247 msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
2248 msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
2249
2250
2251 msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
2252 the_lnet.ln_interface_cookie;
2253 msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
2254 md->md_lh.lh_cookie;
2255
2256 lnet_res_unlock(cpt);
2257
2258 lnet_build_msg_event(msg, LNET_EVENT_SEND);
2259
2260 rc = lnet_send(self, msg, LNET_NID_ANY);
2261 if (rc < 0) {
2262 CNETERR("Error sending GET to %s: %d\n",
2263 libcfs_id2str(target), rc);
2264 lnet_finalize(NULL, msg, rc);
2265 }
2266
2267
2268 return 0;
2269}
2270EXPORT_SYMBOL(LNetGet);
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286int
2287LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
2288{
2289 struct list_head *e;
2290 struct lnet_ni *ni;
2291 lnet_remotenet_t *rnet;
2292 __u32 dstnet = LNET_NIDNET(dstnid);
2293 int hops;
2294 int cpt;
2295 __u32 order = 2;
2296 struct list_head *rn_list;
2297
2298
2299
2300
2301
2302
2303
2304 LASSERT(the_lnet.ln_refcount > 0);
2305
2306 cpt = lnet_net_lock_current();
2307
2308 list_for_each(e, &the_lnet.ln_nis) {
2309 ni = list_entry(e, lnet_ni_t, ni_list);
2310
2311 if (ni->ni_nid == dstnid) {
2312 if (srcnidp)
2313 *srcnidp = dstnid;
2314 if (orderp) {
2315 if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
2316 *orderp = 0;
2317 else
2318 *orderp = 1;
2319 }
2320 lnet_net_unlock(cpt);
2321
2322 return local_nid_dist_zero ? 0 : 1;
2323 }
2324
2325 if (LNET_NIDNET(ni->ni_nid) == dstnet) {
2326
2327
2328
2329
2330
2331
2332 if (!net_eq(ni->ni_net_ns, current->nsproxy->net_ns))
2333 order += 0xffff0000;
2334
2335 if (srcnidp)
2336 *srcnidp = ni->ni_nid;
2337 if (orderp)
2338 *orderp = order;
2339 lnet_net_unlock(cpt);
2340 return 1;
2341 }
2342
2343 order++;
2344 }
2345
2346 rn_list = lnet_net2rnethash(dstnet);
2347 list_for_each(e, rn_list) {
2348 rnet = list_entry(e, lnet_remotenet_t, lrn_list);
2349
2350 if (rnet->lrn_net == dstnet) {
2351 lnet_route_t *route;
2352 lnet_route_t *shortest = NULL;
2353 __u32 shortest_hops = LNET_UNDEFINED_HOPS;
2354 __u32 route_hops;
2355
2356 LASSERT(!list_empty(&rnet->lrn_routes));
2357
2358 list_for_each_entry(route, &rnet->lrn_routes,
2359 lr_list) {
2360 route_hops = route->lr_hops;
2361 if (route_hops == LNET_UNDEFINED_HOPS)
2362 route_hops = 1;
2363 if (!shortest ||
2364 route_hops < shortest_hops) {
2365 shortest = route;
2366 shortest_hops = route_hops;
2367 }
2368 }
2369
2370 LASSERT(shortest);
2371 hops = shortest_hops;
2372 if (srcnidp)
2373 *srcnidp = shortest->lr_gateway->lp_ni->ni_nid;
2374 if (orderp)
2375 *orderp = order;
2376 lnet_net_unlock(cpt);
2377 return hops + 1;
2378 }
2379 order++;
2380 }
2381
2382 lnet_net_unlock(cpt);
2383 return -EHOSTUNREACH;
2384}
2385EXPORT_SYMBOL(LNetDist);
2386