1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37#define DEBUG_SUBSYSTEM S_LNET
38
39#include "../../include/linux/lnet/lib-lnet.h"
40#include <linux/nsproxy.h>
41#include <net/net_namespace.h>
42
/*
 * Integer module parameter, default 1, exported with permissions 0444.
 * It is not referenced by any code visible in this part of the file and
 * its description string marks it as reserved.
 */
static int local_nid_dist_zero = 1;
module_param(local_nid_dist_zero, int, 0444);
MODULE_PARM_DESC(local_nid_dist_zero, "Reserved");
46
47int
48lnet_fail_nid(lnet_nid_t nid, unsigned int threshold)
49{
50 lnet_test_peer_t *tp;
51 lnet_test_peer_t *temp;
52 struct list_head *el;
53 struct list_head *next;
54 struct list_head cull;
55
56
57 if (threshold) {
58
59 LIBCFS_ALLOC(tp, sizeof(*tp));
60 if (!tp)
61 return -ENOMEM;
62
63 tp->tp_nid = nid;
64 tp->tp_threshold = threshold;
65
66 lnet_net_lock(0);
67 list_add_tail(&tp->tp_list, &the_lnet.ln_test_peers);
68 lnet_net_unlock(0);
69 return 0;
70 }
71
72
73 INIT_LIST_HEAD(&cull);
74
75 lnet_net_lock(0);
76
77 list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
78 tp = list_entry(el, lnet_test_peer_t, tp_list);
79
80 if (!tp->tp_threshold ||
81 nid == LNET_NID_ANY ||
82 tp->tp_nid == nid) {
83 list_del(&tp->tp_list);
84 list_add(&tp->tp_list, &cull);
85 }
86 }
87
88 lnet_net_unlock(0);
89
90 list_for_each_entry_safe(tp, temp, &cull, tp_list) {
91 list_del(&tp->tp_list);
92 LIBCFS_FREE(tp, sizeof(*tp));
93 }
94 return 0;
95}
96
97static int
98fail_peer(lnet_nid_t nid, int outgoing)
99{
100 lnet_test_peer_t *tp;
101 lnet_test_peer_t *temp;
102 struct list_head *el;
103 struct list_head *next;
104 struct list_head cull;
105 int fail = 0;
106
107 INIT_LIST_HEAD(&cull);
108
109
110 lnet_net_lock(0);
111
112 list_for_each_safe(el, next, &the_lnet.ln_test_peers) {
113 tp = list_entry(el, lnet_test_peer_t, tp_list);
114
115 if (!tp->tp_threshold) {
116
117 if (outgoing) {
118
119
120
121
122
123 list_del(&tp->tp_list);
124 list_add(&tp->tp_list, &cull);
125 }
126 continue;
127 }
128
129 if (tp->tp_nid == LNET_NID_ANY ||
130 nid == tp->tp_nid) {
131 fail = 1;
132
133 if (tp->tp_threshold != LNET_MD_THRESH_INF) {
134 tp->tp_threshold--;
135 if (outgoing &&
136 !tp->tp_threshold) {
137
138 list_del(&tp->tp_list);
139 list_add(&tp->tp_list, &cull);
140 }
141 }
142 break;
143 }
144 }
145
146 lnet_net_unlock(0);
147
148 list_for_each_entry_safe(tp, temp, &cull, tp_list) {
149 list_del(&tp->tp_list);
150
151 LIBCFS_FREE(tp, sizeof(*tp));
152 }
153
154 return fail;
155}
156
157unsigned int
158lnet_iov_nob(unsigned int niov, struct kvec *iov)
159{
160 unsigned int nob = 0;
161
162 LASSERT(!niov || iov);
163 while (niov-- > 0)
164 nob += (iov++)->iov_len;
165
166 return nob;
167}
168EXPORT_SYMBOL(lnet_iov_nob);
169
170void
171lnet_copy_iov2iter(struct iov_iter *to,
172 unsigned int nsiov, const struct kvec *siov,
173 unsigned int soffset, unsigned int nob)
174{
175
176 const char *s;
177 size_t left;
178
179 if (!nob)
180 return;
181
182
183 LASSERT(nsiov > 0);
184 while (soffset >= siov->iov_len) {
185 soffset -= siov->iov_len;
186 siov++;
187 nsiov--;
188 LASSERT(nsiov > 0);
189 }
190
191 s = (char *)siov->iov_base + soffset;
192 left = siov->iov_len - soffset;
193 do {
194 size_t n, copy = left;
195 LASSERT(nsiov > 0);
196
197 if (copy > nob)
198 copy = nob;
199 n = copy_to_iter(s, copy, to);
200 if (n != copy)
201 return;
202 nob -= n;
203
204 siov++;
205 s = (char *)siov->iov_base;
206 left = siov->iov_len;
207 nsiov--;
208 } while (nob > 0);
209}
210EXPORT_SYMBOL(lnet_copy_iov2iter);
211
212void
213lnet_copy_kiov2iter(struct iov_iter *to,
214 unsigned int nsiov, const lnet_kiov_t *siov,
215 unsigned int soffset, unsigned int nob)
216{
217 if (!nob)
218 return;
219
220 LASSERT(!in_interrupt());
221
222 LASSERT(nsiov > 0);
223 while (soffset >= siov->bv_len) {
224 soffset -= siov->bv_len;
225 siov++;
226 nsiov--;
227 LASSERT(nsiov > 0);
228 }
229
230 do {
231 size_t copy = siov->bv_len - soffset, n;
232
233 LASSERT(nsiov > 0);
234
235 if (copy > nob)
236 copy = nob;
237 n = copy_page_to_iter(siov->bv_page,
238 siov->bv_offset + soffset,
239 copy, to);
240 if (n != copy)
241 return;
242 nob -= n;
243 siov++;
244 nsiov--;
245 soffset = 0;
246 } while (nob > 0);
247}
248EXPORT_SYMBOL(lnet_copy_kiov2iter);
249
250int
251lnet_extract_iov(int dst_niov, struct kvec *dst,
252 int src_niov, const struct kvec *src,
253 unsigned int offset, unsigned int len)
254{
255
256
257
258
259
260 unsigned int frag_len;
261 unsigned int niov;
262
263 if (!len)
264 return 0;
265
266 LASSERT(src_niov > 0);
267 while (offset >= src->iov_len) {
268 offset -= src->iov_len;
269 src_niov--;
270 src++;
271 LASSERT(src_niov > 0);
272 }
273
274 niov = 1;
275 for (;;) {
276 LASSERT(src_niov > 0);
277 LASSERT((int)niov <= dst_niov);
278
279 frag_len = src->iov_len - offset;
280 dst->iov_base = ((char *)src->iov_base) + offset;
281
282 if (len <= frag_len) {
283 dst->iov_len = len;
284 return niov;
285 }
286
287 dst->iov_len = frag_len;
288
289 len -= frag_len;
290 dst++;
291 src++;
292 niov++;
293 src_niov--;
294 offset = 0;
295 }
296}
297EXPORT_SYMBOL(lnet_extract_iov);
298
299unsigned int
300lnet_kiov_nob(unsigned int niov, lnet_kiov_t *kiov)
301{
302 unsigned int nob = 0;
303
304 LASSERT(!niov || kiov);
305 while (niov-- > 0)
306 nob += (kiov++)->bv_len;
307
308 return nob;
309}
310EXPORT_SYMBOL(lnet_kiov_nob);
311
312int
313lnet_extract_kiov(int dst_niov, lnet_kiov_t *dst,
314 int src_niov, const lnet_kiov_t *src,
315 unsigned int offset, unsigned int len)
316{
317
318
319
320
321
322 unsigned int frag_len;
323 unsigned int niov;
324
325 if (!len)
326 return 0;
327
328 LASSERT(src_niov > 0);
329 while (offset >= src->bv_len) {
330 offset -= src->bv_len;
331 src_niov--;
332 src++;
333 LASSERT(src_niov > 0);
334 }
335
336 niov = 1;
337 for (;;) {
338 LASSERT(src_niov > 0);
339 LASSERT((int)niov <= dst_niov);
340
341 frag_len = src->bv_len - offset;
342 dst->bv_page = src->bv_page;
343 dst->bv_offset = src->bv_offset + offset;
344
345 if (len <= frag_len) {
346 dst->bv_len = len;
347 LASSERT(dst->bv_offset + dst->bv_len
348 <= PAGE_SIZE);
349 return niov;
350 }
351
352 dst->bv_len = frag_len;
353 LASSERT(dst->bv_offset + dst->bv_len <= PAGE_SIZE);
354
355 len -= frag_len;
356 dst++;
357 src++;
358 niov++;
359 src_niov--;
360 offset = 0;
361 }
362}
363EXPORT_SYMBOL(lnet_extract_kiov);
364
365void
366lnet_ni_recv(lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
367 unsigned int offset, unsigned int mlen, unsigned int rlen)
368{
369 unsigned int niov = 0;
370 struct kvec *iov = NULL;
371 lnet_kiov_t *kiov = NULL;
372 struct iov_iter to;
373 int rc;
374
375 LASSERT(!in_interrupt());
376 LASSERT(!mlen || msg);
377
378 if (msg) {
379 LASSERT(msg->msg_receiving);
380 LASSERT(!msg->msg_sending);
381 LASSERT(rlen == msg->msg_len);
382 LASSERT(mlen <= msg->msg_len);
383 LASSERT(msg->msg_offset == offset);
384 LASSERT(msg->msg_wanted == mlen);
385
386 msg->msg_receiving = 0;
387
388 if (mlen) {
389 niov = msg->msg_niov;
390 iov = msg->msg_iov;
391 kiov = msg->msg_kiov;
392
393 LASSERT(niov > 0);
394 LASSERT(!iov != !kiov);
395 }
396 }
397
398 if (iov) {
399 iov_iter_kvec(&to, ITER_KVEC | READ, iov, niov, mlen + offset);
400 iov_iter_advance(&to, offset);
401 } else {
402 iov_iter_bvec(&to, ITER_BVEC | READ, kiov, niov, mlen + offset);
403 iov_iter_advance(&to, offset);
404 }
405 rc = ni->ni_lnd->lnd_recv(ni, private, msg, delayed, &to, rlen);
406 if (rc < 0)
407 lnet_finalize(ni, msg, rc);
408}
409
410static void
411lnet_setpayloadbuffer(lnet_msg_t *msg)
412{
413 lnet_libmd_t *md = msg->msg_md;
414
415 LASSERT(msg->msg_len > 0);
416 LASSERT(!msg->msg_routing);
417 LASSERT(md);
418 LASSERT(!msg->msg_niov);
419 LASSERT(!msg->msg_iov);
420 LASSERT(!msg->msg_kiov);
421
422 msg->msg_niov = md->md_niov;
423 if (md->md_options & LNET_MD_KIOV)
424 msg->msg_kiov = md->md_iov.kiov;
425 else
426 msg->msg_iov = md->md_iov.iov;
427}
428
429void
430lnet_prep_send(lnet_msg_t *msg, int type, lnet_process_id_t target,
431 unsigned int offset, unsigned int len)
432{
433 msg->msg_type = type;
434 msg->msg_target = target;
435 msg->msg_len = len;
436 msg->msg_offset = offset;
437
438 if (len)
439 lnet_setpayloadbuffer(msg);
440
441 memset(&msg->msg_hdr, 0, sizeof(msg->msg_hdr));
442 msg->msg_hdr.type = cpu_to_le32(type);
443 msg->msg_hdr.dest_nid = cpu_to_le64(target.nid);
444 msg->msg_hdr.dest_pid = cpu_to_le32(target.pid);
445
446 msg->msg_hdr.src_pid = cpu_to_le32(the_lnet.ln_pid);
447 msg->msg_hdr.payload_length = cpu_to_le32(len);
448}
449
450static void
451lnet_ni_send(lnet_ni_t *ni, lnet_msg_t *msg)
452{
453 void *priv = msg->msg_private;
454 int rc;
455
456 LASSERT(!in_interrupt());
457 LASSERT(LNET_NETTYP(LNET_NIDNET(ni->ni_nid)) == LOLND ||
458 (msg->msg_txcredit && msg->msg_peertxcredit));
459
460 rc = ni->ni_lnd->lnd_send(ni, priv, msg);
461 if (rc < 0)
462 lnet_finalize(ni, msg, rc);
463}
464
465static int
466lnet_ni_eager_recv(lnet_ni_t *ni, lnet_msg_t *msg)
467{
468 int rc;
469
470 LASSERT(!msg->msg_sending);
471 LASSERT(msg->msg_receiving);
472 LASSERT(!msg->msg_rx_ready_delay);
473 LASSERT(ni->ni_lnd->lnd_eager_recv);
474
475 msg->msg_rx_ready_delay = 1;
476 rc = ni->ni_lnd->lnd_eager_recv(ni, msg->msg_private, msg,
477 &msg->msg_private);
478 if (rc) {
479 CERROR("recv from %s / send to %s aborted: eager_recv failed %d\n",
480 libcfs_nid2str(msg->msg_rxpeer->lp_nid),
481 libcfs_id2str(msg->msg_target), rc);
482 LASSERT(rc < 0);
483 }
484
485 return rc;
486}
487
488
489static void
490lnet_ni_query_locked(lnet_ni_t *ni, lnet_peer_t *lp)
491{
492 unsigned long last_alive = 0;
493
494 LASSERT(lnet_peer_aliveness_enabled(lp));
495 LASSERT(ni->ni_lnd->lnd_query);
496
497 lnet_net_unlock(lp->lp_cpt);
498 ni->ni_lnd->lnd_query(ni, lp->lp_nid, &last_alive);
499 lnet_net_lock(lp->lp_cpt);
500
501 lp->lp_last_query = cfs_time_current();
502
503 if (last_alive)
504 lp->lp_last_alive = last_alive;
505}
506
507
508static inline int
509lnet_peer_is_alive(lnet_peer_t *lp, unsigned long now)
510{
511 int alive;
512 unsigned long deadline;
513
514 LASSERT(lnet_peer_aliveness_enabled(lp));
515
516
517
518
519 if (!lp->lp_alive && lp->lp_alive_count > 0 &&
520 cfs_time_aftereq(lp->lp_timestamp, lp->lp_last_alive))
521 return 0;
522
523 deadline = cfs_time_add(lp->lp_last_alive,
524 cfs_time_seconds(lp->lp_ni->ni_peertimeout));
525 alive = cfs_time_after(deadline, now);
526
527
528
529
530
531 if (alive && !lp->lp_alive &&
532 !(lnet_isrouter(lp) && !lp->lp_alive_count))
533 lnet_notify_locked(lp, 0, 1, lp->lp_last_alive);
534
535 return alive;
536}
537
538
539
540
541
542static int
543lnet_peer_alive_locked(lnet_peer_t *lp)
544{
545 unsigned long now = cfs_time_current();
546
547 if (!lnet_peer_aliveness_enabled(lp))
548 return -ENODEV;
549
550 if (lnet_peer_is_alive(lp, now))
551 return 1;
552
553
554
555
556
557 if (lp->lp_last_query) {
558 static const int lnet_queryinterval = 1;
559
560 unsigned long next_query =
561 cfs_time_add(lp->lp_last_query,
562 cfs_time_seconds(lnet_queryinterval));
563
564 if (time_before(now, next_query)) {
565 if (lp->lp_alive)
566 CWARN("Unexpected aliveness of peer %s: %d < %d (%d/%d)\n",
567 libcfs_nid2str(lp->lp_nid),
568 (int)now, (int)next_query,
569 lnet_queryinterval,
570 lp->lp_ni->ni_peertimeout);
571 return 0;
572 }
573 }
574
575
576 lnet_ni_query_locked(lp->lp_ni, lp);
577
578 if (lnet_peer_is_alive(lp, now))
579 return 1;
580
581 lnet_notify_locked(lp, 0, 0, lp->lp_last_alive);
582 return 0;
583}
584
585
586
587
588
589
590
591
592
593
594
595
596static int
597lnet_post_send_locked(lnet_msg_t *msg, int do_send)
598{
599 lnet_peer_t *lp = msg->msg_txpeer;
600 lnet_ni_t *ni = lp->lp_ni;
601 int cpt = msg->msg_tx_cpt;
602 struct lnet_tx_queue *tq = ni->ni_tx_queues[cpt];
603
604
605 LASSERT(!do_send || msg->msg_tx_delayed);
606 LASSERT(!msg->msg_receiving);
607 LASSERT(msg->msg_tx_committed);
608
609
610 if (!(msg->msg_target.pid & LNET_PID_USERFLAG) &&
611 !lnet_peer_alive_locked(lp)) {
612 the_lnet.ln_counters[cpt]->drop_count++;
613 the_lnet.ln_counters[cpt]->drop_length += msg->msg_len;
614 lnet_net_unlock(cpt);
615
616 CNETERR("Dropping message for %s: peer not alive\n",
617 libcfs_id2str(msg->msg_target));
618 if (do_send)
619 lnet_finalize(ni, msg, -EHOSTUNREACH);
620
621 lnet_net_lock(cpt);
622 return -EHOSTUNREACH;
623 }
624
625 if (msg->msg_md &&
626 (msg->msg_md->md_flags & LNET_MD_FLAG_ABORTED)) {
627 lnet_net_unlock(cpt);
628
629 CNETERR("Aborting message for %s: LNetM[DE]Unlink() already called on the MD/ME.\n",
630 libcfs_id2str(msg->msg_target));
631 if (do_send)
632 lnet_finalize(ni, msg, -ECANCELED);
633
634 lnet_net_lock(cpt);
635 return -ECANCELED;
636 }
637
638 if (!msg->msg_peertxcredit) {
639 LASSERT((lp->lp_txcredits < 0) ==
640 !list_empty(&lp->lp_txq));
641
642 msg->msg_peertxcredit = 1;
643 lp->lp_txqnob += msg->msg_len + sizeof(lnet_hdr_t);
644 lp->lp_txcredits--;
645
646 if (lp->lp_txcredits < lp->lp_mintxcredits)
647 lp->lp_mintxcredits = lp->lp_txcredits;
648
649 if (lp->lp_txcredits < 0) {
650 msg->msg_tx_delayed = 1;
651 list_add_tail(&msg->msg_list, &lp->lp_txq);
652 return LNET_CREDIT_WAIT;
653 }
654 }
655
656 if (!msg->msg_txcredit) {
657 LASSERT((tq->tq_credits < 0) ==
658 !list_empty(&tq->tq_delayed));
659
660 msg->msg_txcredit = 1;
661 tq->tq_credits--;
662
663 if (tq->tq_credits < tq->tq_credits_min)
664 tq->tq_credits_min = tq->tq_credits;
665
666 if (tq->tq_credits < 0) {
667 msg->msg_tx_delayed = 1;
668 list_add_tail(&msg->msg_list, &tq->tq_delayed);
669 return LNET_CREDIT_WAIT;
670 }
671 }
672
673 if (do_send) {
674 lnet_net_unlock(cpt);
675 lnet_ni_send(ni, msg);
676 lnet_net_lock(cpt);
677 }
678 return LNET_CREDIT_OK;
679}
680
681static lnet_rtrbufpool_t *
682lnet_msg2bufpool(lnet_msg_t *msg)
683{
684 lnet_rtrbufpool_t *rbp;
685 int cpt;
686
687 LASSERT(msg->msg_rx_committed);
688
689 cpt = msg->msg_rx_cpt;
690 rbp = &the_lnet.ln_rtrpools[cpt][0];
691
692 LASSERT(msg->msg_len <= LNET_MTU);
693 while (msg->msg_len > (unsigned int)rbp->rbp_npages * PAGE_SIZE) {
694 rbp++;
695 LASSERT(rbp < &the_lnet.ln_rtrpools[cpt][LNET_NRBPOOLS]);
696 }
697
698 return rbp;
699}
700
701static int
702lnet_post_routed_recv_locked(lnet_msg_t *msg, int do_recv)
703{
704
705
706
707
708
709
710 lnet_peer_t *lp = msg->msg_rxpeer;
711 lnet_rtrbufpool_t *rbp;
712 lnet_rtrbuf_t *rb;
713
714 LASSERT(!msg->msg_iov);
715 LASSERT(!msg->msg_kiov);
716 LASSERT(!msg->msg_niov);
717 LASSERT(msg->msg_routing);
718 LASSERT(msg->msg_receiving);
719 LASSERT(!msg->msg_sending);
720
721
722 LASSERT(!do_recv || msg->msg_rx_delayed);
723
724 if (!msg->msg_peerrtrcredit) {
725 LASSERT((lp->lp_rtrcredits < 0) ==
726 !list_empty(&lp->lp_rtrq));
727
728 msg->msg_peerrtrcredit = 1;
729 lp->lp_rtrcredits--;
730 if (lp->lp_rtrcredits < lp->lp_minrtrcredits)
731 lp->lp_minrtrcredits = lp->lp_rtrcredits;
732
733 if (lp->lp_rtrcredits < 0) {
734
735 LASSERT(msg->msg_rx_ready_delay);
736 msg->msg_rx_delayed = 1;
737 list_add_tail(&msg->msg_list, &lp->lp_rtrq);
738 return LNET_CREDIT_WAIT;
739 }
740 }
741
742 rbp = lnet_msg2bufpool(msg);
743
744 if (!msg->msg_rtrcredit) {
745 msg->msg_rtrcredit = 1;
746 rbp->rbp_credits--;
747 if (rbp->rbp_credits < rbp->rbp_mincredits)
748 rbp->rbp_mincredits = rbp->rbp_credits;
749
750 if (rbp->rbp_credits < 0) {
751
752 LASSERT(msg->msg_rx_ready_delay);
753 msg->msg_rx_delayed = 1;
754 list_add_tail(&msg->msg_list, &rbp->rbp_msgs);
755 return LNET_CREDIT_WAIT;
756 }
757 }
758
759 LASSERT(!list_empty(&rbp->rbp_bufs));
760 rb = list_entry(rbp->rbp_bufs.next, lnet_rtrbuf_t, rb_list);
761 list_del(&rb->rb_list);
762
763 msg->msg_niov = rbp->rbp_npages;
764 msg->msg_kiov = &rb->rb_kiov[0];
765
766 if (do_recv) {
767 int cpt = msg->msg_rx_cpt;
768
769 lnet_net_unlock(cpt);
770 lnet_ni_recv(lp->lp_ni, msg->msg_private, msg, 1,
771 0, msg->msg_len, msg->msg_len);
772 lnet_net_lock(cpt);
773 }
774 return LNET_CREDIT_OK;
775}
776
777void
778lnet_return_tx_credits_locked(lnet_msg_t *msg)
779{
780 lnet_peer_t *txpeer = msg->msg_txpeer;
781 lnet_msg_t *msg2;
782
783 if (msg->msg_txcredit) {
784 struct lnet_ni *ni = txpeer->lp_ni;
785 struct lnet_tx_queue *tq = ni->ni_tx_queues[msg->msg_tx_cpt];
786
787
788 msg->msg_txcredit = 0;
789
790 LASSERT((tq->tq_credits < 0) ==
791 !list_empty(&tq->tq_delayed));
792
793 tq->tq_credits++;
794 if (tq->tq_credits <= 0) {
795 msg2 = list_entry(tq->tq_delayed.next,
796 lnet_msg_t, msg_list);
797 list_del(&msg2->msg_list);
798
799 LASSERT(msg2->msg_txpeer->lp_ni == ni);
800 LASSERT(msg2->msg_tx_delayed);
801
802 (void)lnet_post_send_locked(msg2, 1);
803 }
804 }
805
806 if (msg->msg_peertxcredit) {
807
808 msg->msg_peertxcredit = 0;
809
810 LASSERT((txpeer->lp_txcredits < 0) ==
811 !list_empty(&txpeer->lp_txq));
812
813 txpeer->lp_txqnob -= msg->msg_len + sizeof(lnet_hdr_t);
814 LASSERT(txpeer->lp_txqnob >= 0);
815
816 txpeer->lp_txcredits++;
817 if (txpeer->lp_txcredits <= 0) {
818 msg2 = list_entry(txpeer->lp_txq.next,
819 lnet_msg_t, msg_list);
820 list_del(&msg2->msg_list);
821
822 LASSERT(msg2->msg_txpeer == txpeer);
823 LASSERT(msg2->msg_tx_delayed);
824
825 (void)lnet_post_send_locked(msg2, 1);
826 }
827 }
828
829 if (txpeer) {
830 msg->msg_txpeer = NULL;
831 lnet_peer_decref_locked(txpeer);
832 }
833}
834
835void
836lnet_schedule_blocked_locked(lnet_rtrbufpool_t *rbp)
837{
838 lnet_msg_t *msg;
839
840 if (list_empty(&rbp->rbp_msgs))
841 return;
842 msg = list_entry(rbp->rbp_msgs.next,
843 lnet_msg_t, msg_list);
844 list_del(&msg->msg_list);
845
846 (void)lnet_post_routed_recv_locked(msg, 1);
847}
848
849void
850lnet_drop_routed_msgs_locked(struct list_head *list, int cpt)
851{
852 struct list_head drop;
853 lnet_msg_t *msg;
854 lnet_msg_t *tmp;
855
856 INIT_LIST_HEAD(&drop);
857
858 list_splice_init(list, &drop);
859
860 lnet_net_unlock(cpt);
861
862 list_for_each_entry_safe(msg, tmp, &drop, msg_list) {
863 lnet_ni_recv(msg->msg_rxpeer->lp_ni, msg->msg_private, NULL,
864 0, 0, 0, msg->msg_hdr.payload_length);
865 list_del_init(&msg->msg_list);
866 lnet_finalize(NULL, msg, -ECANCELED);
867 }
868
869 lnet_net_lock(cpt);
870}
871
872void
873lnet_return_rx_credits_locked(lnet_msg_t *msg)
874{
875 lnet_peer_t *rxpeer = msg->msg_rxpeer;
876 lnet_msg_t *msg2;
877
878 if (msg->msg_rtrcredit) {
879
880 lnet_rtrbuf_t *rb;
881 lnet_rtrbufpool_t *rbp;
882
883
884
885
886
887
888 LASSERT(msg->msg_kiov);
889
890 rb = list_entry(msg->msg_kiov, lnet_rtrbuf_t, rb_kiov[0]);
891 rbp = rb->rb_pool;
892
893 msg->msg_kiov = NULL;
894 msg->msg_rtrcredit = 0;
895
896 LASSERT(rbp == lnet_msg2bufpool(msg));
897
898 LASSERT((rbp->rbp_credits > 0) ==
899 !list_empty(&rbp->rbp_bufs));
900
901
902
903
904
905 if (!the_lnet.ln_routing) {
906 lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
907 goto routing_off;
908 }
909
910
911
912
913
914
915 if (unlikely(rbp->rbp_credits >= rbp->rbp_req_nbuffers)) {
916
917 lnet_destroy_rtrbuf(rb, rbp->rbp_npages);
918 rbp->rbp_nbuffers--;
919 } else {
920 list_add(&rb->rb_list, &rbp->rbp_bufs);
921 rbp->rbp_credits++;
922 if (rbp->rbp_credits <= 0)
923 lnet_schedule_blocked_locked(rbp);
924 }
925 }
926
927routing_off:
928 if (msg->msg_peerrtrcredit) {
929
930 msg->msg_peerrtrcredit = 0;
931
932 LASSERT((rxpeer->lp_rtrcredits < 0) ==
933 !list_empty(&rxpeer->lp_rtrq));
934
935 rxpeer->lp_rtrcredits++;
936
937
938
939
940 if (!the_lnet.ln_routing) {
941 lnet_drop_routed_msgs_locked(&rxpeer->lp_rtrq,
942 msg->msg_rx_cpt);
943 } else if (rxpeer->lp_rtrcredits <= 0) {
944 msg2 = list_entry(rxpeer->lp_rtrq.next,
945 lnet_msg_t, msg_list);
946 list_del(&msg2->msg_list);
947
948 (void)lnet_post_routed_recv_locked(msg2, 1);
949 }
950 }
951 if (rxpeer) {
952 msg->msg_rxpeer = NULL;
953 lnet_peer_decref_locked(rxpeer);
954 }
955}
956
957static int
958lnet_compare_routes(lnet_route_t *r1, lnet_route_t *r2)
959{
960 lnet_peer_t *p1 = r1->lr_gateway;
961 lnet_peer_t *p2 = r2->lr_gateway;
962 int r1_hops = (r1->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r1->lr_hops;
963 int r2_hops = (r2->lr_hops == LNET_UNDEFINED_HOPS) ? 1 : r2->lr_hops;
964
965 if (r1->lr_priority < r2->lr_priority)
966 return 1;
967
968 if (r1->lr_priority > r2->lr_priority)
969 return -ERANGE;
970
971 if (r1_hops < r2_hops)
972 return 1;
973
974 if (r1_hops > r2_hops)
975 return -ERANGE;
976
977 if (p1->lp_txqnob < p2->lp_txqnob)
978 return 1;
979
980 if (p1->lp_txqnob > p2->lp_txqnob)
981 return -ERANGE;
982
983 if (p1->lp_txcredits > p2->lp_txcredits)
984 return 1;
985
986 if (p1->lp_txcredits < p2->lp_txcredits)
987 return -ERANGE;
988
989 if (r1->lr_seq - r2->lr_seq <= 0)
990 return 1;
991
992 return -ERANGE;
993}
994
995static lnet_peer_t *
996lnet_find_route_locked(lnet_ni_t *ni, lnet_nid_t target, lnet_nid_t rtr_nid)
997{
998 lnet_remotenet_t *rnet;
999 lnet_route_t *route;
1000 lnet_route_t *best_route;
1001 lnet_route_t *last_route;
1002 struct lnet_peer *lp_best;
1003 struct lnet_peer *lp;
1004 int rc;
1005
1006
1007
1008
1009
1010 rnet = lnet_find_net_locked(LNET_NIDNET(target));
1011 if (!rnet)
1012 return NULL;
1013
1014 lp_best = NULL;
1015 best_route = NULL;
1016 last_route = NULL;
1017 list_for_each_entry(route, &rnet->lrn_routes, lr_list) {
1018 lp = route->lr_gateway;
1019
1020 if (!lnet_is_route_alive(route))
1021 continue;
1022
1023 if (ni && lp->lp_ni != ni)
1024 continue;
1025
1026 if (lp->lp_nid == rtr_nid)
1027 return lp;
1028
1029 if (!lp_best) {
1030 best_route = route;
1031 last_route = route;
1032 lp_best = lp;
1033 continue;
1034 }
1035
1036
1037 if (last_route->lr_seq - route->lr_seq < 0)
1038 last_route = route;
1039
1040 rc = lnet_compare_routes(route, best_route);
1041 if (rc < 0)
1042 continue;
1043
1044 best_route = route;
1045 lp_best = lp;
1046 }
1047
1048
1049
1050
1051
1052
1053 if (best_route)
1054 best_route->lr_seq = last_route->lr_seq + 1;
1055 return lp_best;
1056}
1057
1058int
1059lnet_send(lnet_nid_t src_nid, lnet_msg_t *msg, lnet_nid_t rtr_nid)
1060{
1061 lnet_nid_t dst_nid = msg->msg_target.nid;
1062 struct lnet_ni *src_ni;
1063 struct lnet_ni *local_ni;
1064 struct lnet_peer *lp;
1065 int cpt;
1066 int cpt2;
1067 int rc;
1068
1069
1070
1071
1072
1073
1074
1075 LASSERT(!msg->msg_txpeer);
1076 LASSERT(!msg->msg_sending);
1077 LASSERT(!msg->msg_target_is_router);
1078 LASSERT(!msg->msg_receiving);
1079
1080 msg->msg_sending = 1;
1081
1082 LASSERT(!msg->msg_tx_committed);
1083 cpt = lnet_cpt_of_nid(rtr_nid == LNET_NID_ANY ? dst_nid : rtr_nid);
1084 again:
1085 lnet_net_lock(cpt);
1086
1087 if (the_lnet.ln_shutdown) {
1088 lnet_net_unlock(cpt);
1089 return -ESHUTDOWN;
1090 }
1091
1092 if (src_nid == LNET_NID_ANY) {
1093 src_ni = NULL;
1094 } else {
1095 src_ni = lnet_nid2ni_locked(src_nid, cpt);
1096 if (!src_ni) {
1097 lnet_net_unlock(cpt);
1098 LCONSOLE_WARN("Can't send to %s: src %s is not a local nid\n",
1099 libcfs_nid2str(dst_nid),
1100 libcfs_nid2str(src_nid));
1101 return -EINVAL;
1102 }
1103 LASSERT(!msg->msg_routing);
1104 }
1105
1106
1107 local_ni = lnet_net2ni_locked(LNET_NIDNET(dst_nid), cpt);
1108
1109 if (local_ni) {
1110 if (!src_ni) {
1111 src_ni = local_ni;
1112 src_nid = src_ni->ni_nid;
1113 } else if (src_ni == local_ni) {
1114 lnet_ni_decref_locked(local_ni, cpt);
1115 } else {
1116 lnet_ni_decref_locked(local_ni, cpt);
1117 lnet_ni_decref_locked(src_ni, cpt);
1118 lnet_net_unlock(cpt);
1119 LCONSOLE_WARN("No route to %s via from %s\n",
1120 libcfs_nid2str(dst_nid),
1121 libcfs_nid2str(src_nid));
1122 return -EINVAL;
1123 }
1124
1125 LASSERT(src_nid != LNET_NID_ANY);
1126 lnet_msg_commit(msg, cpt);
1127
1128 if (!msg->msg_routing)
1129 msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1130
1131 if (src_ni == the_lnet.ln_loni) {
1132
1133 lnet_net_unlock(cpt);
1134 lnet_ni_send(src_ni, msg);
1135
1136 lnet_net_lock(cpt);
1137 lnet_ni_decref_locked(src_ni, cpt);
1138 lnet_net_unlock(cpt);
1139 return 0;
1140 }
1141
1142 rc = lnet_nid2peer_locked(&lp, dst_nid, cpt);
1143
1144 lnet_ni_decref_locked(src_ni, cpt);
1145 if (rc) {
1146 lnet_net_unlock(cpt);
1147 LCONSOLE_WARN("Error %d finding peer %s\n", rc,
1148 libcfs_nid2str(dst_nid));
1149
1150 return rc;
1151 }
1152 LASSERT(lp->lp_ni == src_ni);
1153 } else {
1154
1155 lp = lnet_find_route_locked(src_ni, dst_nid, rtr_nid);
1156 if (!lp) {
1157 if (src_ni)
1158 lnet_ni_decref_locked(src_ni, cpt);
1159 lnet_net_unlock(cpt);
1160
1161 LCONSOLE_WARN("No route to %s via %s (all routers down)\n",
1162 libcfs_id2str(msg->msg_target),
1163 libcfs_nid2str(src_nid));
1164 return -EHOSTUNREACH;
1165 }
1166
1167
1168
1169
1170
1171
1172
1173 if (rtr_nid != lp->lp_nid) {
1174 cpt2 = lnet_cpt_of_nid_locked(lp->lp_nid);
1175 if (cpt2 != cpt) {
1176 if (src_ni)
1177 lnet_ni_decref_locked(src_ni, cpt);
1178 lnet_net_unlock(cpt);
1179
1180 rtr_nid = lp->lp_nid;
1181 cpt = cpt2;
1182 goto again;
1183 }
1184 }
1185
1186 CDEBUG(D_NET, "Best route to %s via %s for %s %d\n",
1187 libcfs_nid2str(dst_nid), libcfs_nid2str(lp->lp_nid),
1188 lnet_msgtyp2str(msg->msg_type), msg->msg_len);
1189
1190 if (!src_ni) {
1191 src_ni = lp->lp_ni;
1192 src_nid = src_ni->ni_nid;
1193 } else {
1194 LASSERT(src_ni == lp->lp_ni);
1195 lnet_ni_decref_locked(src_ni, cpt);
1196 }
1197
1198 lnet_peer_addref_locked(lp);
1199
1200 LASSERT(src_nid != LNET_NID_ANY);
1201 lnet_msg_commit(msg, cpt);
1202
1203 if (!msg->msg_routing) {
1204
1205 msg->msg_hdr.src_nid = cpu_to_le64(src_nid);
1206 }
1207
1208 msg->msg_target_is_router = 1;
1209 msg->msg_target.nid = lp->lp_nid;
1210 msg->msg_target.pid = LNET_PID_LUSTRE;
1211 }
1212
1213
1214
1215 LASSERT(!msg->msg_peertxcredit);
1216 LASSERT(!msg->msg_txcredit);
1217 LASSERT(!msg->msg_txpeer);
1218
1219 msg->msg_txpeer = lp;
1220
1221 rc = lnet_post_send_locked(msg, 0);
1222 lnet_net_unlock(cpt);
1223
1224 if (rc < 0)
1225 return rc;
1226
1227 if (rc == LNET_CREDIT_OK)
1228 lnet_ni_send(src_ni, msg);
1229
1230 return 0;
1231}
1232
1233void
1234lnet_drop_message(lnet_ni_t *ni, int cpt, void *private, unsigned int nob)
1235{
1236 lnet_net_lock(cpt);
1237 the_lnet.ln_counters[cpt]->drop_count++;
1238 the_lnet.ln_counters[cpt]->drop_length += nob;
1239 lnet_net_unlock(cpt);
1240
1241 lnet_ni_recv(ni, private, NULL, 0, 0, 0, nob);
1242}
1243
1244static void
1245lnet_recv_put(lnet_ni_t *ni, lnet_msg_t *msg)
1246{
1247 lnet_hdr_t *hdr = &msg->msg_hdr;
1248
1249 if (msg->msg_wanted)
1250 lnet_setpayloadbuffer(msg);
1251
1252 lnet_build_msg_event(msg, LNET_EVENT_PUT);
1253
1254
1255
1256
1257
1258 msg->msg_ack = !lnet_is_wire_handle_none(&hdr->msg.put.ack_wmd) &&
1259 !(msg->msg_md->md_options & LNET_MD_ACK_DISABLE);
1260
1261 lnet_ni_recv(ni, msg->msg_private, msg, msg->msg_rx_delayed,
1262 msg->msg_offset, msg->msg_wanted, hdr->payload_length);
1263}
1264
1265static int
1266lnet_parse_put(lnet_ni_t *ni, lnet_msg_t *msg)
1267{
1268 lnet_hdr_t *hdr = &msg->msg_hdr;
1269 struct lnet_match_info info;
1270 bool ready_delay;
1271 int rc;
1272
1273
1274 hdr->msg.put.match_bits = le64_to_cpu(hdr->msg.put.match_bits);
1275 hdr->msg.put.ptl_index = le32_to_cpu(hdr->msg.put.ptl_index);
1276 hdr->msg.put.offset = le32_to_cpu(hdr->msg.put.offset);
1277
1278 info.mi_id.nid = hdr->src_nid;
1279 info.mi_id.pid = hdr->src_pid;
1280 info.mi_opc = LNET_MD_OP_PUT;
1281 info.mi_portal = hdr->msg.put.ptl_index;
1282 info.mi_rlength = hdr->payload_length;
1283 info.mi_roffset = hdr->msg.put.offset;
1284 info.mi_mbits = hdr->msg.put.match_bits;
1285
1286 msg->msg_rx_ready_delay = !ni->ni_lnd->lnd_eager_recv;
1287 ready_delay = msg->msg_rx_ready_delay;
1288
1289 again:
1290 rc = lnet_ptl_match_md(&info, msg);
1291 switch (rc) {
1292 default:
1293 LBUG();
1294
1295 case LNET_MATCHMD_OK:
1296 lnet_recv_put(ni, msg);
1297 return 0;
1298
1299 case LNET_MATCHMD_NONE:
1300
1301
1302
1303
1304 if (ready_delay)
1305 return 0;
1306
1307 rc = lnet_ni_eager_recv(ni, msg);
1308 if (!rc) {
1309 ready_delay = true;
1310 goto again;
1311 }
1312
1313
1314 case LNET_MATCHMD_DROP:
1315 CNETERR("Dropping PUT from %s portal %d match %llu offset %d length %d: %d\n",
1316 libcfs_id2str(info.mi_id), info.mi_portal,
1317 info.mi_mbits, info.mi_roffset, info.mi_rlength, rc);
1318
1319 return -ENOENT;
1320 }
1321}
1322
1323static int
1324lnet_parse_get(lnet_ni_t *ni, lnet_msg_t *msg, int rdma_get)
1325{
1326 struct lnet_match_info info;
1327 lnet_hdr_t *hdr = &msg->msg_hdr;
1328 lnet_handle_wire_t reply_wmd;
1329 int rc;
1330
1331
1332 hdr->msg.get.match_bits = le64_to_cpu(hdr->msg.get.match_bits);
1333 hdr->msg.get.ptl_index = le32_to_cpu(hdr->msg.get.ptl_index);
1334 hdr->msg.get.sink_length = le32_to_cpu(hdr->msg.get.sink_length);
1335 hdr->msg.get.src_offset = le32_to_cpu(hdr->msg.get.src_offset);
1336
1337 info.mi_id.nid = hdr->src_nid;
1338 info.mi_id.pid = hdr->src_pid;
1339 info.mi_opc = LNET_MD_OP_GET;
1340 info.mi_portal = hdr->msg.get.ptl_index;
1341 info.mi_rlength = hdr->msg.get.sink_length;
1342 info.mi_roffset = hdr->msg.get.src_offset;
1343 info.mi_mbits = hdr->msg.get.match_bits;
1344
1345 rc = lnet_ptl_match_md(&info, msg);
1346 if (rc == LNET_MATCHMD_DROP) {
1347 CNETERR("Dropping GET from %s portal %d match %llu offset %d length %d\n",
1348 libcfs_id2str(info.mi_id), info.mi_portal,
1349 info.mi_mbits, info.mi_roffset, info.mi_rlength);
1350 return -ENOENT;
1351 }
1352
1353 LASSERT(rc == LNET_MATCHMD_OK);
1354
1355 lnet_build_msg_event(msg, LNET_EVENT_GET);
1356
1357 reply_wmd = hdr->msg.get.return_wmd;
1358
1359 lnet_prep_send(msg, LNET_MSG_REPLY, info.mi_id,
1360 msg->msg_offset, msg->msg_wanted);
1361
1362 msg->msg_hdr.msg.reply.dst_wmd = reply_wmd;
1363
1364 if (rdma_get) {
1365
1366 lnet_ni_recv(ni, msg->msg_private, msg, 0,
1367 msg->msg_offset, msg->msg_len, msg->msg_len);
1368 return 0;
1369 }
1370
1371 lnet_ni_recv(ni, msg->msg_private, NULL, 0, 0, 0, 0);
1372 msg->msg_receiving = 0;
1373
1374 rc = lnet_send(ni->ni_nid, msg, LNET_NID_ANY);
1375 if (rc < 0) {
1376
1377 CERROR("%s: Unable to send REPLY for GET from %s: %d\n",
1378 libcfs_nid2str(ni->ni_nid),
1379 libcfs_id2str(info.mi_id), rc);
1380
1381 lnet_finalize(ni, msg, rc);
1382 }
1383
1384 return 0;
1385}
1386
1387static int
1388lnet_parse_reply(lnet_ni_t *ni, lnet_msg_t *msg)
1389{
1390 void *private = msg->msg_private;
1391 lnet_hdr_t *hdr = &msg->msg_hdr;
1392 lnet_process_id_t src = {0};
1393 lnet_libmd_t *md;
1394 int rlength;
1395 int mlength;
1396 int cpt;
1397
1398 cpt = lnet_cpt_of_cookie(hdr->msg.reply.dst_wmd.wh_object_cookie);
1399 lnet_res_lock(cpt);
1400
1401 src.nid = hdr->src_nid;
1402 src.pid = hdr->src_pid;
1403
1404
1405 md = lnet_wire_handle2md(&hdr->msg.reply.dst_wmd);
1406 if (!md || !md->md_threshold || md->md_me) {
1407 CNETERR("%s: Dropping REPLY from %s for %s MD %#llx.%#llx\n",
1408 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1409 !md ? "invalid" : "inactive",
1410 hdr->msg.reply.dst_wmd.wh_interface_cookie,
1411 hdr->msg.reply.dst_wmd.wh_object_cookie);
1412 if (md && md->md_me)
1413 CERROR("REPLY MD also attached to portal %d\n",
1414 md->md_me->me_portal);
1415
1416 lnet_res_unlock(cpt);
1417 return -ENOENT;
1418 }
1419
1420 LASSERT(!md->md_offset);
1421
1422 rlength = hdr->payload_length;
1423 mlength = min_t(uint, rlength, md->md_length);
1424
1425 if (mlength < rlength &&
1426 !(md->md_options & LNET_MD_TRUNCATE)) {
1427 CNETERR("%s: Dropping REPLY from %s length %d for MD %#llx would overflow (%d)\n",
1428 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1429 rlength, hdr->msg.reply.dst_wmd.wh_object_cookie,
1430 mlength);
1431 lnet_res_unlock(cpt);
1432 return -ENOENT;
1433 }
1434
1435 CDEBUG(D_NET, "%s: Reply from %s of length %d/%d into md %#llx\n",
1436 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1437 mlength, rlength, hdr->msg.reply.dst_wmd.wh_object_cookie);
1438
1439 lnet_msg_attach_md(msg, md, 0, mlength);
1440
1441 if (mlength)
1442 lnet_setpayloadbuffer(msg);
1443
1444 lnet_res_unlock(cpt);
1445
1446 lnet_build_msg_event(msg, LNET_EVENT_REPLY);
1447
1448 lnet_ni_recv(ni, private, msg, 0, 0, mlength, rlength);
1449 return 0;
1450}
1451
1452static int
1453lnet_parse_ack(lnet_ni_t *ni, lnet_msg_t *msg)
1454{
1455 lnet_hdr_t *hdr = &msg->msg_hdr;
1456 lnet_process_id_t src = {0};
1457 lnet_libmd_t *md;
1458 int cpt;
1459
1460 src.nid = hdr->src_nid;
1461 src.pid = hdr->src_pid;
1462
1463
1464 hdr->msg.ack.match_bits = le64_to_cpu(hdr->msg.ack.match_bits);
1465 hdr->msg.ack.mlength = le32_to_cpu(hdr->msg.ack.mlength);
1466
1467 cpt = lnet_cpt_of_cookie(hdr->msg.ack.dst_wmd.wh_object_cookie);
1468 lnet_res_lock(cpt);
1469
1470
1471 md = lnet_wire_handle2md(&hdr->msg.ack.dst_wmd);
1472 if (!md || !md->md_threshold || md->md_me) {
1473
1474 CDEBUG(D_NET,
1475 "%s: Dropping ACK from %s to %s MD %#llx.%#llx\n",
1476 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1477 !md ? "invalid" : "inactive",
1478 hdr->msg.ack.dst_wmd.wh_interface_cookie,
1479 hdr->msg.ack.dst_wmd.wh_object_cookie);
1480 if (md && md->md_me)
1481 CERROR("Source MD also attached to portal %d\n",
1482 md->md_me->me_portal);
1483
1484 lnet_res_unlock(cpt);
1485 return -ENOENT;
1486 }
1487
1488 CDEBUG(D_NET, "%s: ACK from %s into md %#llx\n",
1489 libcfs_nid2str(ni->ni_nid), libcfs_id2str(src),
1490 hdr->msg.ack.dst_wmd.wh_object_cookie);
1491
1492 lnet_msg_attach_md(msg, md, 0, 0);
1493
1494 lnet_res_unlock(cpt);
1495
1496 lnet_build_msg_event(msg, LNET_EVENT_ACK);
1497
1498 lnet_ni_recv(ni, msg->msg_private, msg, 0, 0, 0, msg->msg_len);
1499 return 0;
1500}
1501
1502
1503
1504
1505
1506
1507int
1508lnet_parse_forward_locked(lnet_ni_t *ni, lnet_msg_t *msg)
1509{
1510 int rc = 0;
1511
1512 if (!the_lnet.ln_routing)
1513 return -ECANCELED;
1514
1515 if (msg->msg_rxpeer->lp_rtrcredits <= 0 ||
1516 lnet_msg2bufpool(msg)->rbp_credits <= 0) {
1517 if (!ni->ni_lnd->lnd_eager_recv) {
1518 msg->msg_rx_ready_delay = 1;
1519 } else {
1520 lnet_net_unlock(msg->msg_rx_cpt);
1521 rc = lnet_ni_eager_recv(ni, msg);
1522 lnet_net_lock(msg->msg_rx_cpt);
1523 }
1524 }
1525
1526 if (!rc)
1527 rc = lnet_post_routed_recv_locked(msg, 0);
1528 return rc;
1529}
1530
1531int
1532lnet_parse_local(lnet_ni_t *ni, lnet_msg_t *msg)
1533{
1534 int rc;
1535
1536 switch (msg->msg_type) {
1537 case LNET_MSG_ACK:
1538 rc = lnet_parse_ack(ni, msg);
1539 break;
1540 case LNET_MSG_PUT:
1541 rc = lnet_parse_put(ni, msg);
1542 break;
1543 case LNET_MSG_GET:
1544 rc = lnet_parse_get(ni, msg, msg->msg_rdma_get);
1545 break;
1546 case LNET_MSG_REPLY:
1547 rc = lnet_parse_reply(ni, msg);
1548 break;
1549 default:
1550 LASSERT(0);
1551 return -EPROTO;
1552 }
1553
1554 LASSERT(!rc || rc == -ENOENT);
1555 return rc;
1556}
1557
1558char *
1559lnet_msgtyp2str(int type)
1560{
1561 switch (type) {
1562 case LNET_MSG_ACK:
1563 return "ACK";
1564 case LNET_MSG_PUT:
1565 return "PUT";
1566 case LNET_MSG_GET:
1567 return "GET";
1568 case LNET_MSG_REPLY:
1569 return "REPLY";
1570 case LNET_MSG_HELLO:
1571 return "HELLO";
1572 default:
1573 return "<UNKNOWN>";
1574 }
1575}
1576
1577void
1578lnet_print_hdr(lnet_hdr_t *hdr)
1579{
1580 lnet_process_id_t src = {0};
1581 lnet_process_id_t dst = {0};
1582 char *type_str = lnet_msgtyp2str(hdr->type);
1583
1584 src.nid = hdr->src_nid;
1585 src.pid = hdr->src_pid;
1586
1587 dst.nid = hdr->dest_nid;
1588 dst.pid = hdr->dest_pid;
1589
1590 CWARN("P3 Header at %p of type %s\n", hdr, type_str);
1591 CWARN(" From %s\n", libcfs_id2str(src));
1592 CWARN(" To %s\n", libcfs_id2str(dst));
1593
1594 switch (hdr->type) {
1595 default:
1596 break;
1597
1598 case LNET_MSG_PUT:
1599 CWARN(" Ptl index %d, ack md %#llx.%#llx, match bits %llu\n",
1600 hdr->msg.put.ptl_index,
1601 hdr->msg.put.ack_wmd.wh_interface_cookie,
1602 hdr->msg.put.ack_wmd.wh_object_cookie,
1603 hdr->msg.put.match_bits);
1604 CWARN(" Length %d, offset %d, hdr data %#llx\n",
1605 hdr->payload_length, hdr->msg.put.offset,
1606 hdr->msg.put.hdr_data);
1607 break;
1608
1609 case LNET_MSG_GET:
1610 CWARN(" Ptl index %d, return md %#llx.%#llx, match bits %llu\n",
1611 hdr->msg.get.ptl_index,
1612 hdr->msg.get.return_wmd.wh_interface_cookie,
1613 hdr->msg.get.return_wmd.wh_object_cookie,
1614 hdr->msg.get.match_bits);
1615 CWARN(" Length %d, src offset %d\n",
1616 hdr->msg.get.sink_length,
1617 hdr->msg.get.src_offset);
1618 break;
1619
1620 case LNET_MSG_ACK:
1621 CWARN(" dst md %#llx.%#llx, manipulated length %d\n",
1622 hdr->msg.ack.dst_wmd.wh_interface_cookie,
1623 hdr->msg.ack.dst_wmd.wh_object_cookie,
1624 hdr->msg.ack.mlength);
1625 break;
1626
1627 case LNET_MSG_REPLY:
1628 CWARN(" dst md %#llx.%#llx, length %d\n",
1629 hdr->msg.reply.dst_wmd.wh_interface_cookie,
1630 hdr->msg.reply.dst_wmd.wh_object_cookie,
1631 hdr->payload_length);
1632 }
1633}
1634
1635int
1636lnet_parse(lnet_ni_t *ni, lnet_hdr_t *hdr, lnet_nid_t from_nid,
1637 void *private, int rdma_req)
1638{
1639 int rc = 0;
1640 int cpt;
1641 int for_me;
1642 struct lnet_msg *msg;
1643 lnet_pid_t dest_pid;
1644 lnet_nid_t dest_nid;
1645 lnet_nid_t src_nid;
1646 __u32 payload_length;
1647 __u32 type;
1648
1649 LASSERT(!in_interrupt());
1650
1651 type = le32_to_cpu(hdr->type);
1652 src_nid = le64_to_cpu(hdr->src_nid);
1653 dest_nid = le64_to_cpu(hdr->dest_nid);
1654 dest_pid = le32_to_cpu(hdr->dest_pid);
1655 payload_length = le32_to_cpu(hdr->payload_length);
1656
1657 for_me = (ni->ni_nid == dest_nid);
1658 cpt = lnet_cpt_of_nid(from_nid);
1659
1660 switch (type) {
1661 case LNET_MSG_ACK:
1662 case LNET_MSG_GET:
1663 if (payload_length > 0) {
1664 CERROR("%s, src %s: bad %s payload %d (0 expected)\n",
1665 libcfs_nid2str(from_nid),
1666 libcfs_nid2str(src_nid),
1667 lnet_msgtyp2str(type), payload_length);
1668 return -EPROTO;
1669 }
1670 break;
1671
1672 case LNET_MSG_PUT:
1673 case LNET_MSG_REPLY:
1674 if (payload_length >
1675 (__u32)(for_me ? LNET_MAX_PAYLOAD : LNET_MTU)) {
1676 CERROR("%s, src %s: bad %s payload %d (%d max expected)\n",
1677 libcfs_nid2str(from_nid),
1678 libcfs_nid2str(src_nid),
1679 lnet_msgtyp2str(type),
1680 payload_length,
1681 for_me ? LNET_MAX_PAYLOAD : LNET_MTU);
1682 return -EPROTO;
1683 }
1684 break;
1685
1686 default:
1687 CERROR("%s, src %s: Bad message type 0x%x\n",
1688 libcfs_nid2str(from_nid),
1689 libcfs_nid2str(src_nid), type);
1690 return -EPROTO;
1691 }
1692
1693 if (the_lnet.ln_routing &&
1694 ni->ni_last_alive != ktime_get_real_seconds()) {
1695
1696 lnet_ni_lock(ni);
1697 ni->ni_last_alive = ktime_get_real_seconds();
1698 if (ni->ni_status &&
1699 ni->ni_status->ns_status == LNET_NI_STATUS_DOWN)
1700 ni->ni_status->ns_status = LNET_NI_STATUS_UP;
1701 lnet_ni_unlock(ni);
1702 }
1703
1704
1705
1706
1707
1708
1709 if (!for_me) {
1710 if (LNET_NIDNET(dest_nid) == LNET_NIDNET(ni->ni_nid)) {
1711
1712 CERROR("%s, src %s: Bad dest nid %s (should have been sent direct)\n",
1713 libcfs_nid2str(from_nid),
1714 libcfs_nid2str(src_nid),
1715 libcfs_nid2str(dest_nid));
1716 return -EPROTO;
1717 }
1718
1719 if (lnet_islocalnid(dest_nid)) {
1720
1721
1722
1723
1724 CERROR("%s, src %s: Bad dest nid %s (it's my nid but on a different network)\n",
1725 libcfs_nid2str(from_nid),
1726 libcfs_nid2str(src_nid),
1727 libcfs_nid2str(dest_nid));
1728 return -EPROTO;
1729 }
1730
1731 if (rdma_req && type == LNET_MSG_GET) {
1732 CERROR("%s, src %s: Bad optimized GET for %s (final destination must be me)\n",
1733 libcfs_nid2str(from_nid),
1734 libcfs_nid2str(src_nid),
1735 libcfs_nid2str(dest_nid));
1736 return -EPROTO;
1737 }
1738
1739 if (!the_lnet.ln_routing) {
1740 CERROR("%s, src %s: Dropping message for %s (routing not enabled)\n",
1741 libcfs_nid2str(from_nid),
1742 libcfs_nid2str(src_nid),
1743 libcfs_nid2str(dest_nid));
1744 goto drop;
1745 }
1746 }
1747
1748
1749
1750
1751
1752 if (!list_empty(&the_lnet.ln_test_peers) &&
1753 fail_peer(src_nid, 0)) {
1754 CERROR("%s, src %s: Dropping %s to simulate failure\n",
1755 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1756 lnet_msgtyp2str(type));
1757 goto drop;
1758 }
1759
1760 if (!list_empty(&the_lnet.ln_drop_rules) &&
1761 lnet_drop_rule_match(hdr)) {
1762 CDEBUG(D_NET, "%s, src %s, dst %s: Dropping %s to simulate silent message loss\n",
1763 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1764 libcfs_nid2str(dest_nid), lnet_msgtyp2str(type));
1765 goto drop;
1766 }
1767
1768 msg = lnet_msg_alloc();
1769 if (!msg) {
1770 CERROR("%s, src %s: Dropping %s (out of memory)\n",
1771 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1772 lnet_msgtyp2str(type));
1773 goto drop;
1774 }
1775
1776
1777
1778
1779 msg->msg_type = type;
1780 msg->msg_private = private;
1781 msg->msg_receiving = 1;
1782 msg->msg_rdma_get = rdma_req;
1783 msg->msg_wanted = payload_length;
1784 msg->msg_len = payload_length;
1785 msg->msg_offset = 0;
1786 msg->msg_hdr = *hdr;
1787
1788 msg->msg_from = from_nid;
1789 if (!for_me) {
1790 msg->msg_target.pid = dest_pid;
1791 msg->msg_target.nid = dest_nid;
1792 msg->msg_routing = 1;
1793
1794 } else {
1795
1796 msg->msg_hdr.type = type;
1797 msg->msg_hdr.src_nid = src_nid;
1798 msg->msg_hdr.src_pid = le32_to_cpu(msg->msg_hdr.src_pid);
1799 msg->msg_hdr.dest_nid = dest_nid;
1800 msg->msg_hdr.dest_pid = dest_pid;
1801 msg->msg_hdr.payload_length = payload_length;
1802 }
1803
1804 lnet_net_lock(cpt);
1805 rc = lnet_nid2peer_locked(&msg->msg_rxpeer, from_nid, cpt);
1806 if (rc) {
1807 lnet_net_unlock(cpt);
1808 CERROR("%s, src %s: Dropping %s (error %d looking up sender)\n",
1809 libcfs_nid2str(from_nid), libcfs_nid2str(src_nid),
1810 lnet_msgtyp2str(type), rc);
1811 lnet_msg_free(msg);
1812 if (rc == -ESHUTDOWN)
1813
1814 return 0;
1815 goto drop;
1816 }
1817
1818 if (lnet_isrouter(msg->msg_rxpeer)) {
1819 lnet_peer_set_alive(msg->msg_rxpeer);
1820 if (avoid_asym_router_failure &&
1821 LNET_NIDNET(src_nid) != LNET_NIDNET(from_nid)) {
1822
1823
1824
1825
1826 lnet_router_ni_update_locked(msg->msg_rxpeer,
1827 LNET_NIDNET(src_nid));
1828 }
1829 }
1830
1831 lnet_msg_commit(msg, cpt);
1832
1833
1834 if (unlikely(!list_empty(&the_lnet.ln_delay_rules) &&
1835 lnet_delay_rule_match_locked(hdr, msg))) {
1836 lnet_net_unlock(cpt);
1837 return 0;
1838 }
1839
1840 if (!for_me) {
1841 rc = lnet_parse_forward_locked(ni, msg);
1842 lnet_net_unlock(cpt);
1843
1844 if (rc < 0)
1845 goto free_drop;
1846
1847 if (rc == LNET_CREDIT_OK) {
1848 lnet_ni_recv(ni, msg->msg_private, msg, 0,
1849 0, payload_length, payload_length);
1850 }
1851 return 0;
1852 }
1853
1854 lnet_net_unlock(cpt);
1855
1856 rc = lnet_parse_local(ni, msg);
1857 if (rc)
1858 goto free_drop;
1859 return 0;
1860
1861 free_drop:
1862 LASSERT(!msg->msg_md);
1863 lnet_finalize(ni, msg, rc);
1864
1865 drop:
1866 lnet_drop_message(ni, cpt, private, payload_length);
1867 return 0;
1868}
1869EXPORT_SYMBOL(lnet_parse);
1870
1871void
1872lnet_drop_delayed_msg_list(struct list_head *head, char *reason)
1873{
1874 while (!list_empty(head)) {
1875 lnet_process_id_t id = {0};
1876 lnet_msg_t *msg;
1877
1878 msg = list_entry(head->next, lnet_msg_t, msg_list);
1879 list_del(&msg->msg_list);
1880
1881 id.nid = msg->msg_hdr.src_nid;
1882 id.pid = msg->msg_hdr.src_pid;
1883
1884 LASSERT(!msg->msg_md);
1885 LASSERT(msg->msg_rx_delayed);
1886 LASSERT(msg->msg_rxpeer);
1887 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
1888
1889 CWARN("Dropping delayed PUT from %s portal %d match %llu offset %d length %d: %s\n",
1890 libcfs_id2str(id),
1891 msg->msg_hdr.msg.put.ptl_index,
1892 msg->msg_hdr.msg.put.match_bits,
1893 msg->msg_hdr.msg.put.offset,
1894 msg->msg_hdr.payload_length, reason);
1895
1896
1897
1898
1899
1900
1901 lnet_drop_message(msg->msg_rxpeer->lp_ni,
1902 msg->msg_rxpeer->lp_cpt,
1903 msg->msg_private, msg->msg_len);
1904
1905
1906
1907
1908
1909 lnet_finalize(msg->msg_rxpeer->lp_ni, msg, -ENOENT);
1910 }
1911}
1912
1913void
1914lnet_recv_delayed_msg_list(struct list_head *head)
1915{
1916 while (!list_empty(head)) {
1917 lnet_msg_t *msg;
1918 lnet_process_id_t id;
1919
1920 msg = list_entry(head->next, lnet_msg_t, msg_list);
1921 list_del(&msg->msg_list);
1922
1923
1924
1925
1926
1927 id.nid = msg->msg_hdr.src_nid;
1928 id.pid = msg->msg_hdr.src_pid;
1929
1930 LASSERT(msg->msg_rx_delayed);
1931 LASSERT(msg->msg_md);
1932 LASSERT(msg->msg_rxpeer);
1933 LASSERT(msg->msg_hdr.type == LNET_MSG_PUT);
1934
1935 CDEBUG(D_NET, "Resuming delayed PUT from %s portal %d match %llu offset %d length %d.\n",
1936 libcfs_id2str(id), msg->msg_hdr.msg.put.ptl_index,
1937 msg->msg_hdr.msg.put.match_bits,
1938 msg->msg_hdr.msg.put.offset,
1939 msg->msg_hdr.payload_length);
1940
1941 lnet_recv_put(msg->msg_rxpeer->lp_ni, msg);
1942 }
1943}
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989int
1990LNetPut(lnet_nid_t self, lnet_handle_md_t mdh, lnet_ack_req_t ack,
1991 lnet_process_id_t target, unsigned int portal,
1992 __u64 match_bits, unsigned int offset,
1993 __u64 hdr_data)
1994{
1995 struct lnet_msg *msg;
1996 struct lnet_libmd *md;
1997 int cpt;
1998 int rc;
1999
2000 LASSERT(the_lnet.ln_refcount > 0);
2001
2002 if (!list_empty(&the_lnet.ln_test_peers) &&
2003 fail_peer(target.nid, 1)) {
2004 CERROR("Dropping PUT to %s: simulated failure\n",
2005 libcfs_id2str(target));
2006 return -EIO;
2007 }
2008
2009 msg = lnet_msg_alloc();
2010 if (!msg) {
2011 CERROR("Dropping PUT to %s: ENOMEM on lnet_msg_t\n",
2012 libcfs_id2str(target));
2013 return -ENOMEM;
2014 }
2015 msg->msg_vmflush = !!memory_pressure_get();
2016
2017 cpt = lnet_cpt_of_cookie(mdh.cookie);
2018 lnet_res_lock(cpt);
2019
2020 md = lnet_handle2md(&mdh);
2021 if (!md || !md->md_threshold || md->md_me) {
2022 CERROR("Dropping PUT (%llu:%d:%s): MD (%d) invalid\n",
2023 match_bits, portal, libcfs_id2str(target),
2024 !md ? -1 : md->md_threshold);
2025 if (md && md->md_me)
2026 CERROR("Source MD also attached to portal %d\n",
2027 md->md_me->me_portal);
2028 lnet_res_unlock(cpt);
2029
2030 lnet_msg_free(msg);
2031 return -ENOENT;
2032 }
2033
2034 CDEBUG(D_NET, "LNetPut -> %s\n", libcfs_id2str(target));
2035
2036 lnet_msg_attach_md(msg, md, 0, 0);
2037
2038 lnet_prep_send(msg, LNET_MSG_PUT, target, 0, md->md_length);
2039
2040 msg->msg_hdr.msg.put.match_bits = cpu_to_le64(match_bits);
2041 msg->msg_hdr.msg.put.ptl_index = cpu_to_le32(portal);
2042 msg->msg_hdr.msg.put.offset = cpu_to_le32(offset);
2043 msg->msg_hdr.msg.put.hdr_data = hdr_data;
2044
2045
2046 if (ack == LNET_ACK_REQ) {
2047 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2048 the_lnet.ln_interface_cookie;
2049 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2050 md->md_lh.lh_cookie;
2051 } else {
2052 msg->msg_hdr.msg.put.ack_wmd.wh_interface_cookie =
2053 LNET_WIRE_HANDLE_COOKIE_NONE;
2054 msg->msg_hdr.msg.put.ack_wmd.wh_object_cookie =
2055 LNET_WIRE_HANDLE_COOKIE_NONE;
2056 }
2057
2058 lnet_res_unlock(cpt);
2059
2060 lnet_build_msg_event(msg, LNET_EVENT_SEND);
2061
2062 rc = lnet_send(self, msg, LNET_NID_ANY);
2063 if (rc) {
2064 CNETERR("Error sending PUT to %s: %d\n",
2065 libcfs_id2str(target), rc);
2066 lnet_finalize(NULL, msg, rc);
2067 }
2068
2069
2070 return 0;
2071}
2072EXPORT_SYMBOL(LNetPut);
2073
2074lnet_msg_t *
2075lnet_create_reply_msg(lnet_ni_t *ni, lnet_msg_t *getmsg)
2076{
2077
2078
2079
2080
2081
2082
2083
2084
2085 struct lnet_msg *msg = lnet_msg_alloc();
2086 struct lnet_libmd *getmd = getmsg->msg_md;
2087 lnet_process_id_t peer_id = getmsg->msg_target;
2088 int cpt;
2089
2090 LASSERT(!getmsg->msg_target_is_router);
2091 LASSERT(!getmsg->msg_routing);
2092
2093 if (!msg) {
2094 CERROR("%s: Dropping REPLY from %s: can't allocate msg\n",
2095 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id));
2096 goto drop;
2097 }
2098
2099 cpt = lnet_cpt_of_cookie(getmd->md_lh.lh_cookie);
2100 lnet_res_lock(cpt);
2101
2102 LASSERT(getmd->md_refcount > 0);
2103
2104 if (!getmd->md_threshold) {
2105 CERROR("%s: Dropping REPLY from %s for inactive MD %p\n",
2106 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id),
2107 getmd);
2108 lnet_res_unlock(cpt);
2109 goto drop;
2110 }
2111
2112 LASSERT(!getmd->md_offset);
2113
2114 CDEBUG(D_NET, "%s: Reply from %s md %p\n",
2115 libcfs_nid2str(ni->ni_nid), libcfs_id2str(peer_id), getmd);
2116
2117
2118 msg->msg_from = peer_id.nid;
2119 msg->msg_type = LNET_MSG_GET;
2120 msg->msg_hdr.src_nid = peer_id.nid;
2121 msg->msg_hdr.payload_length = getmd->md_length;
2122 msg->msg_receiving = 1;
2123
2124 lnet_msg_attach_md(msg, getmd, getmd->md_offset, getmd->md_length);
2125 lnet_res_unlock(cpt);
2126
2127 cpt = lnet_cpt_of_nid(peer_id.nid);
2128
2129 lnet_net_lock(cpt);
2130 lnet_msg_commit(msg, cpt);
2131 lnet_net_unlock(cpt);
2132
2133 lnet_build_msg_event(msg, LNET_EVENT_REPLY);
2134
2135 return msg;
2136
2137 drop:
2138 cpt = lnet_cpt_of_nid(peer_id.nid);
2139
2140 lnet_net_lock(cpt);
2141 the_lnet.ln_counters[cpt]->drop_count++;
2142 the_lnet.ln_counters[cpt]->drop_length += getmd->md_length;
2143 lnet_net_unlock(cpt);
2144
2145 if (msg)
2146 lnet_msg_free(msg);
2147
2148 return NULL;
2149}
2150EXPORT_SYMBOL(lnet_create_reply_msg);
2151
2152void
2153lnet_set_reply_msg_len(lnet_ni_t *ni, lnet_msg_t *reply, unsigned int len)
2154{
2155
2156
2157
2158
2159 LASSERT(reply);
2160 LASSERT(reply->msg_type == LNET_MSG_GET);
2161 LASSERT(reply->msg_ev.type == LNET_EVENT_REPLY);
2162
2163
2164
2165
2166
2167 LASSERT(len <= reply->msg_ev.mlength);
2168
2169 reply->msg_ev.mlength = len;
2170}
2171EXPORT_SYMBOL(lnet_set_reply_msg_len);
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194int
2195LNetGet(lnet_nid_t self, lnet_handle_md_t mdh,
2196 lnet_process_id_t target, unsigned int portal,
2197 __u64 match_bits, unsigned int offset)
2198{
2199 struct lnet_msg *msg;
2200 struct lnet_libmd *md;
2201 int cpt;
2202 int rc;
2203
2204 LASSERT(the_lnet.ln_refcount > 0);
2205
2206 if (!list_empty(&the_lnet.ln_test_peers) &&
2207 fail_peer(target.nid, 1)) {
2208 CERROR("Dropping GET to %s: simulated failure\n",
2209 libcfs_id2str(target));
2210 return -EIO;
2211 }
2212
2213 msg = lnet_msg_alloc();
2214 if (!msg) {
2215 CERROR("Dropping GET to %s: ENOMEM on lnet_msg_t\n",
2216 libcfs_id2str(target));
2217 return -ENOMEM;
2218 }
2219
2220 cpt = lnet_cpt_of_cookie(mdh.cookie);
2221 lnet_res_lock(cpt);
2222
2223 md = lnet_handle2md(&mdh);
2224 if (!md || !md->md_threshold || md->md_me) {
2225 CERROR("Dropping GET (%llu:%d:%s): MD (%d) invalid\n",
2226 match_bits, portal, libcfs_id2str(target),
2227 !md ? -1 : md->md_threshold);
2228 if (md && md->md_me)
2229 CERROR("REPLY MD also attached to portal %d\n",
2230 md->md_me->me_portal);
2231
2232 lnet_res_unlock(cpt);
2233
2234 lnet_msg_free(msg);
2235 return -ENOENT;
2236 }
2237
2238 CDEBUG(D_NET, "LNetGet -> %s\n", libcfs_id2str(target));
2239
2240 lnet_msg_attach_md(msg, md, 0, 0);
2241
2242 lnet_prep_send(msg, LNET_MSG_GET, target, 0, 0);
2243
2244 msg->msg_hdr.msg.get.match_bits = cpu_to_le64(match_bits);
2245 msg->msg_hdr.msg.get.ptl_index = cpu_to_le32(portal);
2246 msg->msg_hdr.msg.get.src_offset = cpu_to_le32(offset);
2247 msg->msg_hdr.msg.get.sink_length = cpu_to_le32(md->md_length);
2248
2249
2250 msg->msg_hdr.msg.get.return_wmd.wh_interface_cookie =
2251 the_lnet.ln_interface_cookie;
2252 msg->msg_hdr.msg.get.return_wmd.wh_object_cookie =
2253 md->md_lh.lh_cookie;
2254
2255 lnet_res_unlock(cpt);
2256
2257 lnet_build_msg_event(msg, LNET_EVENT_SEND);
2258
2259 rc = lnet_send(self, msg, LNET_NID_ANY);
2260 if (rc < 0) {
2261 CNETERR("Error sending GET to %s: %d\n",
2262 libcfs_id2str(target), rc);
2263 lnet_finalize(NULL, msg, rc);
2264 }
2265
2266
2267 return 0;
2268}
2269EXPORT_SYMBOL(LNetGet);
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285int
2286LNetDist(lnet_nid_t dstnid, lnet_nid_t *srcnidp, __u32 *orderp)
2287{
2288 struct list_head *e;
2289 struct lnet_ni *ni;
2290 lnet_remotenet_t *rnet;
2291 __u32 dstnet = LNET_NIDNET(dstnid);
2292 int hops;
2293 int cpt;
2294 __u32 order = 2;
2295 struct list_head *rn_list;
2296
2297
2298
2299
2300
2301
2302
2303 LASSERT(the_lnet.ln_refcount > 0);
2304
2305 cpt = lnet_net_lock_current();
2306
2307 list_for_each(e, &the_lnet.ln_nis) {
2308 ni = list_entry(e, lnet_ni_t, ni_list);
2309
2310 if (ni->ni_nid == dstnid) {
2311 if (srcnidp)
2312 *srcnidp = dstnid;
2313 if (orderp) {
2314 if (LNET_NETTYP(LNET_NIDNET(dstnid)) == LOLND)
2315 *orderp = 0;
2316 else
2317 *orderp = 1;
2318 }
2319 lnet_net_unlock(cpt);
2320
2321 return local_nid_dist_zero ? 0 : 1;
2322 }
2323
2324 if (LNET_NIDNET(ni->ni_nid) == dstnet) {
2325
2326
2327
2328
2329
2330
2331 if (!net_eq(ni->ni_net_ns, current->nsproxy->net_ns))
2332 order += 0xffff0000;
2333
2334 if (srcnidp)
2335 *srcnidp = ni->ni_nid;
2336 if (orderp)
2337 *orderp = order;
2338 lnet_net_unlock(cpt);
2339 return 1;
2340 }
2341
2342 order++;
2343 }
2344
2345 rn_list = lnet_net2rnethash(dstnet);
2346 list_for_each(e, rn_list) {
2347 rnet = list_entry(e, lnet_remotenet_t, lrn_list);
2348
2349 if (rnet->lrn_net == dstnet) {
2350 lnet_route_t *route;
2351 lnet_route_t *shortest = NULL;
2352 __u32 shortest_hops = LNET_UNDEFINED_HOPS;
2353 __u32 route_hops;
2354
2355 LASSERT(!list_empty(&rnet->lrn_routes));
2356
2357 list_for_each_entry(route, &rnet->lrn_routes,
2358 lr_list) {
2359 route_hops = route->lr_hops;
2360 if (route_hops == LNET_UNDEFINED_HOPS)
2361 route_hops = 1;
2362 if (!shortest ||
2363 route_hops < shortest_hops) {
2364 shortest = route;
2365 shortest_hops = route_hops;
2366 }
2367 }
2368
2369 LASSERT(shortest);
2370 hops = shortest_hops;
2371 if (srcnidp)
2372 *srcnidp = shortest->lr_gateway->lp_ni->ni_nid;
2373 if (orderp)
2374 *orderp = order;
2375 lnet_net_unlock(cpt);
2376 return hops + 1;
2377 }
2378 order++;
2379 }
2380
2381 lnet_net_unlock(cpt);
2382 return -EHOSTUNREACH;
2383}
2384EXPORT_SYMBOL(LNetDist);
2385