linux/fs/dlm/midcomms.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2004-2021 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12/*
  13 * midcomms.c
  14 *
 * This is the appallingly named "mid-level" comms layer. It takes care of
 * delivering "reliable" communication at the application layer, on top of
 * the lowcomms transport layer in use.
  18 *
  19 * How it works:
  20 *
 * Each node keeps track of all sent DLM messages in send_queue, together
 * with a sequence number. The receiver sends a DLM_ACK message back for every
 * DLM message received at the other side. If a reconnect happens in lowcomms
 * we send all unacknowledged DLM messages again. The receiving side may drop
 * any message it has already received by comparing sequence numbers.
  26 *
  27 * How version detection works:
  28 *
 * Due to the fact that dlm has pre-configured node addresses on every side,
 * it is in its nature that every side connects at start to transmit
 * dlm messages, which ends in a race. However DLM_RCOM_NAMES, DLM_RCOM_STATUS
 * and their replies are the first messages which are exchanged. Due to
 * backwards compatibility these messages are not covered by the midcomms
 * re-transmission layer. These messages have their own re-transmission
 * handling in the dlm application layer. The version field of every node
 * will be set on these RCOM messages as soon as they arrive and the node
 * isn't yet part of the nodes hash. There also exists logic to detect a
 * version mismatch if something weird is going on or the first message
 * isn't an expected one.
  39 *
  40 * Termination:
  41 *
 * The midcomms layer does a 4-way handshake for termination at the DLM
 * protocol level, like TCP does with its half-closed socket support. SCTP
 * doesn't support half-closed sockets, so we do it at the DLM layer. Also,
 * socket shutdown() can be interrupted, e.g. by a TCP reset. Additionally,
 * there exists the othercon paradigm in lowcomms, which cannot easily be
 * changed without breaking backwards compatibility. A node cannot send
 * anything to another node once a DLM_FIN message was sent. There exists
 * additional logic to print a warning if DLM wants to do it. There exists a
 * state handling like RFC 793, but reduced to termination only. The event
 * "member removal event" describes that the cluster manager removed the node
 * from its internal lists; at this point DLM does not send any message to
 * the other node. There exist two cases:
  53 *
  54 * 1. The cluster member was removed and we received a FIN
  55 * OR
  56 * 2. We received a FIN but the member was not removed yet
  57 *
  58 * One of these cases will do the CLOSE_WAIT to LAST_ACK change.
  59 *
  60 *
  61 *                              +---------+
  62 *                              | CLOSED  |
  63 *                              +---------+
  64 *                                   | add member/receive RCOM version
  65 *                                   |            detection msg
  66 *                                   V
  67 *                              +---------+
  68 *                              |  ESTAB  |
  69 *                              +---------+
  70 *                       CLOSE    |     |    rcv FIN
  71 *                      -------   |     |    -------
  72 * +---------+          snd FIN  /       \   snd ACK          +---------+
  73 * |  FIN    |<-----------------           ------------------>|  CLOSE  |
  74 * | WAIT-1  |------------------                              |   WAIT  |
  75 * +---------+          rcv FIN  \                            +---------+
  76 * | rcv ACK of FIN   -------   |                            CLOSE  | member
  77 * | --------------   snd ACK   |                           ------- | removal
  78 * V        x                   V                           snd FIN V event
  79 * +---------+                  +---------+                   +---------+
  80 * |FINWAIT-2|                  | CLOSING |                   | LAST-ACK|
  81 * +---------+                  +---------+                   +---------+
  82 * |                rcv ACK of FIN |                 rcv ACK of FIN |
  83 * |  rcv FIN       -------------- |                 -------------- |
  84 * |  -------              x       V                        x       V
  85 *  \ snd ACK                 +---------+                   +---------+
  86 *   ------------------------>| CLOSED  |                   | CLOSED  |
  87 *                            +---------+                   +---------+
  88 *
 * NOTE: any state can be interrupted by midcomms_close() and the state will
 * be switched to CLOSED in case of fencing. There also exists some timeout
 * handling when we receive the version detection RCOM messages, which is
 * based on observation.
  93 *
  94 * Future improvements:
  95 *
 * There exist some known issues/improvements of the dlm handling. Some
 * of them should be done in the next major dlm version bump, which makes
 * it incompatible with previous versions.
  99 *
 100 * Unaligned memory access:
 101 *
 * There exist cases where the dlm message buffer length is not aligned
 * to 8 bytes. However, it seems nobody has detected any problem with it.
 * This can be fixed in the next major version bump of dlm.
 105 *
 106 * Version detection:
 107 *
 * The version detection and how it's done is related to backwards
 * compatibility. There exist better ways to handle it.
 * However this should be changed in the next major version bump of dlm.
 111 *
 112 * Tail Size checking:
 113 *
 * There exists a message tail payload in e.g. DLM_MSG, however we don't
 * yet check it against the message length with regard to the receive buffer
 * length. That needs to be validated.
 117 *
 118 * Fencing bad nodes:
 119 *
 120 * At timeout places or weird sequence number behaviours we should send
 121 * a fencing request to the cluster manager.
 122 */
 123
/* Debug switch to enable a 5 second sleep while waiting for a termination.
 * This can be useful to test fencing while termination is running.
 * This requires a setup with only gfs2 as dlm user, so that the
 * last umount will terminate the connection.
 *
 * It has proven useful for testing: while umount blocks for the 5 seconds,
 * just press the reset button. In case of a lot of dropping, the
 * termination process can take several seconds.
 */
 133#define DLM_DEBUG_FENCE_TERMINATION     0
 134
 135#include <net/tcp.h>
 136
 137#include "dlm_internal.h"
 138#include "lowcomms.h"
 139#include "config.h"
 140#include "lock.h"
 141#include "util.h"
 142#include "midcomms.h"
 143
 144/* init value for sequence numbers for testing purpose only e.g. overflows */
 145#define DLM_SEQ_INIT            0
 146/* 3 minutes wait to sync ending of dlm */
 147#define DLM_SHUTDOWN_TIMEOUT    msecs_to_jiffies(3 * 60 * 1000)
 148#define DLM_VERSION_NOT_SET     0
 149
 150struct midcomms_node {
 151        int nodeid;
 152        uint32_t version;
 153        uint32_t seq_send;
 154        uint32_t seq_next;
 155        /* These queues are unbound because we cannot drop any message in dlm.
 156         * We could send a fence signal for a specific node to the cluster
 157         * manager if queues hits some maximum value, however this handling
 158         * not supported yet.
 159         */
 160        struct list_head send_queue;
 161        spinlock_t send_queue_lock;
 162        atomic_t send_queue_cnt;
 163#define DLM_NODE_FLAG_CLOSE     1
 164#define DLM_NODE_FLAG_STOP_TX   2
 165#define DLM_NODE_FLAG_STOP_RX   3
 166#define DLM_NODE_ULP_DELIVERED  4
 167        unsigned long flags;
 168        wait_queue_head_t shutdown_wait;
 169
 170        /* dlm tcp termination state */
 171#define DLM_CLOSED      1
 172#define DLM_ESTABLISHED 2
 173#define DLM_FIN_WAIT1   3
 174#define DLM_FIN_WAIT2   4
 175#define DLM_CLOSE_WAIT  5
 176#define DLM_LAST_ACK    6
 177#define DLM_CLOSING     7
 178        int state;
 179        spinlock_t state_lock;
 180
 181        /* counts how many lockspaces are using this node
 182         * this refcount is necessary to determine if the
 183         * node wants to disconnect.
 184         */
 185        int users;
 186
 187        /* not protected by srcu, node_hash lifetime */
 188        void *debugfs;
 189
 190        struct hlist_node hlist;
 191        struct rcu_head rcu;
 192};
 193
 194struct dlm_mhandle {
 195        const struct dlm_header *inner_hd;
 196        struct midcomms_node *node;
 197        struct dlm_opts *opts;
 198        struct dlm_msg *msg;
 199        bool committed;
 200        uint32_t seq;
 201
 202        void (*ack_rcv)(struct midcomms_node *node);
 203
 204        /* get_mhandle/commit srcu idx exchange */
 205        int idx;
 206
 207        struct list_head list;
 208        struct rcu_head rcu;
 209};
 210
 211static struct hlist_head node_hash[CONN_HASH_SIZE];
 212static DEFINE_SPINLOCK(nodes_lock);
 213DEFINE_STATIC_SRCU(nodes_srcu);
 214
 215/* This mutex prevents that midcomms_close() is running while
 216 * stop() or remove(). As I experienced invalid memory access
 217 * behaviours when DLM_DEBUG_FENCE_TERMINATION is enabled and
 218 * resetting machines. I will end in some double deletion in nodes
 219 * datastructure.
 220 */
 221static DEFINE_MUTEX(close_lock);
 222
 223static inline const char *dlm_state_str(int state)
 224{
 225        switch (state) {
 226        case DLM_CLOSED:
 227                return "CLOSED";
 228        case DLM_ESTABLISHED:
 229                return "ESTABLISHED";
 230        case DLM_FIN_WAIT1:
 231                return "FIN_WAIT1";
 232        case DLM_FIN_WAIT2:
 233                return "FIN_WAIT2";
 234        case DLM_CLOSE_WAIT:
 235                return "CLOSE_WAIT";
 236        case DLM_LAST_ACK:
 237                return "LAST_ACK";
 238        case DLM_CLOSING:
 239                return "CLOSING";
 240        default:
 241                return "UNKNOWN";
 242        }
 243}
 244
 245const char *dlm_midcomms_state(struct midcomms_node *node)
 246{
 247        return dlm_state_str(node->state);
 248}
 249
 250unsigned long dlm_midcomms_flags(struct midcomms_node *node)
 251{
 252        return node->flags;
 253}
 254
 255int dlm_midcomms_send_queue_cnt(struct midcomms_node *node)
 256{
 257        return atomic_read(&node->send_queue_cnt);
 258}
 259
 260uint32_t dlm_midcomms_version(struct midcomms_node *node)
 261{
 262        return node->version;
 263}
 264
 265static struct midcomms_node *__find_node(int nodeid, int r)
 266{
 267        struct midcomms_node *node;
 268
 269        hlist_for_each_entry_rcu(node, &node_hash[r], hlist) {
 270                if (node->nodeid == nodeid)
 271                        return node;
 272        }
 273
 274        return NULL;
 275}
 276
 277static void dlm_mhandle_release(struct rcu_head *rcu)
 278{
 279        struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu);
 280
 281        dlm_lowcomms_put_msg(mh->msg);
 282        kfree(mh);
 283}
 284
 285static void dlm_mhandle_delete(struct midcomms_node *node,
 286                               struct dlm_mhandle *mh)
 287{
 288        list_del_rcu(&mh->list);
 289        atomic_dec(&node->send_queue_cnt);
 290        call_rcu(&mh->rcu, dlm_mhandle_release);
 291}
 292
 293static void dlm_send_queue_flush(struct midcomms_node *node)
 294{
 295        struct dlm_mhandle *mh;
 296
 297        pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
 298
 299        rcu_read_lock();
 300        spin_lock(&node->send_queue_lock);
 301        list_for_each_entry_rcu(mh, &node->send_queue, list) {
 302                dlm_mhandle_delete(node, mh);
 303        }
 304        spin_unlock(&node->send_queue_lock);
 305        rcu_read_unlock();
 306}
 307
 308static void midcomms_node_reset(struct midcomms_node *node)
 309{
 310        pr_debug("reset node %d\n", node->nodeid);
 311
 312        node->seq_next = DLM_SEQ_INIT;
 313        node->seq_send = DLM_SEQ_INIT;
 314        node->version = DLM_VERSION_NOT_SET;
 315        node->flags = 0;
 316
 317        dlm_send_queue_flush(node);
 318        node->state = DLM_CLOSED;
 319        wake_up(&node->shutdown_wait);
 320}
 321
 322static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
 323{
 324        struct midcomms_node *node, *tmp;
 325        int r = nodeid_hash(nodeid);
 326
 327        node = __find_node(nodeid, r);
 328        if (node || !alloc)
 329                return node;
 330
 331        node = kmalloc(sizeof(*node), alloc);
 332        if (!node)
 333                return NULL;
 334
 335        node->nodeid = nodeid;
 336        spin_lock_init(&node->state_lock);
 337        spin_lock_init(&node->send_queue_lock);
 338        atomic_set(&node->send_queue_cnt, 0);
 339        INIT_LIST_HEAD(&node->send_queue);
 340        init_waitqueue_head(&node->shutdown_wait);
 341        node->users = 0;
 342        midcomms_node_reset(node);
 343
 344        spin_lock(&nodes_lock);
 345        /* check again if there was somebody else
 346         * earlier here to add the node
 347         */
 348        tmp = __find_node(nodeid, r);
 349        if (tmp) {
 350                spin_unlock(&nodes_lock);
 351                kfree(node);
 352                return tmp;
 353        }
 354
 355        hlist_add_head_rcu(&node->hlist, &node_hash[r]);
 356        spin_unlock(&nodes_lock);
 357
 358        node->debugfs = dlm_create_debug_comms_file(nodeid, node);
 359        return node;
 360}
 361
 362static int dlm_send_ack(int nodeid, uint32_t seq)
 363{
 364        int mb_len = sizeof(struct dlm_header);
 365        struct dlm_header *m_header;
 366        struct dlm_msg *msg;
 367        char *ppc;
 368
 369        msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_NOFS, &ppc,
 370                                   NULL, NULL);
 371        if (!msg)
 372                return -ENOMEM;
 373
 374        m_header = (struct dlm_header *)ppc;
 375
 376        m_header->h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 377        m_header->h_nodeid = dlm_our_nodeid();
 378        m_header->h_length = mb_len;
 379        m_header->h_cmd = DLM_ACK;
 380        m_header->u.h_seq = seq;
 381
 382        header_out(m_header);
 383        dlm_lowcomms_commit_msg(msg);
 384        dlm_lowcomms_put_msg(msg);
 385
 386        return 0;
 387}
 388
 389static int dlm_send_fin(struct midcomms_node *node,
 390                        void (*ack_rcv)(struct midcomms_node *node))
 391{
 392        int mb_len = sizeof(struct dlm_header);
 393        struct dlm_header *m_header;
 394        struct dlm_mhandle *mh;
 395        char *ppc;
 396
 397        mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_NOFS, &ppc);
 398        if (!mh)
 399                return -ENOMEM;
 400
 401        mh->ack_rcv = ack_rcv;
 402
 403        m_header = (struct dlm_header *)ppc;
 404
 405        m_header->h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 406        m_header->h_nodeid = dlm_our_nodeid();
 407        m_header->h_length = mb_len;
 408        m_header->h_cmd = DLM_FIN;
 409
 410        header_out(m_header);
 411
 412        pr_debug("sending fin msg to node %d\n", node->nodeid);
 413        dlm_midcomms_commit_mhandle(mh);
 414        set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
 415
 416        return 0;
 417}
 418
 419static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
 420{
 421        struct dlm_mhandle *mh;
 422
 423        rcu_read_lock();
 424        list_for_each_entry_rcu(mh, &node->send_queue, list) {
 425                if (before(mh->seq, seq)) {
 426                        if (mh->ack_rcv)
 427                                mh->ack_rcv(node);
 428                } else {
 429                        /* send queue should be ordered */
 430                        break;
 431                }
 432        }
 433
 434        spin_lock(&node->send_queue_lock);
 435        list_for_each_entry_rcu(mh, &node->send_queue, list) {
 436                if (before(mh->seq, seq)) {
 437                        dlm_mhandle_delete(node, mh);
 438                } else {
 439                        /* send queue should be ordered */
 440                        break;
 441                }
 442        }
 443        spin_unlock(&node->send_queue_lock);
 444        rcu_read_unlock();
 445}
 446
 447static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
 448{
 449        spin_lock(&node->state_lock);
 450        pr_debug("receive passive fin ack from node %d with state %s\n",
 451                 node->nodeid, dlm_state_str(node->state));
 452
 453        switch (node->state) {
 454        case DLM_LAST_ACK:
 455                /* DLM_CLOSED */
 456                midcomms_node_reset(node);
 457                break;
 458        case DLM_CLOSED:
 459                /* not valid but somehow we got what we want */
 460                wake_up(&node->shutdown_wait);
 461                break;
 462        default:
 463                spin_unlock(&node->state_lock);
 464                log_print("%s: unexpected state: %d\n",
 465                          __func__, node->state);
 466                WARN_ON(1);
 467                return;
 468        }
 469        spin_unlock(&node->state_lock);
 470}
 471
 472static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 473                                        struct midcomms_node *node,
 474                                        uint32_t seq)
 475{
 476        if (seq == node->seq_next) {
 477                node->seq_next++;
 478
 479                switch (p->header.h_cmd) {
 480                case DLM_FIN:
 481                        /* send ack before fin */
 482                        dlm_send_ack(node->nodeid, node->seq_next);
 483
 484                        spin_lock(&node->state_lock);
 485                        pr_debug("receive fin msg from node %d with state %s\n",
 486                                 node->nodeid, dlm_state_str(node->state));
 487
 488                        switch (node->state) {
 489                        case DLM_ESTABLISHED:
 490                                node->state = DLM_CLOSE_WAIT;
 491                                pr_debug("switch node %d to state %s\n",
 492                                         node->nodeid, dlm_state_str(node->state));
 493                                /* passive shutdown DLM_LAST_ACK case 1
 494                                 * additional we check if the node is used by
 495                                 * cluster manager events at all.
 496                                 */
 497                                if (node->users == 0) {
 498                                        node->state = DLM_LAST_ACK;
 499                                        pr_debug("switch node %d to state %s case 1\n",
 500                                                 node->nodeid, dlm_state_str(node->state));
 501                                        spin_unlock(&node->state_lock);
 502                                        goto send_fin;
 503                                }
 504                                break;
 505                        case DLM_FIN_WAIT1:
 506                                node->state = DLM_CLOSING;
 507                                pr_debug("switch node %d to state %s\n",
 508                                         node->nodeid, dlm_state_str(node->state));
 509                                break;
 510                        case DLM_FIN_WAIT2:
 511                                midcomms_node_reset(node);
 512                                pr_debug("switch node %d to state %s\n",
 513                                         node->nodeid, dlm_state_str(node->state));
 514                                wake_up(&node->shutdown_wait);
 515                                break;
 516                        case DLM_LAST_ACK:
 517                                /* probably remove_member caught it, do nothing */
 518                                break;
 519                        default:
 520                                spin_unlock(&node->state_lock);
 521                                log_print("%s: unexpected state: %d\n",
 522                                          __func__, node->state);
 523                                WARN_ON(1);
 524                                return;
 525                        }
 526                        spin_unlock(&node->state_lock);
 527
 528                        set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
 529                        break;
 530                default:
 531                        WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
 532                        dlm_receive_buffer(p, node->nodeid);
 533                        set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
 534                        break;
 535                }
 536        } else {
 537                /* retry to ack message which we already have by sending back
 538                 * current node->seq_next number as ack.
 539                 */
 540                if (seq < node->seq_next)
 541                        dlm_send_ack(node->nodeid, node->seq_next);
 542
 543                log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
 544                                      seq, node->seq_next, node->nodeid);
 545        }
 546
 547        return;
 548
 549send_fin:
 550        set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
 551        dlm_send_fin(node, dlm_pas_fin_ack_rcv);
 552}
 553
 554static struct midcomms_node *
 555dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
 556                              uint16_t msglen, int (*cb)(struct midcomms_node *node))
 557{
 558        struct midcomms_node *node = NULL;
 559        gfp_t allocation = 0;
 560        int ret;
 561
 562        switch (p->header.h_cmd) {
 563        case DLM_RCOM:
 564                if (msglen < sizeof(struct dlm_rcom)) {
 565                        log_print("rcom msg too small: %u, will skip this message from node %d",
 566                                  msglen, nodeid);
 567                        return NULL;
 568                }
 569
 570                switch (le32_to_cpu(p->rcom.rc_type)) {
 571                case DLM_RCOM_NAMES:
 572                        fallthrough;
 573                case DLM_RCOM_NAMES_REPLY:
 574                        fallthrough;
 575                case DLM_RCOM_STATUS:
 576                        fallthrough;
 577                case DLM_RCOM_STATUS_REPLY:
 578                        node = nodeid2node(nodeid, 0);
 579                        if (node) {
 580                                spin_lock(&node->state_lock);
 581                                if (node->state != DLM_ESTABLISHED)
 582                                        pr_debug("receive begin RCOM msg from node %d with state %s\n",
 583                                                 node->nodeid, dlm_state_str(node->state));
 584
 585                                switch (node->state) {
 586                                case DLM_CLOSED:
 587                                        node->state = DLM_ESTABLISHED;
 588                                        pr_debug("switch node %d to state %s\n",
 589                                                 node->nodeid, dlm_state_str(node->state));
 590                                        break;
 591                                case DLM_ESTABLISHED:
 592                                        break;
 593                                default:
 594                                        /* some invalid state passive shutdown
 595                                         * was failed, we try to reset and
 596                                         * hope it will go on.
 597                                         */
 598                                        log_print("reset node %d because shutdown stuck",
 599                                                  node->nodeid);
 600
 601                                        midcomms_node_reset(node);
 602                                        node->state = DLM_ESTABLISHED;
 603                                        break;
 604                                }
 605                                spin_unlock(&node->state_lock);
 606                        }
 607
 608                        allocation = GFP_NOFS;
 609                        break;
 610                default:
 611                        break;
 612                }
 613
 614                break;
 615        default:
 616                break;
 617        }
 618
 619        node = nodeid2node(nodeid, allocation);
 620        if (!node) {
 621                switch (p->header.h_cmd) {
 622                case DLM_OPTS:
 623                        if (msglen < sizeof(struct dlm_opts)) {
 624                                log_print("opts msg too small: %u, will skip this message from node %d",
 625                                          msglen, nodeid);
 626                                return NULL;
 627                        }
 628
 629                        log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
 630                                              p->opts.o_nextcmd, nodeid);
 631                        break;
 632                default:
 633                        log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
 634                                              p->header.h_cmd, nodeid);
 635                        break;
 636                }
 637
 638                return NULL;
 639        }
 640
 641        ret = cb(node);
 642        if (ret < 0)
 643                return NULL;
 644
 645        return node;
 646}
 647
 648static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
 649{
 650        switch (node->version) {
 651        case DLM_VERSION_NOT_SET:
 652                node->version = DLM_VERSION_3_2;
 653                log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
 654                          node->nodeid);
 655                break;
 656        case DLM_VERSION_3_2:
 657                break;
 658        default:
 659                log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
 660                                      DLM_VERSION_3_2, node->nodeid, node->version);
 661                return -1;
 662        }
 663
 664        return 0;
 665}
 666
 667static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid)
 668{
 669        int len = msglen;
 670
 671        /* we only trust outer header msglen because
 672         * it's checked against receive buffer length.
 673         */
 674        if (len < sizeof(struct dlm_opts))
 675                return -1;
 676        len -= sizeof(struct dlm_opts);
 677
 678        if (len < le16_to_cpu(p->opts.o_optlen))
 679                return -1;
 680        len -= le16_to_cpu(p->opts.o_optlen);
 681
 682        switch (p->opts.o_nextcmd) {
 683        case DLM_FIN:
 684                if (len < sizeof(struct dlm_header)) {
 685                        log_print("fin too small: %d, will skip this message from node %d",
 686                                  len, nodeid);
 687                        return -1;
 688                }
 689
 690                break;
 691        case DLM_MSG:
 692                if (len < sizeof(struct dlm_message)) {
 693                        log_print("msg too small: %d, will skip this message from node %d",
 694                                  msglen, nodeid);
 695                        return -1;
 696                }
 697
 698                break;
 699        case DLM_RCOM:
 700                if (len < sizeof(struct dlm_rcom)) {
 701                        log_print("rcom msg too small: %d, will skip this message from node %d",
 702                                  len, nodeid);
 703                        return -1;
 704                }
 705
 706                break;
 707        default:
 708                log_print("unsupported o_nextcmd received: %u, will skip this message from node %d",
 709                          p->opts.o_nextcmd, nodeid);
 710                return -1;
 711        }
 712
 713        return 0;
 714}
 715
 716static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
 717{
 718        uint16_t msglen = le16_to_cpu(p->header.h_length);
 719        struct midcomms_node *node;
 720        uint32_t seq;
 721        int ret, idx;
 722
 723        idx = srcu_read_lock(&nodes_srcu);
 724        node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
 725                                             dlm_midcomms_version_check_3_2);
 726        if (!node)
 727                goto out;
 728
 729        switch (p->header.h_cmd) {
 730        case DLM_RCOM:
 731                /* these rcom message we use to determine version.
 732                 * they have their own retransmission handling and
 733                 * are the first messages of dlm.
 734                 *
 735                 * length already checked.
 736                 */
 737                switch (le32_to_cpu(p->rcom.rc_type)) {
 738                case DLM_RCOM_NAMES:
 739                        fallthrough;
 740                case DLM_RCOM_NAMES_REPLY:
 741                        fallthrough;
 742                case DLM_RCOM_STATUS:
 743                        fallthrough;
 744                case DLM_RCOM_STATUS_REPLY:
 745                        break;
 746                default:
 747                        log_print("unsupported rcom type received: %u, will skip this message from node %d",
 748                                  le32_to_cpu(p->rcom.rc_type), nodeid);
 749                        goto out;
 750                }
 751
 752                WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
 753                dlm_receive_buffer(p, nodeid);
 754                break;
 755        case DLM_OPTS:
 756                seq = le32_to_cpu(p->header.u.h_seq);
 757
 758                ret = dlm_opts_check_msglen(p, msglen, nodeid);
 759                if (ret < 0) {
 760                        log_print("opts msg too small: %u, will skip this message from node %d",
 761                                  msglen, nodeid);
 762                        goto out;
 763                }
 764
 765                p = (union dlm_packet *)((unsigned char *)p->opts.o_opts +
 766                                         le16_to_cpu(p->opts.o_optlen));
 767
 768                /* recheck inner msglen just if it's not garbage */
 769                msglen = le16_to_cpu(p->header.h_length);
 770                switch (p->header.h_cmd) {
 771                case DLM_RCOM:
 772                        if (msglen < sizeof(struct dlm_rcom)) {
 773                                log_print("inner rcom msg too small: %u, will skip this message from node %d",
 774                                          msglen, nodeid);
 775                                goto out;
 776                        }
 777
 778                        break;
 779                case DLM_MSG:
 780                        if (msglen < sizeof(struct dlm_message)) {
 781                                log_print("inner msg too small: %u, will skip this message from node %d",
 782                                          msglen, nodeid);
 783                                goto out;
 784                        }
 785
 786                        break;
 787                case DLM_FIN:
 788                        if (msglen < sizeof(struct dlm_header)) {
 789                                log_print("inner fin too small: %u, will skip this message from node %d",
 790                                          msglen, nodeid);
 791                                goto out;
 792                        }
 793
 794                        break;
 795                default:
 796                        log_print("unsupported inner h_cmd received: %u, will skip this message from node %d",
 797                                  msglen, nodeid);
 798                        goto out;
 799                }
 800
 801                dlm_midcomms_receive_buffer(p, node, seq);
 802                break;
 803        case DLM_ACK:
 804                seq = le32_to_cpu(p->header.u.h_seq);
 805                dlm_receive_ack(node, seq);
 806                break;
 807        default:
 808                log_print("unsupported h_cmd received: %u, will skip this message from node %d",
 809                          p->header.h_cmd, nodeid);
 810                break;
 811        }
 812
 813out:
 814        srcu_read_unlock(&nodes_srcu, idx);
 815}
 816
 817static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
 818{
 819        switch (node->version) {
 820        case DLM_VERSION_NOT_SET:
 821                node->version = DLM_VERSION_3_1;
 822                log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1,
 823                          node->nodeid);
 824                break;
 825        case DLM_VERSION_3_1:
 826                break;
 827        default:
 828                log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
 829                                      DLM_VERSION_3_1, node->nodeid, node->version);
 830                return -1;
 831        }
 832
 833        return 0;
 834}
 835
 836static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
 837{
 838        uint16_t msglen = le16_to_cpu(p->header.h_length);
 839        struct midcomms_node *node;
 840        int idx;
 841
 842        idx = srcu_read_lock(&nodes_srcu);
 843        node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
 844                                             dlm_midcomms_version_check_3_1);
 845        if (!node) {
 846                srcu_read_unlock(&nodes_srcu, idx);
 847                return;
 848        }
 849        srcu_read_unlock(&nodes_srcu, idx);
 850
 851        switch (p->header.h_cmd) {
 852        case DLM_RCOM:
 853                /* length already checked */
 854                break;
 855        case DLM_MSG:
 856                if (msglen < sizeof(struct dlm_message)) {
 857                        log_print("msg too small: %u, will skip this message from node %d",
 858                                  msglen, nodeid);
 859                        return;
 860                }
 861
 862                break;
 863        default:
 864                log_print("unsupported h_cmd received: %u, will skip this message from node %d",
 865                          p->header.h_cmd, nodeid);
 866                return;
 867        }
 868
 869        dlm_receive_buffer(p, nodeid);
 870}
 871
 872/*
 873 * Called from the low-level comms layer to process a buffer of
 874 * commands.
 875 */
 876
 877int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
 878{
 879        const unsigned char *ptr = buf;
 880        const struct dlm_header *hd;
 881        uint16_t msglen;
 882        int ret = 0;
 883
 884        while (len >= sizeof(struct dlm_header)) {
 885                hd = (struct dlm_header *)ptr;
 886
 887                /* no message should be more than DLM_MAX_SOCKET_BUFSIZE or
 888                 * less than dlm_header size.
 889                 *
 890                 * Some messages does not have a 8 byte length boundary yet
 891                 * which can occur in a unaligned memory access of some dlm
 892                 * messages. However this problem need to be fixed at the
 893                 * sending side, for now it seems nobody run into architecture
 894                 * related issues yet but it slows down some processing.
 895                 * Fixing this issue should be scheduled in future by doing
 896                 * the next major version bump.
 897                 */
 898                msglen = le16_to_cpu(hd->h_length);
 899                if (msglen > DLM_MAX_SOCKET_BUFSIZE ||
 900                    msglen < sizeof(struct dlm_header)) {
 901                        log_print("received invalid length header: %u from node %d, will abort message parsing",
 902                                  msglen, nodeid);
 903                        return -EBADMSG;
 904                }
 905
 906                /* caller will take care that leftover
 907                 * will be parsed next call with more data
 908                 */
 909                if (msglen > len)
 910                        break;
 911
 912                switch (le32_to_cpu(hd->h_version)) {
 913                case DLM_VERSION_3_1:
 914                        dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
 915                        break;
 916                case DLM_VERSION_3_2:
 917                        dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid);
 918                        break;
 919                default:
 920                        log_print("received invalid version header: %u from node %d, will skip this message",
 921                                  le32_to_cpu(hd->h_version), nodeid);
 922                        break;
 923                }
 924
 925                ret += msglen;
 926                len -= msglen;
 927                ptr += msglen;
 928        }
 929
 930        return ret;
 931}
 932
 933void dlm_midcomms_receive_done(int nodeid)
 934{
 935        struct midcomms_node *node;
 936        int idx;
 937
 938        idx = srcu_read_lock(&nodes_srcu);
 939        node = nodeid2node(nodeid, 0);
 940        if (!node) {
 941                srcu_read_unlock(&nodes_srcu, idx);
 942                return;
 943        }
 944
 945        /* old protocol, we do nothing */
 946        switch (node->version) {
 947        case DLM_VERSION_3_2:
 948                break;
 949        default:
 950                srcu_read_unlock(&nodes_srcu, idx);
 951                return;
 952        }
 953
 954        /* do nothing if we didn't delivered stateful to ulp */
 955        if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
 956                                &node->flags)) {
 957                srcu_read_unlock(&nodes_srcu, idx);
 958                return;
 959        }
 960
 961        spin_lock(&node->state_lock);
 962        /* we only ack if state is ESTABLISHED */
 963        switch (node->state) {
 964        case DLM_ESTABLISHED:
 965                spin_unlock(&node->state_lock);
 966                dlm_send_ack(node->nodeid, node->seq_next);
 967                break;
 968        default:
 969                spin_unlock(&node->state_lock);
 970                /* do nothing FIN has it's own ack send */
 971                break;
 972        };
 973        srcu_read_unlock(&nodes_srcu, idx);
 974}
 975
 976void dlm_midcomms_unack_msg_resend(int nodeid)
 977{
 978        struct midcomms_node *node;
 979        struct dlm_mhandle *mh;
 980        int idx, ret;
 981
 982        idx = srcu_read_lock(&nodes_srcu);
 983        node = nodeid2node(nodeid, 0);
 984        if (!node) {
 985                srcu_read_unlock(&nodes_srcu, idx);
 986                return;
 987        }
 988
 989        /* old protocol, we don't support to retransmit on failure */
 990        switch (node->version) {
 991        case DLM_VERSION_3_2:
 992                break;
 993        default:
 994                srcu_read_unlock(&nodes_srcu, idx);
 995                return;
 996        }
 997
 998        rcu_read_lock();
 999        list_for_each_entry_rcu(mh, &node->send_queue, list) {
1000                if (!mh->committed)
1001                        continue;
1002
1003                ret = dlm_lowcomms_resend_msg(mh->msg);
1004                if (!ret)
1005                        log_print_ratelimited("retransmit dlm msg, seq %u, nodeid %d",
1006                                              mh->seq, node->nodeid);
1007        }
1008        rcu_read_unlock();
1009        srcu_read_unlock(&nodes_srcu, idx);
1010}
1011
1012static void dlm_fill_opts_header(struct dlm_opts *opts, uint16_t inner_len,
1013                                 uint32_t seq)
1014{
1015        opts->o_header.h_cmd = DLM_OPTS;
1016        opts->o_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
1017        opts->o_header.h_nodeid = dlm_our_nodeid();
1018        opts->o_header.h_length = DLM_MIDCOMMS_OPT_LEN + inner_len;
1019        opts->o_header.u.h_seq = seq;
1020        header_out(&opts->o_header);
1021}
1022
1023static void midcomms_new_msg_cb(struct dlm_mhandle *mh)
1024{
1025        atomic_inc(&mh->node->send_queue_cnt);
1026
1027        spin_lock(&mh->node->send_queue_lock);
1028        list_add_tail_rcu(&mh->list, &mh->node->send_queue);
1029        spin_unlock(&mh->node->send_queue_lock);
1030
1031        mh->seq = mh->node->seq_send++;
1032}
1033
1034static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
1035                                                int len, gfp_t allocation, char **ppc)
1036{
1037        struct dlm_opts *opts;
1038        struct dlm_msg *msg;
1039
1040        msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
1041                                   allocation, ppc, midcomms_new_msg_cb, mh);
1042        if (!msg)
1043                return NULL;
1044
1045        opts = (struct dlm_opts *)*ppc;
1046        mh->opts = opts;
1047
1048        /* add possible options here */
1049        dlm_fill_opts_header(opts, len, mh->seq);
1050
1051        *ppc += sizeof(*opts);
1052        mh->inner_hd = (const struct dlm_header *)*ppc;
1053        return msg;
1054}
1055
1056struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
1057                                             gfp_t allocation, char **ppc)
1058{
1059        struct midcomms_node *node;
1060        struct dlm_mhandle *mh;
1061        struct dlm_msg *msg;
1062        int idx;
1063
1064        idx = srcu_read_lock(&nodes_srcu);
1065        node = nodeid2node(nodeid, 0);
1066        if (!node) {
1067                WARN_ON_ONCE(1);
1068                goto err;
1069        }
1070
1071        /* this is a bug, however we going on and hope it will be resolved */
1072        WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
1073
1074        mh = kzalloc(sizeof(*mh), GFP_NOFS);
1075        if (!mh)
1076                goto err;
1077
1078        mh->idx = idx;
1079        mh->node = node;
1080
1081        switch (node->version) {
1082        case DLM_VERSION_3_1:
1083                msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
1084                                           NULL, NULL);
1085                if (!msg) {
1086                        kfree(mh);
1087                        goto err;
1088                }
1089
1090                break;
1091        case DLM_VERSION_3_2:
1092                msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
1093                                               ppc);
1094                if (!msg) {
1095                        kfree(mh);
1096                        goto err;
1097                }
1098
1099                break;
1100        default:
1101                kfree(mh);
1102                WARN_ON(1);
1103                goto err;
1104        }
1105
1106        mh->msg = msg;
1107
1108        /* keep in mind that is a must to call
1109         * dlm_midcomms_commit_msg() which releases
1110         * nodes_srcu using mh->idx which is assumed
1111         * here that the application will call it.
1112         */
1113        return mh;
1114
1115err:
1116        srcu_read_unlock(&nodes_srcu, idx);
1117        return NULL;
1118}
1119
1120static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh)
1121{
1122        /* nexthdr chain for fast lookup */
1123        mh->opts->o_nextcmd = mh->inner_hd->h_cmd;
1124        mh->committed = true;
1125        dlm_lowcomms_commit_msg(mh->msg);
1126}
1127
1128void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
1129{
1130        switch (mh->node->version) {
1131        case DLM_VERSION_3_1:
1132                srcu_read_unlock(&nodes_srcu, mh->idx);
1133
1134                dlm_lowcomms_commit_msg(mh->msg);
1135                dlm_lowcomms_put_msg(mh->msg);
1136                /* mh is not part of rcu list in this case */
1137                kfree(mh);
1138                break;
1139        case DLM_VERSION_3_2:
1140                dlm_midcomms_commit_msg_3_2(mh);
1141                srcu_read_unlock(&nodes_srcu, mh->idx);
1142                break;
1143        default:
1144                srcu_read_unlock(&nodes_srcu, mh->idx);
1145                WARN_ON(1);
1146                break;
1147        }
1148}
1149
1150int dlm_midcomms_start(void)
1151{
1152        int i;
1153
1154        for (i = 0; i < CONN_HASH_SIZE; i++)
1155                INIT_HLIST_HEAD(&node_hash[i]);
1156
1157        return dlm_lowcomms_start();
1158}
1159
1160static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
1161{
1162        spin_lock(&node->state_lock);
1163        pr_debug("receive active fin ack from node %d with state %s\n",
1164                 node->nodeid, dlm_state_str(node->state));
1165
1166        switch (node->state) {
1167        case DLM_FIN_WAIT1:
1168                node->state = DLM_FIN_WAIT2;
1169                pr_debug("switch node %d to state %s\n",
1170                         node->nodeid, dlm_state_str(node->state));
1171                break;
1172        case DLM_CLOSING:
1173                midcomms_node_reset(node);
1174                pr_debug("switch node %d to state %s\n",
1175                         node->nodeid, dlm_state_str(node->state));
1176                wake_up(&node->shutdown_wait);
1177                break;
1178        case DLM_CLOSED:
1179                /* not valid but somehow we got what we want */
1180                wake_up(&node->shutdown_wait);
1181                break;
1182        default:
1183                spin_unlock(&node->state_lock);
1184                log_print("%s: unexpected state: %d\n",
1185                          __func__, node->state);
1186                WARN_ON(1);
1187                return;
1188        }
1189        spin_unlock(&node->state_lock);
1190}
1191
1192void dlm_midcomms_add_member(int nodeid)
1193{
1194        struct midcomms_node *node;
1195        int idx;
1196
1197        if (nodeid == dlm_our_nodeid())
1198                return;
1199
1200        idx = srcu_read_lock(&nodes_srcu);
1201        node = nodeid2node(nodeid, GFP_NOFS);
1202        if (!node) {
1203                srcu_read_unlock(&nodes_srcu, idx);
1204                return;
1205        }
1206
1207        spin_lock(&node->state_lock);
1208        if (!node->users) {
1209                pr_debug("receive add member from node %d with state %s\n",
1210                         node->nodeid, dlm_state_str(node->state));
1211                switch (node->state) {
1212                case DLM_ESTABLISHED:
1213                        break;
1214                case DLM_CLOSED:
1215                        node->state = DLM_ESTABLISHED;
1216                        pr_debug("switch node %d to state %s\n",
1217                                 node->nodeid, dlm_state_str(node->state));
1218                        break;
1219                default:
1220                        /* some invalid state passive shutdown
1221                         * was failed, we try to reset and
1222                         * hope it will go on.
1223                         */
1224                        log_print("reset node %d because shutdown stuck",
1225                                  node->nodeid);
1226
1227                        midcomms_node_reset(node);
1228                        node->state = DLM_ESTABLISHED;
1229                        break;
1230                }
1231        }
1232
1233        node->users++;
1234        pr_debug("users inc count %d\n", node->users);
1235        spin_unlock(&node->state_lock);
1236
1237        srcu_read_unlock(&nodes_srcu, idx);
1238}
1239
1240void dlm_midcomms_remove_member(int nodeid)
1241{
1242        struct midcomms_node *node;
1243        int idx;
1244
1245        if (nodeid == dlm_our_nodeid())
1246                return;
1247
1248        idx = srcu_read_lock(&nodes_srcu);
1249        node = nodeid2node(nodeid, 0);
1250        if (!node) {
1251                srcu_read_unlock(&nodes_srcu, idx);
1252                return;
1253        }
1254
1255        spin_lock(&node->state_lock);
1256        node->users--;
1257        pr_debug("users dec count %d\n", node->users);
1258
1259        /* hitting users count to zero means the
1260         * other side is running dlm_midcomms_stop()
1261         * we meet us to have a clean disconnect.
1262         */
1263        if (node->users == 0) {
1264                pr_debug("receive remove member from node %d with state %s\n",
1265                         node->nodeid, dlm_state_str(node->state));
1266                switch (node->state) {
1267                case DLM_ESTABLISHED:
1268                        break;
1269                case DLM_CLOSE_WAIT:
1270                        /* passive shutdown DLM_LAST_ACK case 2 */
1271                        node->state = DLM_LAST_ACK;
1272                        spin_unlock(&node->state_lock);
1273
1274                        pr_debug("switch node %d to state %s case 2\n",
1275                                 node->nodeid, dlm_state_str(node->state));
1276                        goto send_fin;
1277                case DLM_LAST_ACK:
1278                        /* probably receive fin caught it, do nothing */
1279                        break;
1280                case DLM_CLOSED:
1281                        /* already gone, do nothing */
1282                        break;
1283                default:
1284                        log_print("%s: unexpected state: %d\n",
1285                                  __func__, node->state);
1286                        break;
1287                }
1288        }
1289        spin_unlock(&node->state_lock);
1290
1291        srcu_read_unlock(&nodes_srcu, idx);
1292        return;
1293
1294send_fin:
1295        set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
1296        dlm_send_fin(node, dlm_pas_fin_ack_rcv);
1297        srcu_read_unlock(&nodes_srcu, idx);
1298}
1299
1300static void midcomms_node_release(struct rcu_head *rcu)
1301{
1302        struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
1303
1304        WARN_ON(atomic_read(&node->send_queue_cnt));
1305        kfree(node);
1306}
1307
1308static void midcomms_shutdown(struct midcomms_node *node)
1309{
1310        int ret;
1311
1312        /* old protocol, we don't wait for pending operations */
1313        switch (node->version) {
1314        case DLM_VERSION_3_2:
1315                break;
1316        default:
1317                return;
1318        }
1319
1320        spin_lock(&node->state_lock);
1321        pr_debug("receive active shutdown for node %d with state %s\n",
1322                 node->nodeid, dlm_state_str(node->state));
1323        switch (node->state) {
1324        case DLM_ESTABLISHED:
1325                node->state = DLM_FIN_WAIT1;
1326                pr_debug("switch node %d to state %s case 2\n",
1327                         node->nodeid, dlm_state_str(node->state));
1328                break;
1329        case DLM_CLOSED:
1330                /* we have what we want */
1331                spin_unlock(&node->state_lock);
1332                return;
1333        default:
1334                /* busy to enter DLM_FIN_WAIT1, wait until passive
1335                 * done in shutdown_wait to enter DLM_CLOSED.
1336                 */
1337                break;
1338        }
1339        spin_unlock(&node->state_lock);
1340
1341        if (node->state == DLM_FIN_WAIT1) {
1342                dlm_send_fin(node, dlm_act_fin_ack_rcv);
1343
1344                if (DLM_DEBUG_FENCE_TERMINATION)
1345                        msleep(5000);
1346        }
1347
1348        /* wait for other side dlm + fin */
1349        ret = wait_event_timeout(node->shutdown_wait,
1350                                 node->state == DLM_CLOSED ||
1351                                 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
1352                                 DLM_SHUTDOWN_TIMEOUT);
1353        if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) {
1354                pr_debug("active shutdown timed out for node %d with state %s\n",
1355                         node->nodeid, dlm_state_str(node->state));
1356                midcomms_node_reset(node);
1357                return;
1358        }
1359
1360        pr_debug("active shutdown done for node %d with state %s\n",
1361                 node->nodeid, dlm_state_str(node->state));
1362}
1363
1364void dlm_midcomms_shutdown(void)
1365{
1366        struct midcomms_node *node;
1367        int i, idx;
1368
1369        mutex_lock(&close_lock);
1370        idx = srcu_read_lock(&nodes_srcu);
1371        for (i = 0; i < CONN_HASH_SIZE; i++) {
1372                hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
1373                        midcomms_shutdown(node);
1374
1375                        dlm_delete_debug_comms_file(node->debugfs);
1376
1377                        spin_lock(&nodes_lock);
1378                        hlist_del_rcu(&node->hlist);
1379                        spin_unlock(&nodes_lock);
1380
1381                        call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
1382                }
1383        }
1384        srcu_read_unlock(&nodes_srcu, idx);
1385        mutex_unlock(&close_lock);
1386
1387        dlm_lowcomms_shutdown();
1388}
1389
1390int dlm_midcomms_close(int nodeid)
1391{
1392        struct midcomms_node *node;
1393        int idx, ret;
1394
1395        if (nodeid == dlm_our_nodeid())
1396                return 0;
1397
1398        idx = srcu_read_lock(&nodes_srcu);
1399        /* Abort pending close/remove operation */
1400        node = nodeid2node(nodeid, 0);
1401        if (node) {
1402                /* let shutdown waiters leave */
1403                set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
1404                wake_up(&node->shutdown_wait);
1405        }
1406        srcu_read_unlock(&nodes_srcu, idx);
1407
1408        synchronize_srcu(&nodes_srcu);
1409
1410        idx = srcu_read_lock(&nodes_srcu);
1411        mutex_lock(&close_lock);
1412        node = nodeid2node(nodeid, 0);
1413        if (!node) {
1414                mutex_unlock(&close_lock);
1415                srcu_read_unlock(&nodes_srcu, idx);
1416                return dlm_lowcomms_close(nodeid);
1417        }
1418
1419        ret = dlm_lowcomms_close(nodeid);
1420        spin_lock(&node->state_lock);
1421        midcomms_node_reset(node);
1422        spin_unlock(&node->state_lock);
1423        srcu_read_unlock(&nodes_srcu, idx);
1424        mutex_unlock(&close_lock);
1425
1426        return ret;
1427}
1428