1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/* Debug switch: when non-zero, midcomms_shutdown() sleeps for five seconds
 * right after it has sent its FIN. Disabled by default.
 */
#define DLM_DEBUG_FENCE_TERMINATION 0
134
135#include <net/tcp.h>
136
137#include "dlm_internal.h"
138#include "lowcomms.h"
139#include "config.h"
140#include "memory.h"
141#include "lock.h"
142#include "util.h"
143#include "midcomms.h"
144
145
/* value both per-node sequence counters are reset to */
#define DLM_SEQ_INIT 0

/* upper bound midcomms_shutdown() waits for a node to close: three minutes */
#define DLM_SHUTDOWN_TIMEOUT msecs_to_jiffies(3 * 60 * 1000)
/* node->version value used while no protocol version has been detected */
#define DLM_VERSION_NOT_SET 0
150
/* Per remote node midcomms state. Instances are linked into node_hash and
 * freed through midcomms_node_release().
 */
struct midcomms_node {
	/* id of the remote node this entry describes */
	int nodeid;
	/* detected protocol version, DLM_VERSION_NOT_SET until first detected */
	uint32_t version;
	/* sequence number handed to the next outgoing sequenced message */
	uint32_t seq_send;
	/* sequence number expected on the next incoming sequenced message */
	uint32_t seq_next;

	/* Messages handed to lowcomms that were not acknowledged yet. Entries
	 * are added and removed under send_queue_lock and walked under RCU.
	 */
	struct list_head send_queue;
	spinlock_t send_queue_lock;
	/* number of entries accounted on send_queue */
	atomic_t send_queue_cnt;
/* bit numbers used with the bitops on @flags */
#define DLM_NODE_FLAG_CLOSE 1
#define DLM_NODE_FLAG_STOP_TX 2
#define DLM_NODE_FLAG_STOP_RX 3
#define DLM_NODE_ULP_DELIVERED 4
	unsigned long flags;
	/* woken when the node is reset or when a close is requested */
	wait_queue_head_t shutdown_wait;

/* values of @state, the per-node connection termination state machine */
#define DLM_CLOSED 1
#define DLM_ESTABLISHED 2
#define DLM_FIN_WAIT1 3
#define DLM_FIN_WAIT2 4
#define DLM_CLOSE_WAIT 5
#define DLM_LAST_ACK 6
#define DLM_CLOSING 7
	int state;
	/* taken around the state transitions and around @users updates */
	spinlock_t state_lock;

	/* Incremented by dlm_midcomms_add_member() and decremented by
	 * dlm_midcomms_remove_member(); zero means no member uses the node.
	 */
	int users;

	/* handle returned by dlm_create_debug_comms_file() */
	void *debugfs;

	/* linkage into node_hash */
	struct hlist_node hlist;
	/* used by call_srcu() to defer freeing the node */
	struct rcu_head rcu;
};
194
/* Handle for one outgoing midcomms message, returned by
 * dlm_midcomms_get_mhandle() and consumed by dlm_midcomms_commit_mhandle().
 */
struct dlm_mhandle {
	/* header of the inner message that follows the options header */
	const struct dlm_header *inner_hd;
	/* destination node */
	struct midcomms_node *node;
	/* options header placed in front of the inner message (version 3.2) */
	struct dlm_opts *opts;
	/* underlying lowcomms message */
	struct dlm_msg *msg;
	/* set once the message was committed; only then is it retransmitted */
	bool committed;
	/* sequence number assigned in midcomms_new_msg_cb() */
	uint32_t seq;

	/* optional callback invoked by dlm_receive_ack() for this message */
	void (*ack_rcv)(struct midcomms_node *node);

	/* nodes_srcu index taken in get_mhandle, released at commit time */
	int idx;

	/* linkage into node->send_queue */
	struct list_head list;
	/* used by call_rcu() to defer freeing the handle */
	struct rcu_head rcu;
};
211
/* hash table of all known midcomms nodes, bucket chosen by nodeid_hash() */
static struct hlist_head node_hash[CONN_HASH_SIZE];
/* serializes insertion into and removal from node_hash */
static DEFINE_SPINLOCK(nodes_lock);
/* read side protection for node_hash lookups and node lifetime */
DEFINE_STATIC_SRCU(nodes_srcu);

/* Held by dlm_midcomms_shutdown() while it tears down all nodes and by
 * dlm_midcomms_close() while it closes and resets a single node, so the
 * two cannot run concurrently.
 */
static DEFINE_MUTEX(close_lock);
223
224struct kmem_cache *dlm_midcomms_cache_create(void)
225{
226 return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle),
227 0, 0, NULL);
228}
229
230static inline const char *dlm_state_str(int state)
231{
232 switch (state) {
233 case DLM_CLOSED:
234 return "CLOSED";
235 case DLM_ESTABLISHED:
236 return "ESTABLISHED";
237 case DLM_FIN_WAIT1:
238 return "FIN_WAIT1";
239 case DLM_FIN_WAIT2:
240 return "FIN_WAIT2";
241 case DLM_CLOSE_WAIT:
242 return "CLOSE_WAIT";
243 case DLM_LAST_ACK:
244 return "LAST_ACK";
245 case DLM_CLOSING:
246 return "CLOSING";
247 default:
248 return "UNKNOWN";
249 }
250}
251
252const char *dlm_midcomms_state(struct midcomms_node *node)
253{
254 return dlm_state_str(node->state);
255}
256
257unsigned long dlm_midcomms_flags(struct midcomms_node *node)
258{
259 return node->flags;
260}
261
262int dlm_midcomms_send_queue_cnt(struct midcomms_node *node)
263{
264 return atomic_read(&node->send_queue_cnt);
265}
266
267uint32_t dlm_midcomms_version(struct midcomms_node *node)
268{
269 return node->version;
270}
271
272static struct midcomms_node *__find_node(int nodeid, int r)
273{
274 struct midcomms_node *node;
275
276 hlist_for_each_entry_rcu(node, &node_hash[r], hlist) {
277 if (node->nodeid == nodeid)
278 return node;
279 }
280
281 return NULL;
282}
283
284static void dlm_mhandle_release(struct rcu_head *rcu)
285{
286 struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu);
287
288 dlm_lowcomms_put_msg(mh->msg);
289 dlm_free_mhandle(mh);
290}
291
292static void dlm_mhandle_delete(struct midcomms_node *node,
293 struct dlm_mhandle *mh)
294{
295 list_del_rcu(&mh->list);
296 atomic_dec(&node->send_queue_cnt);
297 call_rcu(&mh->rcu, dlm_mhandle_release);
298}
299
300static void dlm_send_queue_flush(struct midcomms_node *node)
301{
302 struct dlm_mhandle *mh;
303
304 pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
305
306 rcu_read_lock();
307 spin_lock(&node->send_queue_lock);
308 list_for_each_entry_rcu(mh, &node->send_queue, list) {
309 dlm_mhandle_delete(node, mh);
310 }
311 spin_unlock(&node->send_queue_lock);
312 rcu_read_unlock();
313}
314
315static void midcomms_node_reset(struct midcomms_node *node)
316{
317 pr_debug("reset node %d\n", node->nodeid);
318
319 node->seq_next = DLM_SEQ_INIT;
320 node->seq_send = DLM_SEQ_INIT;
321 node->version = DLM_VERSION_NOT_SET;
322 node->flags = 0;
323
324 dlm_send_queue_flush(node);
325 node->state = DLM_CLOSED;
326 wake_up(&node->shutdown_wait);
327}
328
329static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
330{
331 struct midcomms_node *node, *tmp;
332 int r = nodeid_hash(nodeid);
333
334 node = __find_node(nodeid, r);
335 if (node || !alloc)
336 return node;
337
338 node = kmalloc(sizeof(*node), alloc);
339 if (!node)
340 return NULL;
341
342 node->nodeid = nodeid;
343 spin_lock_init(&node->state_lock);
344 spin_lock_init(&node->send_queue_lock);
345 atomic_set(&node->send_queue_cnt, 0);
346 INIT_LIST_HEAD(&node->send_queue);
347 init_waitqueue_head(&node->shutdown_wait);
348 node->users = 0;
349 midcomms_node_reset(node);
350
351 spin_lock(&nodes_lock);
352
353
354
355 tmp = __find_node(nodeid, r);
356 if (tmp) {
357 spin_unlock(&nodes_lock);
358 kfree(node);
359 return tmp;
360 }
361
362 hlist_add_head_rcu(&node->hlist, &node_hash[r]);
363 spin_unlock(&nodes_lock);
364
365 node->debugfs = dlm_create_debug_comms_file(nodeid, node);
366 return node;
367}
368
369static int dlm_send_ack(int nodeid, uint32_t seq)
370{
371 int mb_len = sizeof(struct dlm_header);
372 struct dlm_header *m_header;
373 struct dlm_msg *msg;
374 char *ppc;
375
376 msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_NOFS, &ppc,
377 NULL, NULL);
378 if (!msg)
379 return -ENOMEM;
380
381 m_header = (struct dlm_header *)ppc;
382
383 m_header->h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
384 m_header->h_nodeid = dlm_our_nodeid();
385 m_header->h_length = mb_len;
386 m_header->h_cmd = DLM_ACK;
387 m_header->u.h_seq = seq;
388
389 header_out(m_header);
390 dlm_lowcomms_commit_msg(msg);
391 dlm_lowcomms_put_msg(msg);
392
393 return 0;
394}
395
396static int dlm_send_fin(struct midcomms_node *node,
397 void (*ack_rcv)(struct midcomms_node *node))
398{
399 int mb_len = sizeof(struct dlm_header);
400 struct dlm_header *m_header;
401 struct dlm_mhandle *mh;
402 char *ppc;
403
404 mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_NOFS, &ppc);
405 if (!mh)
406 return -ENOMEM;
407
408 mh->ack_rcv = ack_rcv;
409
410 m_header = (struct dlm_header *)ppc;
411
412 m_header->h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
413 m_header->h_nodeid = dlm_our_nodeid();
414 m_header->h_length = mb_len;
415 m_header->h_cmd = DLM_FIN;
416
417 header_out(m_header);
418
419 pr_debug("sending fin msg to node %d\n", node->nodeid);
420 dlm_midcomms_commit_mhandle(mh);
421 set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
422
423 return 0;
424}
425
426static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
427{
428 struct dlm_mhandle *mh;
429
430 rcu_read_lock();
431 list_for_each_entry_rcu(mh, &node->send_queue, list) {
432 if (before(mh->seq, seq)) {
433 if (mh->ack_rcv)
434 mh->ack_rcv(node);
435 } else {
436
437 break;
438 }
439 }
440
441 spin_lock(&node->send_queue_lock);
442 list_for_each_entry_rcu(mh, &node->send_queue, list) {
443 if (before(mh->seq, seq)) {
444 dlm_mhandle_delete(node, mh);
445 } else {
446
447 break;
448 }
449 }
450 spin_unlock(&node->send_queue_lock);
451 rcu_read_unlock();
452}
453
454static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
455{
456 spin_lock(&node->state_lock);
457 pr_debug("receive passive fin ack from node %d with state %s\n",
458 node->nodeid, dlm_state_str(node->state));
459
460 switch (node->state) {
461 case DLM_LAST_ACK:
462
463 midcomms_node_reset(node);
464 break;
465 case DLM_CLOSED:
466
467 wake_up(&node->shutdown_wait);
468 break;
469 default:
470 spin_unlock(&node->state_lock);
471 log_print("%s: unexpected state: %d\n",
472 __func__, node->state);
473 WARN_ON(1);
474 return;
475 }
476 spin_unlock(&node->state_lock);
477}
478
479static void dlm_midcomms_receive_buffer(union dlm_packet *p,
480 struct midcomms_node *node,
481 uint32_t seq)
482{
483 if (seq == node->seq_next) {
484 node->seq_next++;
485
486 switch (p->header.h_cmd) {
487 case DLM_FIN:
488
489 dlm_send_ack(node->nodeid, node->seq_next);
490
491 spin_lock(&node->state_lock);
492 pr_debug("receive fin msg from node %d with state %s\n",
493 node->nodeid, dlm_state_str(node->state));
494
495 switch (node->state) {
496 case DLM_ESTABLISHED:
497 node->state = DLM_CLOSE_WAIT;
498 pr_debug("switch node %d to state %s\n",
499 node->nodeid, dlm_state_str(node->state));
500
501
502
503
504 if (node->users == 0) {
505 node->state = DLM_LAST_ACK;
506 pr_debug("switch node %d to state %s case 1\n",
507 node->nodeid, dlm_state_str(node->state));
508 spin_unlock(&node->state_lock);
509 goto send_fin;
510 }
511 break;
512 case DLM_FIN_WAIT1:
513 node->state = DLM_CLOSING;
514 pr_debug("switch node %d to state %s\n",
515 node->nodeid, dlm_state_str(node->state));
516 break;
517 case DLM_FIN_WAIT2:
518 midcomms_node_reset(node);
519 pr_debug("switch node %d to state %s\n",
520 node->nodeid, dlm_state_str(node->state));
521 wake_up(&node->shutdown_wait);
522 break;
523 case DLM_LAST_ACK:
524
525 break;
526 default:
527 spin_unlock(&node->state_lock);
528 log_print("%s: unexpected state: %d\n",
529 __func__, node->state);
530 WARN_ON(1);
531 return;
532 }
533 spin_unlock(&node->state_lock);
534
535 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
536 break;
537 default:
538 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
539 dlm_receive_buffer(p, node->nodeid);
540 set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
541 break;
542 }
543 } else {
544
545
546
547 if (seq < node->seq_next)
548 dlm_send_ack(node->nodeid, node->seq_next);
549
550 log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
551 seq, node->seq_next, node->nodeid);
552 }
553
554 return;
555
556send_fin:
557 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
558 dlm_send_fin(node, dlm_pas_fin_ack_rcv);
559}
560
561static struct midcomms_node *
562dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
563 uint16_t msglen, int (*cb)(struct midcomms_node *node))
564{
565 struct midcomms_node *node = NULL;
566 gfp_t allocation = 0;
567 int ret;
568
569 switch (p->header.h_cmd) {
570 case DLM_RCOM:
571 if (msglen < sizeof(struct dlm_rcom)) {
572 log_print("rcom msg too small: %u, will skip this message from node %d",
573 msglen, nodeid);
574 return NULL;
575 }
576
577 switch (le32_to_cpu(p->rcom.rc_type)) {
578 case DLM_RCOM_NAMES:
579 fallthrough;
580 case DLM_RCOM_NAMES_REPLY:
581 fallthrough;
582 case DLM_RCOM_STATUS:
583 fallthrough;
584 case DLM_RCOM_STATUS_REPLY:
585 node = nodeid2node(nodeid, 0);
586 if (node) {
587 spin_lock(&node->state_lock);
588 if (node->state != DLM_ESTABLISHED)
589 pr_debug("receive begin RCOM msg from node %d with state %s\n",
590 node->nodeid, dlm_state_str(node->state));
591
592 switch (node->state) {
593 case DLM_CLOSED:
594 node->state = DLM_ESTABLISHED;
595 pr_debug("switch node %d to state %s\n",
596 node->nodeid, dlm_state_str(node->state));
597 break;
598 case DLM_ESTABLISHED:
599 break;
600 default:
601
602
603
604
605 log_print("reset node %d because shutdown stuck",
606 node->nodeid);
607
608 midcomms_node_reset(node);
609 node->state = DLM_ESTABLISHED;
610 break;
611 }
612 spin_unlock(&node->state_lock);
613 }
614
615 allocation = GFP_NOFS;
616 break;
617 default:
618 break;
619 }
620
621 break;
622 default:
623 break;
624 }
625
626 node = nodeid2node(nodeid, allocation);
627 if (!node) {
628 switch (p->header.h_cmd) {
629 case DLM_OPTS:
630 if (msglen < sizeof(struct dlm_opts)) {
631 log_print("opts msg too small: %u, will skip this message from node %d",
632 msglen, nodeid);
633 return NULL;
634 }
635
636 log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
637 p->opts.o_nextcmd, nodeid);
638 break;
639 default:
640 log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
641 p->header.h_cmd, nodeid);
642 break;
643 }
644
645 return NULL;
646 }
647
648 ret = cb(node);
649 if (ret < 0)
650 return NULL;
651
652 return node;
653}
654
655static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
656{
657 switch (node->version) {
658 case DLM_VERSION_NOT_SET:
659 node->version = DLM_VERSION_3_2;
660 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
661 node->nodeid);
662 break;
663 case DLM_VERSION_3_2:
664 break;
665 default:
666 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
667 DLM_VERSION_3_2, node->nodeid, node->version);
668 return -1;
669 }
670
671 return 0;
672}
673
674static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid)
675{
676 int len = msglen;
677
678
679
680
681 if (len < sizeof(struct dlm_opts))
682 return -1;
683 len -= sizeof(struct dlm_opts);
684
685 if (len < le16_to_cpu(p->opts.o_optlen))
686 return -1;
687 len -= le16_to_cpu(p->opts.o_optlen);
688
689 switch (p->opts.o_nextcmd) {
690 case DLM_FIN:
691 if (len < sizeof(struct dlm_header)) {
692 log_print("fin too small: %d, will skip this message from node %d",
693 len, nodeid);
694 return -1;
695 }
696
697 break;
698 case DLM_MSG:
699 if (len < sizeof(struct dlm_message)) {
700 log_print("msg too small: %d, will skip this message from node %d",
701 msglen, nodeid);
702 return -1;
703 }
704
705 break;
706 case DLM_RCOM:
707 if (len < sizeof(struct dlm_rcom)) {
708 log_print("rcom msg too small: %d, will skip this message from node %d",
709 len, nodeid);
710 return -1;
711 }
712
713 break;
714 default:
715 log_print("unsupported o_nextcmd received: %u, will skip this message from node %d",
716 p->opts.o_nextcmd, nodeid);
717 return -1;
718 }
719
720 return 0;
721}
722
723static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
724{
725 uint16_t msglen = le16_to_cpu(p->header.h_length);
726 struct midcomms_node *node;
727 uint32_t seq;
728 int ret, idx;
729
730 idx = srcu_read_lock(&nodes_srcu);
731 node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
732 dlm_midcomms_version_check_3_2);
733 if (!node)
734 goto out;
735
736 switch (p->header.h_cmd) {
737 case DLM_RCOM:
738
739
740
741
742
743
744 switch (le32_to_cpu(p->rcom.rc_type)) {
745 case DLM_RCOM_NAMES:
746 fallthrough;
747 case DLM_RCOM_NAMES_REPLY:
748 fallthrough;
749 case DLM_RCOM_STATUS:
750 fallthrough;
751 case DLM_RCOM_STATUS_REPLY:
752 break;
753 default:
754 log_print("unsupported rcom type received: %u, will skip this message from node %d",
755 le32_to_cpu(p->rcom.rc_type), nodeid);
756 goto out;
757 }
758
759 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
760 dlm_receive_buffer(p, nodeid);
761 break;
762 case DLM_OPTS:
763 seq = le32_to_cpu(p->header.u.h_seq);
764
765 ret = dlm_opts_check_msglen(p, msglen, nodeid);
766 if (ret < 0) {
767 log_print("opts msg too small: %u, will skip this message from node %d",
768 msglen, nodeid);
769 goto out;
770 }
771
772 p = (union dlm_packet *)((unsigned char *)p->opts.o_opts +
773 le16_to_cpu(p->opts.o_optlen));
774
775
776 msglen = le16_to_cpu(p->header.h_length);
777 switch (p->header.h_cmd) {
778 case DLM_RCOM:
779 if (msglen < sizeof(struct dlm_rcom)) {
780 log_print("inner rcom msg too small: %u, will skip this message from node %d",
781 msglen, nodeid);
782 goto out;
783 }
784
785 break;
786 case DLM_MSG:
787 if (msglen < sizeof(struct dlm_message)) {
788 log_print("inner msg too small: %u, will skip this message from node %d",
789 msglen, nodeid);
790 goto out;
791 }
792
793 break;
794 case DLM_FIN:
795 if (msglen < sizeof(struct dlm_header)) {
796 log_print("inner fin too small: %u, will skip this message from node %d",
797 msglen, nodeid);
798 goto out;
799 }
800
801 break;
802 default:
803 log_print("unsupported inner h_cmd received: %u, will skip this message from node %d",
804 msglen, nodeid);
805 goto out;
806 }
807
808 dlm_midcomms_receive_buffer(p, node, seq);
809 break;
810 case DLM_ACK:
811 seq = le32_to_cpu(p->header.u.h_seq);
812 dlm_receive_ack(node, seq);
813 break;
814 default:
815 log_print("unsupported h_cmd received: %u, will skip this message from node %d",
816 p->header.h_cmd, nodeid);
817 break;
818 }
819
820out:
821 srcu_read_unlock(&nodes_srcu, idx);
822}
823
824static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
825{
826 switch (node->version) {
827 case DLM_VERSION_NOT_SET:
828 node->version = DLM_VERSION_3_1;
829 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1,
830 node->nodeid);
831 break;
832 case DLM_VERSION_3_1:
833 break;
834 default:
835 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
836 DLM_VERSION_3_1, node->nodeid, node->version);
837 return -1;
838 }
839
840 return 0;
841}
842
843static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
844{
845 uint16_t msglen = le16_to_cpu(p->header.h_length);
846 struct midcomms_node *node;
847 int idx;
848
849 idx = srcu_read_lock(&nodes_srcu);
850 node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
851 dlm_midcomms_version_check_3_1);
852 if (!node) {
853 srcu_read_unlock(&nodes_srcu, idx);
854 return;
855 }
856 srcu_read_unlock(&nodes_srcu, idx);
857
858 switch (p->header.h_cmd) {
859 case DLM_RCOM:
860
861 break;
862 case DLM_MSG:
863 if (msglen < sizeof(struct dlm_message)) {
864 log_print("msg too small: %u, will skip this message from node %d",
865 msglen, nodeid);
866 return;
867 }
868
869 break;
870 default:
871 log_print("unsupported h_cmd received: %u, will skip this message from node %d",
872 p->header.h_cmd, nodeid);
873 return;
874 }
875
876 dlm_receive_buffer(p, nodeid);
877}
878
879
880
881
882
883
884int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
885{
886 const unsigned char *ptr = buf;
887 const struct dlm_header *hd;
888 uint16_t msglen;
889 int ret = 0;
890
891 while (len >= sizeof(struct dlm_header)) {
892 hd = (struct dlm_header *)ptr;
893
894
895
896
897
898
899
900
901
902
903
904
905 msglen = le16_to_cpu(hd->h_length);
906 if (msglen > DLM_MAX_SOCKET_BUFSIZE ||
907 msglen < sizeof(struct dlm_header)) {
908 log_print("received invalid length header: %u from node %d, will abort message parsing",
909 msglen, nodeid);
910 return -EBADMSG;
911 }
912
913
914
915
916 if (msglen > len)
917 break;
918
919 switch (hd->h_version) {
920 case cpu_to_le32(DLM_VERSION_3_1):
921 dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
922 break;
923 case cpu_to_le32(DLM_VERSION_3_2):
924 dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid);
925 break;
926 default:
927 log_print("received invalid version header: %u from node %d, will skip this message",
928 le32_to_cpu(hd->h_version), nodeid);
929 break;
930 }
931
932 ret += msglen;
933 len -= msglen;
934 ptr += msglen;
935 }
936
937 return ret;
938}
939
940void dlm_midcomms_receive_done(int nodeid)
941{
942 struct midcomms_node *node;
943 int idx;
944
945 idx = srcu_read_lock(&nodes_srcu);
946 node = nodeid2node(nodeid, 0);
947 if (!node) {
948 srcu_read_unlock(&nodes_srcu, idx);
949 return;
950 }
951
952
953 switch (node->version) {
954 case DLM_VERSION_3_2:
955 break;
956 default:
957 srcu_read_unlock(&nodes_srcu, idx);
958 return;
959 }
960
961
962 if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
963 &node->flags)) {
964 srcu_read_unlock(&nodes_srcu, idx);
965 return;
966 }
967
968 spin_lock(&node->state_lock);
969
970 switch (node->state) {
971 case DLM_ESTABLISHED:
972 spin_unlock(&node->state_lock);
973 dlm_send_ack(node->nodeid, node->seq_next);
974 break;
975 default:
976 spin_unlock(&node->state_lock);
977
978 break;
979 }
980 srcu_read_unlock(&nodes_srcu, idx);
981}
982
983void dlm_midcomms_unack_msg_resend(int nodeid)
984{
985 struct midcomms_node *node;
986 struct dlm_mhandle *mh;
987 int idx, ret;
988
989 idx = srcu_read_lock(&nodes_srcu);
990 node = nodeid2node(nodeid, 0);
991 if (!node) {
992 srcu_read_unlock(&nodes_srcu, idx);
993 return;
994 }
995
996
997 switch (node->version) {
998 case DLM_VERSION_3_2:
999 break;
1000 default:
1001 srcu_read_unlock(&nodes_srcu, idx);
1002 return;
1003 }
1004
1005 rcu_read_lock();
1006 list_for_each_entry_rcu(mh, &node->send_queue, list) {
1007 if (!mh->committed)
1008 continue;
1009
1010 ret = dlm_lowcomms_resend_msg(mh->msg);
1011 if (!ret)
1012 log_print_ratelimited("retransmit dlm msg, seq %u, nodeid %d",
1013 mh->seq, node->nodeid);
1014 }
1015 rcu_read_unlock();
1016 srcu_read_unlock(&nodes_srcu, idx);
1017}
1018
1019static void dlm_fill_opts_header(struct dlm_opts *opts, uint16_t inner_len,
1020 uint32_t seq)
1021{
1022 opts->o_header.h_cmd = DLM_OPTS;
1023 opts->o_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
1024 opts->o_header.h_nodeid = dlm_our_nodeid();
1025 opts->o_header.h_length = DLM_MIDCOMMS_OPT_LEN + inner_len;
1026 opts->o_header.u.h_seq = seq;
1027 header_out(&opts->o_header);
1028}
1029
1030static void midcomms_new_msg_cb(void *data)
1031{
1032 struct dlm_mhandle *mh = data;
1033
1034 atomic_inc(&mh->node->send_queue_cnt);
1035
1036 spin_lock(&mh->node->send_queue_lock);
1037 list_add_tail_rcu(&mh->list, &mh->node->send_queue);
1038 spin_unlock(&mh->node->send_queue_lock);
1039
1040 mh->seq = mh->node->seq_send++;
1041}
1042
1043static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
1044 int len, gfp_t allocation, char **ppc)
1045{
1046 struct dlm_opts *opts;
1047 struct dlm_msg *msg;
1048
1049 msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
1050 allocation, ppc, midcomms_new_msg_cb, mh);
1051 if (!msg)
1052 return NULL;
1053
1054 opts = (struct dlm_opts *)*ppc;
1055 mh->opts = opts;
1056
1057
1058 dlm_fill_opts_header(opts, len, mh->seq);
1059
1060 *ppc += sizeof(*opts);
1061 mh->inner_hd = (const struct dlm_header *)*ppc;
1062 return msg;
1063}
1064
1065struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
1066 gfp_t allocation, char **ppc)
1067{
1068 struct midcomms_node *node;
1069 struct dlm_mhandle *mh;
1070 struct dlm_msg *msg;
1071 int idx;
1072
1073 idx = srcu_read_lock(&nodes_srcu);
1074 node = nodeid2node(nodeid, 0);
1075 if (!node) {
1076 WARN_ON_ONCE(1);
1077 goto err;
1078 }
1079
1080
1081 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
1082
1083 mh = dlm_allocate_mhandle();
1084 if (!mh)
1085 goto err;
1086
1087 mh->committed = false;
1088 mh->ack_rcv = NULL;
1089 mh->idx = idx;
1090 mh->node = node;
1091
1092 switch (node->version) {
1093 case DLM_VERSION_3_1:
1094 msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
1095 NULL, NULL);
1096 if (!msg) {
1097 dlm_free_mhandle(mh);
1098 goto err;
1099 }
1100
1101 break;
1102 case DLM_VERSION_3_2:
1103 msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
1104 ppc);
1105 if (!msg) {
1106 dlm_free_mhandle(mh);
1107 goto err;
1108 }
1109
1110 break;
1111 default:
1112 dlm_free_mhandle(mh);
1113 WARN_ON(1);
1114 goto err;
1115 }
1116
1117 mh->msg = msg;
1118
1119
1120
1121
1122
1123
1124 return mh;
1125
1126err:
1127 srcu_read_unlock(&nodes_srcu, idx);
1128 return NULL;
1129}
1130
1131static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh)
1132{
1133
1134 mh->opts->o_nextcmd = mh->inner_hd->h_cmd;
1135 mh->committed = true;
1136 dlm_lowcomms_commit_msg(mh->msg);
1137}
1138
1139void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
1140{
1141 switch (mh->node->version) {
1142 case DLM_VERSION_3_1:
1143 srcu_read_unlock(&nodes_srcu, mh->idx);
1144
1145 dlm_lowcomms_commit_msg(mh->msg);
1146 dlm_lowcomms_put_msg(mh->msg);
1147
1148 dlm_free_mhandle(mh);
1149 break;
1150 case DLM_VERSION_3_2:
1151 dlm_midcomms_commit_msg_3_2(mh);
1152 srcu_read_unlock(&nodes_srcu, mh->idx);
1153 break;
1154 default:
1155 srcu_read_unlock(&nodes_srcu, mh->idx);
1156 WARN_ON(1);
1157 break;
1158 }
1159}
1160
1161int dlm_midcomms_start(void)
1162{
1163 int i;
1164
1165 for (i = 0; i < CONN_HASH_SIZE; i++)
1166 INIT_HLIST_HEAD(&node_hash[i]);
1167
1168 return dlm_lowcomms_start();
1169}
1170
1171static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
1172{
1173 spin_lock(&node->state_lock);
1174 pr_debug("receive active fin ack from node %d with state %s\n",
1175 node->nodeid, dlm_state_str(node->state));
1176
1177 switch (node->state) {
1178 case DLM_FIN_WAIT1:
1179 node->state = DLM_FIN_WAIT2;
1180 pr_debug("switch node %d to state %s\n",
1181 node->nodeid, dlm_state_str(node->state));
1182 break;
1183 case DLM_CLOSING:
1184 midcomms_node_reset(node);
1185 pr_debug("switch node %d to state %s\n",
1186 node->nodeid, dlm_state_str(node->state));
1187 wake_up(&node->shutdown_wait);
1188 break;
1189 case DLM_CLOSED:
1190
1191 wake_up(&node->shutdown_wait);
1192 break;
1193 default:
1194 spin_unlock(&node->state_lock);
1195 log_print("%s: unexpected state: %d\n",
1196 __func__, node->state);
1197 WARN_ON(1);
1198 return;
1199 }
1200 spin_unlock(&node->state_lock);
1201}
1202
1203void dlm_midcomms_add_member(int nodeid)
1204{
1205 struct midcomms_node *node;
1206 int idx;
1207
1208 if (nodeid == dlm_our_nodeid())
1209 return;
1210
1211 idx = srcu_read_lock(&nodes_srcu);
1212 node = nodeid2node(nodeid, GFP_NOFS);
1213 if (!node) {
1214 srcu_read_unlock(&nodes_srcu, idx);
1215 return;
1216 }
1217
1218 spin_lock(&node->state_lock);
1219 if (!node->users) {
1220 pr_debug("receive add member from node %d with state %s\n",
1221 node->nodeid, dlm_state_str(node->state));
1222 switch (node->state) {
1223 case DLM_ESTABLISHED:
1224 break;
1225 case DLM_CLOSED:
1226 node->state = DLM_ESTABLISHED;
1227 pr_debug("switch node %d to state %s\n",
1228 node->nodeid, dlm_state_str(node->state));
1229 break;
1230 default:
1231
1232
1233
1234
1235 log_print("reset node %d because shutdown stuck",
1236 node->nodeid);
1237
1238 midcomms_node_reset(node);
1239 node->state = DLM_ESTABLISHED;
1240 break;
1241 }
1242 }
1243
1244 node->users++;
1245 pr_debug("node %d users inc count %d\n", nodeid, node->users);
1246 spin_unlock(&node->state_lock);
1247
1248 srcu_read_unlock(&nodes_srcu, idx);
1249}
1250
1251void dlm_midcomms_remove_member(int nodeid)
1252{
1253 struct midcomms_node *node;
1254 int idx;
1255
1256 if (nodeid == dlm_our_nodeid())
1257 return;
1258
1259 idx = srcu_read_lock(&nodes_srcu);
1260 node = nodeid2node(nodeid, 0);
1261 if (!node) {
1262 srcu_read_unlock(&nodes_srcu, idx);
1263 return;
1264 }
1265
1266 spin_lock(&node->state_lock);
1267 node->users--;
1268 pr_debug("node %d users dec count %d\n", nodeid, node->users);
1269
1270
1271
1272
1273
1274 if (node->users == 0) {
1275 pr_debug("receive remove member from node %d with state %s\n",
1276 node->nodeid, dlm_state_str(node->state));
1277 switch (node->state) {
1278 case DLM_ESTABLISHED:
1279 break;
1280 case DLM_CLOSE_WAIT:
1281
1282 node->state = DLM_LAST_ACK;
1283 spin_unlock(&node->state_lock);
1284
1285 pr_debug("switch node %d to state %s case 2\n",
1286 node->nodeid, dlm_state_str(node->state));
1287 goto send_fin;
1288 case DLM_LAST_ACK:
1289
1290 break;
1291 case DLM_CLOSED:
1292
1293 break;
1294 default:
1295 log_print("%s: unexpected state: %d\n",
1296 __func__, node->state);
1297 break;
1298 }
1299 }
1300 spin_unlock(&node->state_lock);
1301
1302 srcu_read_unlock(&nodes_srcu, idx);
1303 return;
1304
1305send_fin:
1306 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
1307 dlm_send_fin(node, dlm_pas_fin_ack_rcv);
1308 srcu_read_unlock(&nodes_srcu, idx);
1309}
1310
1311static void midcomms_node_release(struct rcu_head *rcu)
1312{
1313 struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
1314
1315 WARN_ON(atomic_read(&node->send_queue_cnt));
1316 kfree(node);
1317}
1318
1319static void midcomms_shutdown(struct midcomms_node *node)
1320{
1321 int ret;
1322
1323
1324 switch (node->version) {
1325 case DLM_VERSION_3_2:
1326 break;
1327 default:
1328 return;
1329 }
1330
1331 spin_lock(&node->state_lock);
1332 pr_debug("receive active shutdown for node %d with state %s\n",
1333 node->nodeid, dlm_state_str(node->state));
1334 switch (node->state) {
1335 case DLM_ESTABLISHED:
1336 node->state = DLM_FIN_WAIT1;
1337 pr_debug("switch node %d to state %s case 2\n",
1338 node->nodeid, dlm_state_str(node->state));
1339 break;
1340 case DLM_CLOSED:
1341
1342 spin_unlock(&node->state_lock);
1343 return;
1344 default:
1345
1346
1347
1348 break;
1349 }
1350 spin_unlock(&node->state_lock);
1351
1352 if (node->state == DLM_FIN_WAIT1) {
1353 dlm_send_fin(node, dlm_act_fin_ack_rcv);
1354
1355 if (DLM_DEBUG_FENCE_TERMINATION)
1356 msleep(5000);
1357 }
1358
1359
1360 ret = wait_event_timeout(node->shutdown_wait,
1361 node->state == DLM_CLOSED ||
1362 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
1363 DLM_SHUTDOWN_TIMEOUT);
1364 if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) {
1365 pr_debug("active shutdown timed out for node %d with state %s\n",
1366 node->nodeid, dlm_state_str(node->state));
1367 midcomms_node_reset(node);
1368 return;
1369 }
1370
1371 pr_debug("active shutdown done for node %d with state %s\n",
1372 node->nodeid, dlm_state_str(node->state));
1373}
1374
1375void dlm_midcomms_shutdown(void)
1376{
1377 struct midcomms_node *node;
1378 int i, idx;
1379
1380 mutex_lock(&close_lock);
1381 idx = srcu_read_lock(&nodes_srcu);
1382 for (i = 0; i < CONN_HASH_SIZE; i++) {
1383 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
1384 midcomms_shutdown(node);
1385
1386 dlm_delete_debug_comms_file(node->debugfs);
1387
1388 spin_lock(&nodes_lock);
1389 hlist_del_rcu(&node->hlist);
1390 spin_unlock(&nodes_lock);
1391
1392 call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
1393 }
1394 }
1395 srcu_read_unlock(&nodes_srcu, idx);
1396 mutex_unlock(&close_lock);
1397
1398 dlm_lowcomms_shutdown();
1399}
1400
1401int dlm_midcomms_close(int nodeid)
1402{
1403 struct midcomms_node *node;
1404 int idx, ret;
1405
1406 if (nodeid == dlm_our_nodeid())
1407 return 0;
1408
1409 idx = srcu_read_lock(&nodes_srcu);
1410
1411 node = nodeid2node(nodeid, 0);
1412 if (node) {
1413
1414 set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
1415 wake_up(&node->shutdown_wait);
1416 }
1417 srcu_read_unlock(&nodes_srcu, idx);
1418
1419 synchronize_srcu(&nodes_srcu);
1420
1421 idx = srcu_read_lock(&nodes_srcu);
1422 mutex_lock(&close_lock);
1423 node = nodeid2node(nodeid, 0);
1424 if (!node) {
1425 mutex_unlock(&close_lock);
1426 srcu_read_unlock(&nodes_srcu, idx);
1427 return dlm_lowcomms_close(nodeid);
1428 }
1429
1430 ret = dlm_lowcomms_close(nodeid);
1431 spin_lock(&node->state_lock);
1432 midcomms_node_reset(node);
1433 spin_unlock(&node->state_lock);
1434 srcu_read_unlock(&nodes_srcu, idx);
1435 mutex_unlock(&close_lock);
1436
1437 return ret;
1438}
1439
1440
/* Argument handed to midcomms_new_rawmsg_cb() by dlm_midcomms_rawmsg_send() */
struct dlm_rawmsg_data {
	/* destination node, source of the sequence number */
	struct midcomms_node *node;
	/* raw message as passed in by the caller, starts with a dlm header */
	void *buf;
};
1445
1446static void midcomms_new_rawmsg_cb(void *data)
1447{
1448 struct dlm_rawmsg_data *rd = data;
1449 struct dlm_header *h = rd->buf;
1450
1451 switch (h->h_version) {
1452 case cpu_to_le32(DLM_VERSION_3_1):
1453 break;
1454 default:
1455 switch (h->h_cmd) {
1456 case DLM_OPTS:
1457 if (!h->u.h_seq)
1458 h->u.h_seq = rd->node->seq_send++;
1459 break;
1460 default:
1461 break;
1462 }
1463 break;
1464 }
1465}
1466
1467int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
1468 int buflen)
1469{
1470 struct dlm_rawmsg_data rd;
1471 struct dlm_msg *msg;
1472 char *msgbuf;
1473
1474 rd.node = node;
1475 rd.buf = buf;
1476
1477 msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS,
1478 &msgbuf, midcomms_new_rawmsg_cb, &rd);
1479 if (!msg)
1480 return -ENOMEM;
1481
1482 memcpy(msgbuf, buf, buflen);
1483 dlm_lowcomms_commit_msg(msg);
1484 return 0;
1485}
1486
1487