1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41#define DEBUG_SUBSYSTEM S_LNET
42
43#include "../../include/linux/lnet/lib-lnet.h"
44
45void
46lnet_build_unlink_event(lnet_libmd_t *md, lnet_event_t *ev)
47{
48 memset(ev, 0, sizeof(*ev));
49
50 ev->status = 0;
51 ev->unlinked = 1;
52 ev->type = LNET_EVENT_UNLINK;
53 lnet_md_deconstruct(md, &ev->md);
54 lnet_md2handle(&ev->md_handle, md);
55}
56
57
58
59
/*
 * lnet_build_msg_event - fill msg->msg_ev for an event of kind @ev_type
 * @msg:     message whose embedded event (msg->msg_ev) is filled in;
 *           must not be a routed message (asserted)
 * @ev_type: LNET_EVENT_SEND for a message this node originates, or
 *           LNET_EVENT_PUT/GET/ACK/REPLY for one it has received;
 *           any other value hits LBUG()
 *
 * Only the fields relevant to @ev_type are written; the event is not
 * cleared first.
 */
void
lnet_build_msg_event(lnet_msg_t *msg, lnet_event_kind_t ev_type)
{
	lnet_hdr_t *hdr = &msg->msg_hdr;
	lnet_event_t *ev = &msg->msg_ev;

	LASSERT(!msg->msg_routing);

	ev->type = ev_type;

	if (ev_type == LNET_EVENT_SEND) {
		/*
		 * Outgoing message: header fields are read through
		 * le*_to_cpu() and the initiator is this node's pid.
		 */
		ev->target.nid = le64_to_cpu(hdr->dest_nid);
		ev->target.pid = le32_to_cpu(hdr->dest_pid);
		ev->initiator.nid = LNET_NID_ANY;
		ev->initiator.pid = the_lnet.ln_pid;
		ev->sender = LNET_NID_ANY;
	} else {
		/*
		 * Incoming message: header fields are copied as-is
		 * (NOTE(review): presumably already converted to host order
		 * when the message was received -- confirm).  mlength/offset
		 * come from msg_wanted/msg_offset (e.g. as stored by
		 * lnet_msg_attach_md()) and may be overridden per type below.
		 */
		ev->target.pid = hdr->dest_pid;
		ev->target.nid = hdr->dest_nid;
		ev->initiator.pid = hdr->src_pid;
		ev->initiator.nid = hdr->src_nid;
		ev->rlength = hdr->payload_length;
		ev->sender = msg->msg_from;
		ev->mlength = msg->msg_wanted;
		ev->offset = msg->msg_offset;
	}

	switch (ev_type) {
	default:
		/* unknown event kind; LBUG() is not expected to return */
		LBUG();

	case LNET_EVENT_PUT:
		/* received PUT */
		ev->pt_index = hdr->msg.put.ptl_index;
		ev->match_bits = hdr->msg.put.match_bits;
		ev->hdr_data = hdr->msg.put.hdr_data;
		return;

	case LNET_EVENT_GET:
		/* received GET: carries no header data */
		ev->pt_index = hdr->msg.get.ptl_index;
		ev->match_bits = hdr->msg.get.match_bits;
		ev->hdr_data = 0;
		return;

	case LNET_EVENT_ACK:
		/* the ACK header's mlength overrides the one set above */
		ev->match_bits = hdr->msg.ack.match_bits;
		ev->mlength = hdr->msg.ack.mlength;
		return;

	case LNET_EVENT_REPLY:
		/* nothing beyond the common fields filled in above */
		return;

	case LNET_EVENT_SEND:
		/* locally originated PUT or GET; mlength == rlength */
		if (msg->msg_type == LNET_MSG_PUT) {
			ev->pt_index = le32_to_cpu(hdr->msg.put.ptl_index);
			ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
			ev->offset = le32_to_cpu(hdr->msg.put.offset);
			ev->mlength =
			ev->rlength = le32_to_cpu(hdr->payload_length);
			ev->hdr_data = le64_to_cpu(hdr->msg.put.hdr_data);

		} else {
			LASSERT(msg->msg_type == LNET_MSG_GET);
			ev->pt_index = le32_to_cpu(hdr->msg.get.ptl_index);
			ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
			ev->mlength =
			ev->rlength = le32_to_cpu(hdr->msg.get.sink_length);
			ev->offset = le32_to_cpu(hdr->msg.get.src_offset);
			ev->hdr_data = 0;
		}
		return;
	}
}
134
135void
136lnet_msg_commit(lnet_msg_t *msg, int cpt)
137{
138 struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
139 lnet_counters_t *counters = the_lnet.ln_counters[cpt];
140
141
142 LASSERT(!msg->msg_tx_committed);
143
144 if (msg->msg_sending) {
145 LASSERT(!msg->msg_receiving);
146
147 msg->msg_tx_cpt = cpt;
148 msg->msg_tx_committed = 1;
149 if (msg->msg_rx_committed) {
150 LASSERT(msg->msg_onactivelist);
151 return;
152 }
153 } else {
154 LASSERT(!msg->msg_sending);
155 msg->msg_rx_cpt = cpt;
156 msg->msg_rx_committed = 1;
157 }
158
159 LASSERT(!msg->msg_onactivelist);
160 msg->msg_onactivelist = 1;
161 list_add(&msg->msg_activelist, &container->msc_active);
162
163 counters->msgs_alloc++;
164 if (counters->msgs_alloc > counters->msgs_max)
165 counters->msgs_max = counters->msgs_alloc;
166}
167
/*
 * lnet_msg_decommit_tx - undo the send-side commit of @msg
 * @msg:    message committed for sending (asserted)
 * @status: completion status; counters are only updated when it is 0
 *
 * Updates the counters of partition msg->msg_tx_cpt, then always returns
 * the message's tx credits and clears msg_tx_committed.
 */
static void
lnet_msg_decommit_tx(lnet_msg_t *msg, int status)
{
	lnet_counters_t *counters;
	lnet_event_t *ev = &msg->msg_ev;

	LASSERT(msg->msg_tx_committed);
	if (status)
		goto out;	/* failed: no statistics, just release credits */

	counters = the_lnet.ln_counters[msg->msg_tx_cpt];
	switch (ev->type) {
	default:
		/*
		 * No event type: only a routed (forwarded) message, which is
		 * also rx-committed, is expected here.  It is counted as
		 * routed traffic rather than as a send.
		 */
		LASSERT(msg->msg_routing);
		LASSERT(msg->msg_rx_committed);
		LASSERT(!ev->type);

		counters->route_length += msg->msg_len;
		counters->route_count++;
		goto out;

	case LNET_EVENT_PUT:
		/*
		 * A received PUT whose message was reused to send the ACK
		 * (lnet_complete_msg_locked() decommits it and re-preps it as
		 * LNET_MSG_ACK), so its rx side is already decommitted.
		 */
		LASSERT(!msg->msg_rx_committed);

		/* restore the type that the ACK overwrote */
		LASSERT(msg->msg_type == LNET_MSG_ACK);
		msg->msg_type = LNET_MSG_PUT;
		break;

	case LNET_EVENT_SEND:
		/* locally originated message; only PUT payload is counted */
		LASSERT(!msg->msg_rx_committed);
		if (msg->msg_type == LNET_MSG_PUT)
			counters->send_length += msg->msg_len;
		break;

	case LNET_EVENT_GET:
		LASSERT(msg->msg_rx_committed);

		/*
		 * A received GET that went out as the REPLY (NOTE(review):
		 * presumably the reply path reuses the GET message --
		 * confirm); restore the original type.
		 */
		LASSERT(msg->msg_type == LNET_MSG_REPLY);
		msg->msg_type = LNET_MSG_GET;
		break;
	}

	counters->send_count++;
 out:
	lnet_return_tx_credits_locked(msg);
	msg->msg_tx_committed = 0;
}
219
/*
 * lnet_msg_decommit_rx - undo the receive-side commit of @msg
 * @msg:    message committed for receiving and no longer committed for
 *          sending (both asserted)
 * @status: completion status; counters are only updated when it is 0
 *
 * Updates the counters of partition msg->msg_rx_cpt, then always returns
 * the message's rx credits and clears msg_rx_committed.
 */
static void
lnet_msg_decommit_rx(lnet_msg_t *msg, int status)
{
	lnet_counters_t *counters;
	lnet_event_t *ev = &msg->msg_ev;

	LASSERT(!msg->msg_tx_committed);
	LASSERT(msg->msg_rx_committed);

	if (status)
		goto out;	/* failed: no statistics, just release credits */

	counters = the_lnet.ln_counters[msg->msg_rx_cpt];
	switch (ev->type) {
	default:
		/* no event type: only a routed message; not counted here */
		LASSERT(!ev->type);
		LASSERT(msg->msg_routing);
		goto out;

	case LNET_EVENT_ACK:
		LASSERT(msg->msg_type == LNET_MSG_ACK);
		break;

	case LNET_EVENT_GET:
		/*
		 * lnet_msg_decommit_tx() turns REPLY back into GET, so a
		 * REPLY type here presumably means the message was never
		 * tx-committed (NOTE(review): upstream ties this to the
		 * "optimized GET" path -- confirm).  The GET's msg_wanted
		 * bytes are accounted as sent data.
		 */
		LASSERT(msg->msg_type == LNET_MSG_REPLY ||
			msg->msg_type == LNET_MSG_GET);
		counters->send_length += msg->msg_wanted;
		break;

	case LNET_EVENT_PUT:
		LASSERT(msg->msg_type == LNET_MSG_PUT);
		break;

	case LNET_EVENT_REPLY:
		/*
		 * Either type is accepted here (NOTE(review): GET presumably
		 * when the reply was produced without a separate message --
		 * confirm).
		 */
		LASSERT(msg->msg_type == LNET_MSG_GET ||
			msg->msg_type == LNET_MSG_REPLY);
		break;
	}

	counters->recv_count++;
	/* only PUT and REPLY payloads count as received bytes */
	if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
		counters->recv_length += msg->msg_wanted;

 out:
	lnet_return_rx_credits_locked(msg);
	msg->msg_rx_committed = 0;
}
277
/*
 * lnet_msg_decommit - fully decommit @msg and take it off its active list
 * @msg:    message committed for sending and/or receiving (asserted)
 * @cpt:    partition whose lnet_net_lock the caller holds; must equal
 *          msg->msg_tx_cpt if the message is tx-committed
 * @status: completion status, passed on to the tx/rx decommit helpers
 *
 * The send side is always decommitted before the receive side.  If the
 * receive side belongs to another partition, the lock on @cpt is dropped
 * and that partition's lock is taken for the remaining work.  The lock on
 * @cpt is held again on return.
 */
void
lnet_msg_decommit(lnet_msg_t *msg, int cpt, int status)
{
	int cpt2 = cpt;	/* partition whose lock is currently held */

	LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
	LASSERT(msg->msg_onactivelist);

	if (msg->msg_tx_committed) {
		LASSERT(cpt == msg->msg_tx_cpt);
		lnet_msg_decommit_tx(msg, status);
	}

	if (msg->msg_rx_committed) {
		/* e.g. a routed message committed for both directions */
		if (cpt != msg->msg_rx_cpt) {
			lnet_net_unlock(cpt);
			cpt2 = msg->msg_rx_cpt;
			lnet_net_lock(cpt2);
		}
		lnet_msg_decommit_rx(msg, status);
	}

	list_del(&msg->msg_activelist);
	msg->msg_onactivelist = 0;

	/* undo the msgs_alloc++ on the partition whose lock is now held */
	the_lnet.ln_counters[cpt2]->msgs_alloc--;

	/* switch back so the caller still holds the lock on @cpt */
	if (cpt2 != cpt) {
		lnet_net_unlock(cpt2);
		lnet_net_lock(cpt);
	}
}
311
312void
313lnet_msg_attach_md(lnet_msg_t *msg, lnet_libmd_t *md,
314 unsigned int offset, unsigned int mlen)
315{
316
317
318
319
320
321
322
323 LASSERT(!msg->msg_routing);
324
325 msg->msg_md = md;
326 if (msg->msg_receiving) {
327 msg->msg_offset = offset;
328 msg->msg_wanted = mlen;
329 }
330
331 md->md_refcount++;
332 if (md->md_threshold != LNET_MD_THRESH_INF) {
333 LASSERT(md->md_threshold > 0);
334 md->md_threshold--;
335 }
336
337
338 lnet_md2handle(&msg->msg_ev.md_handle, md);
339 lnet_md_deconstruct(md, &msg->msg_ev.md);
340}
341
342void
343lnet_msg_detach_md(lnet_msg_t *msg, int status)
344{
345 lnet_libmd_t *md = msg->msg_md;
346 int unlink;
347
348
349 md->md_refcount--;
350 LASSERT(md->md_refcount >= 0);
351
352 unlink = lnet_md_unlinkable(md);
353 if (md->md_eq) {
354 msg->msg_ev.status = status;
355 msg->msg_ev.unlinked = unlink;
356 lnet_eq_enqueue_event(md->md_eq, &msg->msg_ev);
357 }
358
359 if (unlink)
360 lnet_md_unlink(md);
361
362 msg->msg_md = NULL;
363}
364
365static int
366lnet_complete_msg_locked(lnet_msg_t *msg, int cpt)
367{
368 lnet_handle_wire_t ack_wmd;
369 int rc;
370 int status = msg->msg_ev.status;
371
372 LASSERT(msg->msg_onactivelist);
373
374 if (!status && msg->msg_ack) {
375
376
377 lnet_msg_decommit(msg, cpt, 0);
378
379 msg->msg_ack = 0;
380 lnet_net_unlock(cpt);
381
382 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
383 LASSERT(!msg->msg_routing);
384
385 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
386
387 lnet_prep_send(msg, LNET_MSG_ACK, msg->msg_ev.initiator, 0, 0);
388
389 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
390 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
391 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
392
393
394
395
396
397 rc = lnet_send(msg->msg_ev.target.nid, msg, LNET_NID_ANY);
398
399 lnet_net_lock(cpt);
400
401
402
403
404
405
406
407
408
409
410
411 return rc;
412
413 } else if (!status &&
414 (msg->msg_routing && !msg->msg_sending)) {
415
416 LASSERT(!msg->msg_receiving);
417 lnet_net_unlock(cpt);
418
419 rc = lnet_send(LNET_NID_ANY, msg, LNET_NID_ANY);
420
421 lnet_net_lock(cpt);
422
423
424
425
426
427
428
429
430
431
432
433
434
435 return rc;
436 }
437
438 lnet_msg_decommit(msg, cpt, status);
439 lnet_msg_free(msg);
440 return 0;
441}
442
/*
 * lnet_finalize - report completion of @msg with @status
 * @ni:     not used by this function
 * @msg:    message to finalize; NULL is ignored
 * @status: completion status, stored in msg->msg_ev.status
 *
 * Must not be called from interrupt context (asserted).  An attached MD is
 * detached first, which may enqueue the MD's event.  A message that was
 * never committed is freed immediately.  Otherwise it is queued on its
 * partition's finalizing list, which is drained by "finalizer" threads.
 * At most msc_nfinalizers threads finalize on a partition at once, and a
 * thread that is already a finalizer there (a recursive call) only queues
 * the message and returns.
 */
void
lnet_finalize(lnet_ni_t *ni, lnet_msg_t *msg, int status)
{
	struct lnet_msg_container *container;
	int my_slot;
	int cpt;
	int rc;
	int i;

	LASSERT(!in_interrupt());

	if (!msg)
		return;
	/* disabled debug trace of the message state */
#if 0
	CDEBUG(D_WARNING, "%s msg->%s Flags:%s%s%s%s%s%s%s%s%s%s%s txp %s rxp %s\n",
	       lnet_msgtyp2str(msg->msg_type), libcfs_id2str(msg->msg_target),
	       msg->msg_target_is_router ? "t" : "",
	       msg->msg_routing ? "X" : "",
	       msg->msg_ack ? "A" : "",
	       msg->msg_sending ? "S" : "",
	       msg->msg_receiving ? "R" : "",
	       msg->msg_delayed ? "d" : "",
	       msg->msg_txcredit ? "C" : "",
	       msg->msg_peertxcredit ? "c" : "",
	       msg->msg_rtrcredit ? "F" : "",
	       msg->msg_peerrtrcredit ? "f" : "",
	       msg->msg_onactivelist ? "!" : "",
	       !msg->msg_txpeer ? "<none>" : libcfs_nid2str(msg->msg_txpeer->lp_nid),
	       !msg->msg_rxpeer ? "<none>" : libcfs_nid2str(msg->msg_rxpeer->lp_nid));
#endif
	msg->msg_ev.status = status;

	if (msg->msg_md) {
		/* detach under the resource lock of the MD's own partition */
		cpt = lnet_cpt_of_cookie(msg->msg_md->md_lh.lh_cookie);

		lnet_res_lock(cpt);
		lnet_msg_detach_md(msg, status);
		lnet_res_unlock(cpt);
	}

 again:
	rc = 0;
	if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
		/* never committed to the network: nothing to decommit */
		LASSERT(!msg->msg_onactivelist);
		lnet_msg_free(msg);
		return;
	}

	/*
	 * A routed message may be committed for both sending and receiving.
	 * lnet_msg_decommit() needs the tx partition's lock whenever the
	 * message is tx-committed, so prefer that partition.
	 */
	cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt;
	lnet_net_lock(cpt);

	container = the_lnet.ln_msg_containers[cpt];
	list_add_tail(&msg->msg_list, &container->msc_finalizing);

	/*
	 * Recursion breaker: if this thread is already finalizing on this
	 * partition, or every finalizer slot is taken, leave the message
	 * queued for the finalizer(s) already draining the list.
	 */
	my_slot = -1;
	for (i = 0; i < container->msc_nfinalizers; i++) {
		if (container->msc_finalizers[i] == current)
			break;

		if (my_slot < 0 && !container->msc_finalizers[i])
			my_slot = i;
	}

	if (i < container->msc_nfinalizers || my_slot < 0) {
		lnet_net_unlock(cpt);
		return;
	}

	container->msc_finalizers[my_slot] = current;

	while (!list_empty(&container->msc_finalizing)) {
		msg = list_entry(container->msc_finalizing.next,
				 lnet_msg_t, msg_list);

		list_del(&msg->msg_list);

		/*
		 * May drop and retake the net lock, letting other finalizers
		 * make progress meanwhile.
		 */
		rc = lnet_complete_msg_locked(msg, cpt);
		if (rc)
			break;	/* retried from "again" once the slot is freed */
	}

	/* delay rules are checked without the net lock held */
	if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) {
		lnet_net_unlock(cpt);
		lnet_delay_rule_check();
		lnet_net_lock(cpt);
	}

	container->msc_finalizers[my_slot] = NULL;
	lnet_net_unlock(cpt);

	/* re-finalize the message whose completion returned non-zero */
	if (rc)
		goto again;
}
EXPORT_SYMBOL(lnet_finalize);
551
552void
553lnet_msg_container_cleanup(struct lnet_msg_container *container)
554{
555 int count = 0;
556
557 if (!container->msc_init)
558 return;
559
560 while (!list_empty(&container->msc_active)) {
561 lnet_msg_t *msg = list_entry(container->msc_active.next,
562 lnet_msg_t, msg_activelist);
563
564 LASSERT(msg->msg_onactivelist);
565 msg->msg_onactivelist = 0;
566 list_del(&msg->msg_activelist);
567 lnet_msg_free(msg);
568 count++;
569 }
570
571 if (count > 0)
572 CERROR("%d active msg on exit\n", count);
573
574 if (container->msc_finalizers) {
575 LIBCFS_FREE(container->msc_finalizers,
576 container->msc_nfinalizers *
577 sizeof(*container->msc_finalizers));
578 container->msc_finalizers = NULL;
579 }
580 container->msc_init = 0;
581}
582
583int
584lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
585{
586 container->msc_init = 1;
587
588 INIT_LIST_HEAD(&container->msc_active);
589 INIT_LIST_HEAD(&container->msc_finalizing);
590
591
592 container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
593
594 LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt,
595 container->msc_nfinalizers *
596 sizeof(*container->msc_finalizers));
597
598 if (!container->msc_finalizers) {
599 CERROR("Failed to allocate message finalizers\n");
600 lnet_msg_container_cleanup(container);
601 return -ENOMEM;
602 }
603
604 return 0;
605}
606
607void
608lnet_msg_containers_destroy(void)
609{
610 struct lnet_msg_container *container;
611 int i;
612
613 if (!the_lnet.ln_msg_containers)
614 return;
615
616 cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers)
617 lnet_msg_container_cleanup(container);
618
619 cfs_percpt_free(the_lnet.ln_msg_containers);
620 the_lnet.ln_msg_containers = NULL;
621}
622
623int
624lnet_msg_containers_create(void)
625{
626 struct lnet_msg_container *container;
627 int rc;
628 int i;
629
630 the_lnet.ln_msg_containers = cfs_percpt_alloc(lnet_cpt_table(),
631 sizeof(*container));
632
633 if (!the_lnet.ln_msg_containers) {
634 CERROR("Failed to allocate cpu-partition data for network\n");
635 return -ENOMEM;
636 }
637
638 cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) {
639 rc = lnet_msg_container_setup(container, i);
640 if (rc) {
641 lnet_msg_containers_destroy();
642 return rc;
643 }
644 }
645
646 return 0;
647}
648