1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
 * Debug switch: when non-zero, midcomms_shutdown() sleeps for five seconds
 * right after it has sent its FIN message.
 */
#define DLM_DEBUG_FENCE_TERMINATION	0
134
135#include <net/tcp.h>
136
137#include "dlm_internal.h"
138#include "lowcomms.h"
139#include "config.h"
140#include "lock.h"
141#include "util.h"
142#include "midcomms.h"
143
144
/* Value both per-node sequence counters are set to by midcomms_node_reset(). */
#define DLM_SEQ_INIT		0

/* Upper bound (three minutes, in jiffies) midcomms_shutdown() waits per node. */
#define DLM_SHUTDOWN_TIMEOUT	msecs_to_jiffies(3 * 60 * 1000)
/* node->version value meaning "no protocol version recorded yet". */
#define DLM_VERSION_NOT_SET	0
149
150struct midcomms_node {
151 int nodeid;
152 uint32_t version;
153 uint32_t seq_send;
154 uint32_t seq_next;
155
156
157
158
159
160 struct list_head send_queue;
161 spinlock_t send_queue_lock;
162 atomic_t send_queue_cnt;
163#define DLM_NODE_FLAG_CLOSE 1
164#define DLM_NODE_FLAG_STOP_TX 2
165#define DLM_NODE_FLAG_STOP_RX 3
166#define DLM_NODE_ULP_DELIVERED 4
167 unsigned long flags;
168 wait_queue_head_t shutdown_wait;
169
170
171#define DLM_CLOSED 1
172#define DLM_ESTABLISHED 2
173#define DLM_FIN_WAIT1 3
174#define DLM_FIN_WAIT2 4
175#define DLM_CLOSE_WAIT 5
176#define DLM_LAST_ACK 6
177#define DLM_CLOSING 7
178 int state;
179 spinlock_t state_lock;
180
181
182
183
184
185 int users;
186
187
188 void *debugfs;
189
190 struct hlist_node hlist;
191 struct rcu_head rcu;
192};
193
194struct dlm_mhandle {
195 const struct dlm_header *inner_hd;
196 struct midcomms_node *node;
197 struct dlm_opts *opts;
198 struct dlm_msg *msg;
199 bool committed;
200 uint32_t seq;
201
202 void (*ack_rcv)(struct midcomms_node *node);
203
204
205 int idx;
206
207 struct list_head list;
208 struct rcu_head rcu;
209};
210
211static struct hlist_head node_hash[CONN_HASH_SIZE];
212static DEFINE_SPINLOCK(nodes_lock);
213DEFINE_STATIC_SRCU(nodes_srcu);
214
215
216
217
218
219
220
/*
 * Held by dlm_midcomms_shutdown() around its node teardown loop and by
 * dlm_midcomms_close() around closing and resetting a single node.
 */
static DEFINE_MUTEX(close_lock);
222
223static inline const char *dlm_state_str(int state)
224{
225 switch (state) {
226 case DLM_CLOSED:
227 return "CLOSED";
228 case DLM_ESTABLISHED:
229 return "ESTABLISHED";
230 case DLM_FIN_WAIT1:
231 return "FIN_WAIT1";
232 case DLM_FIN_WAIT2:
233 return "FIN_WAIT2";
234 case DLM_CLOSE_WAIT:
235 return "CLOSE_WAIT";
236 case DLM_LAST_ACK:
237 return "LAST_ACK";
238 case DLM_CLOSING:
239 return "CLOSING";
240 default:
241 return "UNKNOWN";
242 }
243}
244
245const char *dlm_midcomms_state(struct midcomms_node *node)
246{
247 return dlm_state_str(node->state);
248}
249
250unsigned long dlm_midcomms_flags(struct midcomms_node *node)
251{
252 return node->flags;
253}
254
255int dlm_midcomms_send_queue_cnt(struct midcomms_node *node)
256{
257 return atomic_read(&node->send_queue_cnt);
258}
259
260uint32_t dlm_midcomms_version(struct midcomms_node *node)
261{
262 return node->version;
263}
264
265static struct midcomms_node *__find_node(int nodeid, int r)
266{
267 struct midcomms_node *node;
268
269 hlist_for_each_entry_rcu(node, &node_hash[r], hlist) {
270 if (node->nodeid == nodeid)
271 return node;
272 }
273
274 return NULL;
275}
276
277static void dlm_mhandle_release(struct rcu_head *rcu)
278{
279 struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu);
280
281 dlm_lowcomms_put_msg(mh->msg);
282 kfree(mh);
283}
284
285static void dlm_mhandle_delete(struct midcomms_node *node,
286 struct dlm_mhandle *mh)
287{
288 list_del_rcu(&mh->list);
289 atomic_dec(&node->send_queue_cnt);
290 call_rcu(&mh->rcu, dlm_mhandle_release);
291}
292
293static void dlm_send_queue_flush(struct midcomms_node *node)
294{
295 struct dlm_mhandle *mh;
296
297 pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
298
299 rcu_read_lock();
300 spin_lock(&node->send_queue_lock);
301 list_for_each_entry_rcu(mh, &node->send_queue, list) {
302 dlm_mhandle_delete(node, mh);
303 }
304 spin_unlock(&node->send_queue_lock);
305 rcu_read_unlock();
306}
307
308static void midcomms_node_reset(struct midcomms_node *node)
309{
310 pr_debug("reset node %d\n", node->nodeid);
311
312 node->seq_next = DLM_SEQ_INIT;
313 node->seq_send = DLM_SEQ_INIT;
314 node->version = DLM_VERSION_NOT_SET;
315 node->flags = 0;
316
317 dlm_send_queue_flush(node);
318 node->state = DLM_CLOSED;
319 wake_up(&node->shutdown_wait);
320}
321
322static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
323{
324 struct midcomms_node *node, *tmp;
325 int r = nodeid_hash(nodeid);
326
327 node = __find_node(nodeid, r);
328 if (node || !alloc)
329 return node;
330
331 node = kmalloc(sizeof(*node), alloc);
332 if (!node)
333 return NULL;
334
335 node->nodeid = nodeid;
336 spin_lock_init(&node->state_lock);
337 spin_lock_init(&node->send_queue_lock);
338 atomic_set(&node->send_queue_cnt, 0);
339 INIT_LIST_HEAD(&node->send_queue);
340 init_waitqueue_head(&node->shutdown_wait);
341 node->users = 0;
342 midcomms_node_reset(node);
343
344 spin_lock(&nodes_lock);
345
346
347
348 tmp = __find_node(nodeid, r);
349 if (tmp) {
350 spin_unlock(&nodes_lock);
351 kfree(node);
352 return tmp;
353 }
354
355 hlist_add_head_rcu(&node->hlist, &node_hash[r]);
356 spin_unlock(&nodes_lock);
357
358 node->debugfs = dlm_create_debug_comms_file(nodeid, node);
359 return node;
360}
361
362static int dlm_send_ack(int nodeid, uint32_t seq)
363{
364 int mb_len = sizeof(struct dlm_header);
365 struct dlm_header *m_header;
366 struct dlm_msg *msg;
367 char *ppc;
368
369 msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_NOFS, &ppc,
370 NULL, NULL);
371 if (!msg)
372 return -ENOMEM;
373
374 m_header = (struct dlm_header *)ppc;
375
376 m_header->h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
377 m_header->h_nodeid = dlm_our_nodeid();
378 m_header->h_length = mb_len;
379 m_header->h_cmd = DLM_ACK;
380 m_header->u.h_seq = seq;
381
382 header_out(m_header);
383 dlm_lowcomms_commit_msg(msg);
384 dlm_lowcomms_put_msg(msg);
385
386 return 0;
387}
388
389static int dlm_send_fin(struct midcomms_node *node,
390 void (*ack_rcv)(struct midcomms_node *node))
391{
392 int mb_len = sizeof(struct dlm_header);
393 struct dlm_header *m_header;
394 struct dlm_mhandle *mh;
395 char *ppc;
396
397 mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_NOFS, &ppc);
398 if (!mh)
399 return -ENOMEM;
400
401 mh->ack_rcv = ack_rcv;
402
403 m_header = (struct dlm_header *)ppc;
404
405 m_header->h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
406 m_header->h_nodeid = dlm_our_nodeid();
407 m_header->h_length = mb_len;
408 m_header->h_cmd = DLM_FIN;
409
410 header_out(m_header);
411
412 pr_debug("sending fin msg to node %d\n", node->nodeid);
413 dlm_midcomms_commit_mhandle(mh);
414 set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
415
416 return 0;
417}
418
419static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
420{
421 struct dlm_mhandle *mh;
422
423 rcu_read_lock();
424 list_for_each_entry_rcu(mh, &node->send_queue, list) {
425 if (before(mh->seq, seq)) {
426 if (mh->ack_rcv)
427 mh->ack_rcv(node);
428 } else {
429
430 break;
431 }
432 }
433
434 spin_lock(&node->send_queue_lock);
435 list_for_each_entry_rcu(mh, &node->send_queue, list) {
436 if (before(mh->seq, seq)) {
437 dlm_mhandle_delete(node, mh);
438 } else {
439
440 break;
441 }
442 }
443 spin_unlock(&node->send_queue_lock);
444 rcu_read_unlock();
445}
446
447static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
448{
449 spin_lock(&node->state_lock);
450 pr_debug("receive passive fin ack from node %d with state %s\n",
451 node->nodeid, dlm_state_str(node->state));
452
453 switch (node->state) {
454 case DLM_LAST_ACK:
455
456 midcomms_node_reset(node);
457 break;
458 case DLM_CLOSED:
459
460 wake_up(&node->shutdown_wait);
461 break;
462 default:
463 spin_unlock(&node->state_lock);
464 log_print("%s: unexpected state: %d\n",
465 __func__, node->state);
466 WARN_ON(1);
467 return;
468 }
469 spin_unlock(&node->state_lock);
470}
471
472static void dlm_midcomms_receive_buffer(union dlm_packet *p,
473 struct midcomms_node *node,
474 uint32_t seq)
475{
476 if (seq == node->seq_next) {
477 node->seq_next++;
478
479 switch (p->header.h_cmd) {
480 case DLM_FIN:
481
482 dlm_send_ack(node->nodeid, node->seq_next);
483
484 spin_lock(&node->state_lock);
485 pr_debug("receive fin msg from node %d with state %s\n",
486 node->nodeid, dlm_state_str(node->state));
487
488 switch (node->state) {
489 case DLM_ESTABLISHED:
490 node->state = DLM_CLOSE_WAIT;
491 pr_debug("switch node %d to state %s\n",
492 node->nodeid, dlm_state_str(node->state));
493
494
495
496
497 if (node->users == 0) {
498 node->state = DLM_LAST_ACK;
499 pr_debug("switch node %d to state %s case 1\n",
500 node->nodeid, dlm_state_str(node->state));
501 spin_unlock(&node->state_lock);
502 goto send_fin;
503 }
504 break;
505 case DLM_FIN_WAIT1:
506 node->state = DLM_CLOSING;
507 pr_debug("switch node %d to state %s\n",
508 node->nodeid, dlm_state_str(node->state));
509 break;
510 case DLM_FIN_WAIT2:
511 midcomms_node_reset(node);
512 pr_debug("switch node %d to state %s\n",
513 node->nodeid, dlm_state_str(node->state));
514 wake_up(&node->shutdown_wait);
515 break;
516 case DLM_LAST_ACK:
517
518 break;
519 default:
520 spin_unlock(&node->state_lock);
521 log_print("%s: unexpected state: %d\n",
522 __func__, node->state);
523 WARN_ON(1);
524 return;
525 }
526 spin_unlock(&node->state_lock);
527
528 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
529 break;
530 default:
531 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
532 dlm_receive_buffer(p, node->nodeid);
533 set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
534 break;
535 }
536 } else {
537
538
539
540 if (seq < node->seq_next)
541 dlm_send_ack(node->nodeid, node->seq_next);
542
543 log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
544 seq, node->seq_next, node->nodeid);
545 }
546
547 return;
548
549send_fin:
550 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
551 dlm_send_fin(node, dlm_pas_fin_ack_rcv);
552}
553
554static struct midcomms_node *
555dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
556 uint16_t msglen, int (*cb)(struct midcomms_node *node))
557{
558 struct midcomms_node *node = NULL;
559 gfp_t allocation = 0;
560 int ret;
561
562 switch (p->header.h_cmd) {
563 case DLM_RCOM:
564 if (msglen < sizeof(struct dlm_rcom)) {
565 log_print("rcom msg too small: %u, will skip this message from node %d",
566 msglen, nodeid);
567 return NULL;
568 }
569
570 switch (le32_to_cpu(p->rcom.rc_type)) {
571 case DLM_RCOM_NAMES:
572 fallthrough;
573 case DLM_RCOM_NAMES_REPLY:
574 fallthrough;
575 case DLM_RCOM_STATUS:
576 fallthrough;
577 case DLM_RCOM_STATUS_REPLY:
578 node = nodeid2node(nodeid, 0);
579 if (node) {
580 spin_lock(&node->state_lock);
581 if (node->state != DLM_ESTABLISHED)
582 pr_debug("receive begin RCOM msg from node %d with state %s\n",
583 node->nodeid, dlm_state_str(node->state));
584
585 switch (node->state) {
586 case DLM_CLOSED:
587 node->state = DLM_ESTABLISHED;
588 pr_debug("switch node %d to state %s\n",
589 node->nodeid, dlm_state_str(node->state));
590 break;
591 case DLM_ESTABLISHED:
592 break;
593 default:
594
595
596
597
598 log_print("reset node %d because shutdown stuck",
599 node->nodeid);
600
601 midcomms_node_reset(node);
602 node->state = DLM_ESTABLISHED;
603 break;
604 }
605 spin_unlock(&node->state_lock);
606 }
607
608 allocation = GFP_NOFS;
609 break;
610 default:
611 break;
612 }
613
614 break;
615 default:
616 break;
617 }
618
619 node = nodeid2node(nodeid, allocation);
620 if (!node) {
621 switch (p->header.h_cmd) {
622 case DLM_OPTS:
623 if (msglen < sizeof(struct dlm_opts)) {
624 log_print("opts msg too small: %u, will skip this message from node %d",
625 msglen, nodeid);
626 return NULL;
627 }
628
629 log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
630 p->opts.o_nextcmd, nodeid);
631 break;
632 default:
633 log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
634 p->header.h_cmd, nodeid);
635 break;
636 }
637
638 return NULL;
639 }
640
641 ret = cb(node);
642 if (ret < 0)
643 return NULL;
644
645 return node;
646}
647
648static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
649{
650 switch (node->version) {
651 case DLM_VERSION_NOT_SET:
652 node->version = DLM_VERSION_3_2;
653 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
654 node->nodeid);
655 break;
656 case DLM_VERSION_3_2:
657 break;
658 default:
659 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
660 DLM_VERSION_3_2, node->nodeid, node->version);
661 return -1;
662 }
663
664 return 0;
665}
666
667static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid)
668{
669 int len = msglen;
670
671
672
673
674 if (len < sizeof(struct dlm_opts))
675 return -1;
676 len -= sizeof(struct dlm_opts);
677
678 if (len < le16_to_cpu(p->opts.o_optlen))
679 return -1;
680 len -= le16_to_cpu(p->opts.o_optlen);
681
682 switch (p->opts.o_nextcmd) {
683 case DLM_FIN:
684 if (len < sizeof(struct dlm_header)) {
685 log_print("fin too small: %d, will skip this message from node %d",
686 len, nodeid);
687 return -1;
688 }
689
690 break;
691 case DLM_MSG:
692 if (len < sizeof(struct dlm_message)) {
693 log_print("msg too small: %d, will skip this message from node %d",
694 msglen, nodeid);
695 return -1;
696 }
697
698 break;
699 case DLM_RCOM:
700 if (len < sizeof(struct dlm_rcom)) {
701 log_print("rcom msg too small: %d, will skip this message from node %d",
702 len, nodeid);
703 return -1;
704 }
705
706 break;
707 default:
708 log_print("unsupported o_nextcmd received: %u, will skip this message from node %d",
709 p->opts.o_nextcmd, nodeid);
710 return -1;
711 }
712
713 return 0;
714}
715
716static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
717{
718 uint16_t msglen = le16_to_cpu(p->header.h_length);
719 struct midcomms_node *node;
720 uint32_t seq;
721 int ret, idx;
722
723 idx = srcu_read_lock(&nodes_srcu);
724 node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
725 dlm_midcomms_version_check_3_2);
726 if (!node)
727 goto out;
728
729 switch (p->header.h_cmd) {
730 case DLM_RCOM:
731
732
733
734
735
736
737 switch (le32_to_cpu(p->rcom.rc_type)) {
738 case DLM_RCOM_NAMES:
739 fallthrough;
740 case DLM_RCOM_NAMES_REPLY:
741 fallthrough;
742 case DLM_RCOM_STATUS:
743 fallthrough;
744 case DLM_RCOM_STATUS_REPLY:
745 break;
746 default:
747 log_print("unsupported rcom type received: %u, will skip this message from node %d",
748 le32_to_cpu(p->rcom.rc_type), nodeid);
749 goto out;
750 }
751
752 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
753 dlm_receive_buffer(p, nodeid);
754 break;
755 case DLM_OPTS:
756 seq = le32_to_cpu(p->header.u.h_seq);
757
758 ret = dlm_opts_check_msglen(p, msglen, nodeid);
759 if (ret < 0) {
760 log_print("opts msg too small: %u, will skip this message from node %d",
761 msglen, nodeid);
762 goto out;
763 }
764
765 p = (union dlm_packet *)((unsigned char *)p->opts.o_opts +
766 le16_to_cpu(p->opts.o_optlen));
767
768
769 msglen = le16_to_cpu(p->header.h_length);
770 switch (p->header.h_cmd) {
771 case DLM_RCOM:
772 if (msglen < sizeof(struct dlm_rcom)) {
773 log_print("inner rcom msg too small: %u, will skip this message from node %d",
774 msglen, nodeid);
775 goto out;
776 }
777
778 break;
779 case DLM_MSG:
780 if (msglen < sizeof(struct dlm_message)) {
781 log_print("inner msg too small: %u, will skip this message from node %d",
782 msglen, nodeid);
783 goto out;
784 }
785
786 break;
787 case DLM_FIN:
788 if (msglen < sizeof(struct dlm_header)) {
789 log_print("inner fin too small: %u, will skip this message from node %d",
790 msglen, nodeid);
791 goto out;
792 }
793
794 break;
795 default:
796 log_print("unsupported inner h_cmd received: %u, will skip this message from node %d",
797 msglen, nodeid);
798 goto out;
799 }
800
801 dlm_midcomms_receive_buffer(p, node, seq);
802 break;
803 case DLM_ACK:
804 seq = le32_to_cpu(p->header.u.h_seq);
805 dlm_receive_ack(node, seq);
806 break;
807 default:
808 log_print("unsupported h_cmd received: %u, will skip this message from node %d",
809 p->header.h_cmd, nodeid);
810 break;
811 }
812
813out:
814 srcu_read_unlock(&nodes_srcu, idx);
815}
816
817static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
818{
819 switch (node->version) {
820 case DLM_VERSION_NOT_SET:
821 node->version = DLM_VERSION_3_1;
822 log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1,
823 node->nodeid);
824 break;
825 case DLM_VERSION_3_1:
826 break;
827 default:
828 log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
829 DLM_VERSION_3_1, node->nodeid, node->version);
830 return -1;
831 }
832
833 return 0;
834}
835
836static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
837{
838 uint16_t msglen = le16_to_cpu(p->header.h_length);
839 struct midcomms_node *node;
840 int idx;
841
842 idx = srcu_read_lock(&nodes_srcu);
843 node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
844 dlm_midcomms_version_check_3_1);
845 if (!node) {
846 srcu_read_unlock(&nodes_srcu, idx);
847 return;
848 }
849 srcu_read_unlock(&nodes_srcu, idx);
850
851 switch (p->header.h_cmd) {
852 case DLM_RCOM:
853
854 break;
855 case DLM_MSG:
856 if (msglen < sizeof(struct dlm_message)) {
857 log_print("msg too small: %u, will skip this message from node %d",
858 msglen, nodeid);
859 return;
860 }
861
862 break;
863 default:
864 log_print("unsupported h_cmd received: %u, will skip this message from node %d",
865 p->header.h_cmd, nodeid);
866 return;
867 }
868
869 dlm_receive_buffer(p, nodeid);
870}
871
872
873
874
875
876
877int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
878{
879 const unsigned char *ptr = buf;
880 const struct dlm_header *hd;
881 uint16_t msglen;
882 int ret = 0;
883
884 while (len >= sizeof(struct dlm_header)) {
885 hd = (struct dlm_header *)ptr;
886
887
888
889
890
891
892
893
894
895
896
897
898 msglen = le16_to_cpu(hd->h_length);
899 if (msglen > DLM_MAX_SOCKET_BUFSIZE ||
900 msglen < sizeof(struct dlm_header)) {
901 log_print("received invalid length header: %u from node %d, will abort message parsing",
902 msglen, nodeid);
903 return -EBADMSG;
904 }
905
906
907
908
909 if (msglen > len)
910 break;
911
912 switch (le32_to_cpu(hd->h_version)) {
913 case DLM_VERSION_3_1:
914 dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
915 break;
916 case DLM_VERSION_3_2:
917 dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid);
918 break;
919 default:
920 log_print("received invalid version header: %u from node %d, will skip this message",
921 le32_to_cpu(hd->h_version), nodeid);
922 break;
923 }
924
925 ret += msglen;
926 len -= msglen;
927 ptr += msglen;
928 }
929
930 return ret;
931}
932
933void dlm_midcomms_receive_done(int nodeid)
934{
935 struct midcomms_node *node;
936 int idx;
937
938 idx = srcu_read_lock(&nodes_srcu);
939 node = nodeid2node(nodeid, 0);
940 if (!node) {
941 srcu_read_unlock(&nodes_srcu, idx);
942 return;
943 }
944
945
946 switch (node->version) {
947 case DLM_VERSION_3_2:
948 break;
949 default:
950 srcu_read_unlock(&nodes_srcu, idx);
951 return;
952 }
953
954
955 if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
956 &node->flags)) {
957 srcu_read_unlock(&nodes_srcu, idx);
958 return;
959 }
960
961 spin_lock(&node->state_lock);
962
963 switch (node->state) {
964 case DLM_ESTABLISHED:
965 spin_unlock(&node->state_lock);
966 dlm_send_ack(node->nodeid, node->seq_next);
967 break;
968 default:
969 spin_unlock(&node->state_lock);
970
971 break;
972 };
973 srcu_read_unlock(&nodes_srcu, idx);
974}
975
976void dlm_midcomms_unack_msg_resend(int nodeid)
977{
978 struct midcomms_node *node;
979 struct dlm_mhandle *mh;
980 int idx, ret;
981
982 idx = srcu_read_lock(&nodes_srcu);
983 node = nodeid2node(nodeid, 0);
984 if (!node) {
985 srcu_read_unlock(&nodes_srcu, idx);
986 return;
987 }
988
989
990 switch (node->version) {
991 case DLM_VERSION_3_2:
992 break;
993 default:
994 srcu_read_unlock(&nodes_srcu, idx);
995 return;
996 }
997
998 rcu_read_lock();
999 list_for_each_entry_rcu(mh, &node->send_queue, list) {
1000 if (!mh->committed)
1001 continue;
1002
1003 ret = dlm_lowcomms_resend_msg(mh->msg);
1004 if (!ret)
1005 log_print_ratelimited("retransmit dlm msg, seq %u, nodeid %d",
1006 mh->seq, node->nodeid);
1007 }
1008 rcu_read_unlock();
1009 srcu_read_unlock(&nodes_srcu, idx);
1010}
1011
1012static void dlm_fill_opts_header(struct dlm_opts *opts, uint16_t inner_len,
1013 uint32_t seq)
1014{
1015 opts->o_header.h_cmd = DLM_OPTS;
1016 opts->o_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
1017 opts->o_header.h_nodeid = dlm_our_nodeid();
1018 opts->o_header.h_length = DLM_MIDCOMMS_OPT_LEN + inner_len;
1019 opts->o_header.u.h_seq = seq;
1020 header_out(&opts->o_header);
1021}
1022
1023static void midcomms_new_msg_cb(struct dlm_mhandle *mh)
1024{
1025 atomic_inc(&mh->node->send_queue_cnt);
1026
1027 spin_lock(&mh->node->send_queue_lock);
1028 list_add_tail_rcu(&mh->list, &mh->node->send_queue);
1029 spin_unlock(&mh->node->send_queue_lock);
1030
1031 mh->seq = mh->node->seq_send++;
1032}
1033
1034static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
1035 int len, gfp_t allocation, char **ppc)
1036{
1037 struct dlm_opts *opts;
1038 struct dlm_msg *msg;
1039
1040 msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
1041 allocation, ppc, midcomms_new_msg_cb, mh);
1042 if (!msg)
1043 return NULL;
1044
1045 opts = (struct dlm_opts *)*ppc;
1046 mh->opts = opts;
1047
1048
1049 dlm_fill_opts_header(opts, len, mh->seq);
1050
1051 *ppc += sizeof(*opts);
1052 mh->inner_hd = (const struct dlm_header *)*ppc;
1053 return msg;
1054}
1055
1056struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
1057 gfp_t allocation, char **ppc)
1058{
1059 struct midcomms_node *node;
1060 struct dlm_mhandle *mh;
1061 struct dlm_msg *msg;
1062 int idx;
1063
1064 idx = srcu_read_lock(&nodes_srcu);
1065 node = nodeid2node(nodeid, 0);
1066 if (!node) {
1067 WARN_ON_ONCE(1);
1068 goto err;
1069 }
1070
1071
1072 WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
1073
1074 mh = kzalloc(sizeof(*mh), GFP_NOFS);
1075 if (!mh)
1076 goto err;
1077
1078 mh->idx = idx;
1079 mh->node = node;
1080
1081 switch (node->version) {
1082 case DLM_VERSION_3_1:
1083 msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
1084 NULL, NULL);
1085 if (!msg) {
1086 kfree(mh);
1087 goto err;
1088 }
1089
1090 break;
1091 case DLM_VERSION_3_2:
1092 msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
1093 ppc);
1094 if (!msg) {
1095 kfree(mh);
1096 goto err;
1097 }
1098
1099 break;
1100 default:
1101 kfree(mh);
1102 WARN_ON(1);
1103 goto err;
1104 }
1105
1106 mh->msg = msg;
1107
1108
1109
1110
1111
1112
1113 return mh;
1114
1115err:
1116 srcu_read_unlock(&nodes_srcu, idx);
1117 return NULL;
1118}
1119
1120static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh)
1121{
1122
1123 mh->opts->o_nextcmd = mh->inner_hd->h_cmd;
1124 mh->committed = true;
1125 dlm_lowcomms_commit_msg(mh->msg);
1126}
1127
1128void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
1129{
1130 switch (mh->node->version) {
1131 case DLM_VERSION_3_1:
1132 srcu_read_unlock(&nodes_srcu, mh->idx);
1133
1134 dlm_lowcomms_commit_msg(mh->msg);
1135 dlm_lowcomms_put_msg(mh->msg);
1136
1137 kfree(mh);
1138 break;
1139 case DLM_VERSION_3_2:
1140 dlm_midcomms_commit_msg_3_2(mh);
1141 srcu_read_unlock(&nodes_srcu, mh->idx);
1142 break;
1143 default:
1144 srcu_read_unlock(&nodes_srcu, mh->idx);
1145 WARN_ON(1);
1146 break;
1147 }
1148}
1149
1150int dlm_midcomms_start(void)
1151{
1152 int i;
1153
1154 for (i = 0; i < CONN_HASH_SIZE; i++)
1155 INIT_HLIST_HEAD(&node_hash[i]);
1156
1157 return dlm_lowcomms_start();
1158}
1159
1160static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
1161{
1162 spin_lock(&node->state_lock);
1163 pr_debug("receive active fin ack from node %d with state %s\n",
1164 node->nodeid, dlm_state_str(node->state));
1165
1166 switch (node->state) {
1167 case DLM_FIN_WAIT1:
1168 node->state = DLM_FIN_WAIT2;
1169 pr_debug("switch node %d to state %s\n",
1170 node->nodeid, dlm_state_str(node->state));
1171 break;
1172 case DLM_CLOSING:
1173 midcomms_node_reset(node);
1174 pr_debug("switch node %d to state %s\n",
1175 node->nodeid, dlm_state_str(node->state));
1176 wake_up(&node->shutdown_wait);
1177 break;
1178 case DLM_CLOSED:
1179
1180 wake_up(&node->shutdown_wait);
1181 break;
1182 default:
1183 spin_unlock(&node->state_lock);
1184 log_print("%s: unexpected state: %d\n",
1185 __func__, node->state);
1186 WARN_ON(1);
1187 return;
1188 }
1189 spin_unlock(&node->state_lock);
1190}
1191
1192void dlm_midcomms_add_member(int nodeid)
1193{
1194 struct midcomms_node *node;
1195 int idx;
1196
1197 if (nodeid == dlm_our_nodeid())
1198 return;
1199
1200 idx = srcu_read_lock(&nodes_srcu);
1201 node = nodeid2node(nodeid, GFP_NOFS);
1202 if (!node) {
1203 srcu_read_unlock(&nodes_srcu, idx);
1204 return;
1205 }
1206
1207 spin_lock(&node->state_lock);
1208 if (!node->users) {
1209 pr_debug("receive add member from node %d with state %s\n",
1210 node->nodeid, dlm_state_str(node->state));
1211 switch (node->state) {
1212 case DLM_ESTABLISHED:
1213 break;
1214 case DLM_CLOSED:
1215 node->state = DLM_ESTABLISHED;
1216 pr_debug("switch node %d to state %s\n",
1217 node->nodeid, dlm_state_str(node->state));
1218 break;
1219 default:
1220
1221
1222
1223
1224 log_print("reset node %d because shutdown stuck",
1225 node->nodeid);
1226
1227 midcomms_node_reset(node);
1228 node->state = DLM_ESTABLISHED;
1229 break;
1230 }
1231 }
1232
1233 node->users++;
1234 pr_debug("users inc count %d\n", node->users);
1235 spin_unlock(&node->state_lock);
1236
1237 srcu_read_unlock(&nodes_srcu, idx);
1238}
1239
1240void dlm_midcomms_remove_member(int nodeid)
1241{
1242 struct midcomms_node *node;
1243 int idx;
1244
1245 if (nodeid == dlm_our_nodeid())
1246 return;
1247
1248 idx = srcu_read_lock(&nodes_srcu);
1249 node = nodeid2node(nodeid, 0);
1250 if (!node) {
1251 srcu_read_unlock(&nodes_srcu, idx);
1252 return;
1253 }
1254
1255 spin_lock(&node->state_lock);
1256 node->users--;
1257 pr_debug("users dec count %d\n", node->users);
1258
1259
1260
1261
1262
1263 if (node->users == 0) {
1264 pr_debug("receive remove member from node %d with state %s\n",
1265 node->nodeid, dlm_state_str(node->state));
1266 switch (node->state) {
1267 case DLM_ESTABLISHED:
1268 break;
1269 case DLM_CLOSE_WAIT:
1270
1271 node->state = DLM_LAST_ACK;
1272 spin_unlock(&node->state_lock);
1273
1274 pr_debug("switch node %d to state %s case 2\n",
1275 node->nodeid, dlm_state_str(node->state));
1276 goto send_fin;
1277 case DLM_LAST_ACK:
1278
1279 break;
1280 case DLM_CLOSED:
1281
1282 break;
1283 default:
1284 log_print("%s: unexpected state: %d\n",
1285 __func__, node->state);
1286 break;
1287 }
1288 }
1289 spin_unlock(&node->state_lock);
1290
1291 srcu_read_unlock(&nodes_srcu, idx);
1292 return;
1293
1294send_fin:
1295 set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
1296 dlm_send_fin(node, dlm_pas_fin_ack_rcv);
1297 srcu_read_unlock(&nodes_srcu, idx);
1298}
1299
1300static void midcomms_node_release(struct rcu_head *rcu)
1301{
1302 struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
1303
1304 WARN_ON(atomic_read(&node->send_queue_cnt));
1305 kfree(node);
1306}
1307
1308static void midcomms_shutdown(struct midcomms_node *node)
1309{
1310 int ret;
1311
1312
1313 switch (node->version) {
1314 case DLM_VERSION_3_2:
1315 break;
1316 default:
1317 return;
1318 }
1319
1320 spin_lock(&node->state_lock);
1321 pr_debug("receive active shutdown for node %d with state %s\n",
1322 node->nodeid, dlm_state_str(node->state));
1323 switch (node->state) {
1324 case DLM_ESTABLISHED:
1325 node->state = DLM_FIN_WAIT1;
1326 pr_debug("switch node %d to state %s case 2\n",
1327 node->nodeid, dlm_state_str(node->state));
1328 break;
1329 case DLM_CLOSED:
1330
1331 spin_unlock(&node->state_lock);
1332 return;
1333 default:
1334
1335
1336
1337 break;
1338 }
1339 spin_unlock(&node->state_lock);
1340
1341 if (node->state == DLM_FIN_WAIT1) {
1342 dlm_send_fin(node, dlm_act_fin_ack_rcv);
1343
1344 if (DLM_DEBUG_FENCE_TERMINATION)
1345 msleep(5000);
1346 }
1347
1348
1349 ret = wait_event_timeout(node->shutdown_wait,
1350 node->state == DLM_CLOSED ||
1351 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
1352 DLM_SHUTDOWN_TIMEOUT);
1353 if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) {
1354 pr_debug("active shutdown timed out for node %d with state %s\n",
1355 node->nodeid, dlm_state_str(node->state));
1356 midcomms_node_reset(node);
1357 return;
1358 }
1359
1360 pr_debug("active shutdown done for node %d with state %s\n",
1361 node->nodeid, dlm_state_str(node->state));
1362}
1363
1364void dlm_midcomms_shutdown(void)
1365{
1366 struct midcomms_node *node;
1367 int i, idx;
1368
1369 mutex_lock(&close_lock);
1370 idx = srcu_read_lock(&nodes_srcu);
1371 for (i = 0; i < CONN_HASH_SIZE; i++) {
1372 hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
1373 midcomms_shutdown(node);
1374
1375 dlm_delete_debug_comms_file(node->debugfs);
1376
1377 spin_lock(&nodes_lock);
1378 hlist_del_rcu(&node->hlist);
1379 spin_unlock(&nodes_lock);
1380
1381 call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
1382 }
1383 }
1384 srcu_read_unlock(&nodes_srcu, idx);
1385 mutex_unlock(&close_lock);
1386
1387 dlm_lowcomms_shutdown();
1388}
1389
1390int dlm_midcomms_close(int nodeid)
1391{
1392 struct midcomms_node *node;
1393 int idx, ret;
1394
1395 if (nodeid == dlm_our_nodeid())
1396 return 0;
1397
1398 idx = srcu_read_lock(&nodes_srcu);
1399
1400 node = nodeid2node(nodeid, 0);
1401 if (node) {
1402
1403 set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
1404 wake_up(&node->shutdown_wait);
1405 }
1406 srcu_read_unlock(&nodes_srcu, idx);
1407
1408 synchronize_srcu(&nodes_srcu);
1409
1410 idx = srcu_read_lock(&nodes_srcu);
1411 mutex_lock(&close_lock);
1412 node = nodeid2node(nodeid, 0);
1413 if (!node) {
1414 mutex_unlock(&close_lock);
1415 srcu_read_unlock(&nodes_srcu, idx);
1416 return dlm_lowcomms_close(nodeid);
1417 }
1418
1419 ret = dlm_lowcomms_close(nodeid);
1420 spin_lock(&node->state_lock);
1421 midcomms_node_reset(node);
1422 spin_unlock(&node->state_lock);
1423 srcu_read_unlock(&nodes_srcu, idx);
1424 mutex_unlock(&close_lock);
1425
1426 return ret;
1427}
1428