linux/fs/dlm/midcomms.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2004-2021 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12/*
  13 * midcomms.c
  14 *
   15 * This is the appallingly named "mid-level" comms layer. It takes care of
   16 * delivering application-layer "reliable" communication on top of the
   17 * lowcomms transport layer in use.
  18 *
  19 * How it works:
  20 *
   21 * Each node keeps track of all sent DLM messages in send_queue with a sequence
   22 * number. The receiver will send a DLM_ACK message back for every DLM message
   23 * received at the other side. If a reconnect happens in lowcomms we will send
   24 * all unacknowledged dlm messages again. The receiving side might drop any already
   25 * received message by comparing sequence numbers.
  26 *
  27 * How version detection works:
  28 *
   29 * Due to the fact that dlm has pre-configured node addresses on every side,
   30 * it is in its nature that every side connects at start to transmit
   31 * dlm messages, which ends in a race. However DLM_RCOM_NAMES, DLM_RCOM_STATUS
   32 * and their replies are the first messages which are exchanged. Due to backwards
   33 * compatibility these messages are not covered by the midcomms re-transmission
   34 * layer. These messages have their own re-transmission handling in the dlm
   35 * application layer. The version field of every node will be set on these RCOM
   36 * messages as soon as they arrive and the node isn't yet part of the nodes
   37 * hash. There also exists logic to detect a version mismatch if something weird
   38 * is going on or the first message isn't an expected one.
  39 *
  40 * Termination:
  41 *
   42 * The midcomms layer does a 4 way handshake for termination on DLM protocol
   43 * level, like TCP supports it with half-closed socket support. SCTP doesn't
   44 * support half-closed sockets, so we do it on DLM layer. Also socket shutdown()
   45 * can be interrupted by e.g. a tcp reset itself. Additionally there exists the
   46 * othercon paradigm in lowcomms which cannot easily be changed without breaking
   47 * backwards compatibility. A node cannot send anything to another node after a
   48 * DLM_FIN message was sent. There exists additional logic to print a warning if
   49 * DLM wants to do it. There exists a state handling like RFC 793 but reduced
   50 * to termination only. The event "member removal event" describes that the
   51 * cluster manager removed the node from internal lists; at this point DLM does
   52 * not send any message to the other node. There exist two cases:
  53 *
  54 * 1. The cluster member was removed and we received a FIN
  55 * OR
  56 * 2. We received a FIN but the member was not removed yet
  57 *
  58 * One of these cases will do the CLOSE_WAIT to LAST_ACK change.
  59 *
  60 *
  61 *                              +---------+
  62 *                              | CLOSED  |
  63 *                              +---------+
  64 *                                   | add member/receive RCOM version
  65 *                                   |            detection msg
  66 *                                   V
  67 *                              +---------+
  68 *                              |  ESTAB  |
  69 *                              +---------+
  70 *                       CLOSE    |     |    rcv FIN
  71 *                      -------   |     |    -------
  72 * +---------+          snd FIN  /       \   snd ACK          +---------+
  73 * |  FIN    |<-----------------           ------------------>|  CLOSE  |
  74 * | WAIT-1  |------------------                              |   WAIT  |
  75 * +---------+          rcv FIN  \                            +---------+
  76 * | rcv ACK of FIN   -------   |                            CLOSE  | member
  77 * | --------------   snd ACK   |                           ------- | removal
  78 * V        x                   V                           snd FIN V event
  79 * +---------+                  +---------+                   +---------+
  80 * |FINWAIT-2|                  | CLOSING |                   | LAST-ACK|
  81 * +---------+                  +---------+                   +---------+
  82 * |                rcv ACK of FIN |                 rcv ACK of FIN |
  83 * |  rcv FIN       -------------- |                 -------------- |
  84 * |  -------              x       V                        x       V
  85 *  \ snd ACK                 +---------+                   +---------+
  86 *   ------------------------>| CLOSED  |                   | CLOSED  |
  87 *                            +---------+                   +---------+
  88 *
   89 * NOTE: any state can be interrupted by midcomms_close() and the state will
   90 * be switched to CLOSED in case of fencing. There also exists some timeout
   91 * handling when we receive the version detection RCOM messages which is
   92 * made by observation.
  93 *
  94 * Future improvements:
  95 *
  96 * There exists some known issues/improvements of the dlm handling. Some
  97 * of them should be done in a next major dlm version bump which makes
  98 * it incompatible with previous versions.
  99 *
 100 * Unaligned memory access:
 101 *
 102 * There exists cases when the dlm message buffer length is not aligned
 103 * to 8 byte. However seems nobody detected any problem with it. This
 104 * can be fixed in the next major version bump of dlm.
 105 *
 106 * Version detection:
 107 *
 108 * The version detection and how it's done is related to backwards
 109 * compatibility. There exists better ways to make a better handling.
 110 * However this should be changed in the next major version bump of dlm.
 111 *
 112 * Tail Size checking:
 113 *
 114 * There exists a message tail payload in e.g. DLM_MSG however we don't
 115 * check it against the message length yet regarding to the receive buffer
 116 * length. That need to be validated.
 117 *
 118 * Fencing bad nodes:
 119 *
 120 * At timeout places or weird sequence number behaviours we should send
 121 * a fencing request to the cluster manager.
 122 */
 123
/* Debug switch to enable a 5 second sleep while waiting for a termination.
 * This can be useful to test fencing while termination is running.
 * This requires a setup with only gfs2 as dlm user, so that the
 * last umount will terminate the connection.
 *
 * It has proven useful for testing: while umount blocks for the 5 seconds,
 * just press the reset button. When a lot of messages are being dropped the
 * termination process could take several seconds.
 */
#define DLM_DEBUG_FENCE_TERMINATION	0
 134
 135#include <net/tcp.h>
 136
 137#include "dlm_internal.h"
 138#include "lowcomms.h"
 139#include "config.h"
 140#include "memory.h"
 141#include "lock.h"
 142#include "util.h"
 143#include "midcomms.h"
 144
/* Initial value for sequence numbers; change it for testing purposes only,
 * e.g. to exercise overflows.
 */
#define DLM_SEQ_INIT		0
/* 3 minutes wait to sync ending of dlm */
#define DLM_SHUTDOWN_TIMEOUT	msecs_to_jiffies(3 * 60 * 1000)
/* value of midcomms_node.version while no protocol version was detected */
#define DLM_VERSION_NOT_SET	0
 150
/* Per remote node state of the midcomms layer. Nodes are kept in node_hash
 * and looked up by nodeid.
 */
struct midcomms_node {
	int nodeid;
	/* detected protocol version, DLM_VERSION_NOT_SET until known */
	uint32_t version;
	/* sequence counter for the send side (NOTE(review): it is only reset
	 * in this part of the file, the increment happens elsewhere)
	 */
	uint32_t seq_send;
	/* sequence number expected for the next received message */
	uint32_t seq_next;
	/* These queues are unbound because we cannot drop any message in dlm.
	 * We could send a fence signal for a specific node to the cluster
	 * manager if queues hits some maximum value, however this handling
	 * not supported yet.
	 */
	struct list_head send_queue;
	spinlock_t send_queue_lock;
	atomic_t send_queue_cnt;
	/* bit numbers for the flags field below */
#define DLM_NODE_FLAG_CLOSE	1
#define DLM_NODE_FLAG_STOP_TX	2
#define DLM_NODE_FLAG_STOP_RX	3
#define DLM_NODE_ULP_DELIVERED	4
	unsigned long flags;
	/* woken up whenever the node is reset to DLM_CLOSED */
	wait_queue_head_t shutdown_wait;

	/* dlm tcp termination state */
#define DLM_CLOSED	1
#define DLM_ESTABLISHED	2
#define DLM_FIN_WAIT1	3
#define DLM_FIN_WAIT2	4
#define DLM_CLOSE_WAIT	5
#define DLM_LAST_ACK	6
#define DLM_CLOSING	7
	int state;
	/* taken around the state transitions in this file */
	spinlock_t state_lock;

	/* counts how many lockspaces are using this node
	 * this refcount is necessary to determine if the
	 * node wants to disconnect.
	 */
	int users;

	/* not protected by srcu, node_hash lifetime */
	void *debugfs;

	/* linkage into node_hash */
	struct hlist_node hlist;
	struct rcu_head rcu;
};
 194
/* Handle for one message that is tracked in a node's send_queue until it
 * is acknowledged or flushed.
 */
struct dlm_mhandle {
	const struct dlm_header *inner_hd;
	struct midcomms_node *node;
	struct dlm_opts *opts;
	/* lowcomms message, put again when the handle is released */
	struct dlm_msg *msg;
	bool committed;
	/* sequence number of this message, compared against received acks */
	uint32_t seq;

	/* optional callback, invoked when an ack covering seq is received */
	void (*ack_rcv)(struct midcomms_node *node);

	/* get_mhandle/commit srcu idx exchange */
	int idx;

	/* linkage into midcomms_node.send_queue */
	struct list_head list;
	/* used to defer the release of the handle via call_rcu() */
	struct rcu_head rcu;
};
 211
/* hash of all known midcomms nodes, the bucket is nodeid_hash(nodeid) */
static struct hlist_head node_hash[CONN_HASH_SIZE];
/* serializes insertions into node_hash */
static DEFINE_SPINLOCK(nodes_lock);
/* srcu read side is held while a looked up node is in use */
DEFINE_STATIC_SRCU(nodes_srcu);

/* This mutex prevents midcomms_close() from running concurrently with
 * stop() or remove(). Invalid memory accesses were observed when
 * DLM_DEBUG_FENCE_TERMINATION was enabled and machines were reset;
 * without it we end up with a double deletion in the nodes
 * datastructure.
 */
static DEFINE_MUTEX(close_lock);
 223
 224struct kmem_cache *dlm_midcomms_cache_create(void)
 225{
 226        return kmem_cache_create("dlm_mhandle", sizeof(struct dlm_mhandle),
 227                                 0, 0, NULL);
 228}
 229
 230static inline const char *dlm_state_str(int state)
 231{
 232        switch (state) {
 233        case DLM_CLOSED:
 234                return "CLOSED";
 235        case DLM_ESTABLISHED:
 236                return "ESTABLISHED";
 237        case DLM_FIN_WAIT1:
 238                return "FIN_WAIT1";
 239        case DLM_FIN_WAIT2:
 240                return "FIN_WAIT2";
 241        case DLM_CLOSE_WAIT:
 242                return "CLOSE_WAIT";
 243        case DLM_LAST_ACK:
 244                return "LAST_ACK";
 245        case DLM_CLOSING:
 246                return "CLOSING";
 247        default:
 248                return "UNKNOWN";
 249        }
 250}
 251
 252const char *dlm_midcomms_state(struct midcomms_node *node)
 253{
 254        return dlm_state_str(node->state);
 255}
 256
 257unsigned long dlm_midcomms_flags(struct midcomms_node *node)
 258{
 259        return node->flags;
 260}
 261
 262int dlm_midcomms_send_queue_cnt(struct midcomms_node *node)
 263{
 264        return atomic_read(&node->send_queue_cnt);
 265}
 266
 267uint32_t dlm_midcomms_version(struct midcomms_node *node)
 268{
 269        return node->version;
 270}
 271
 272static struct midcomms_node *__find_node(int nodeid, int r)
 273{
 274        struct midcomms_node *node;
 275
 276        hlist_for_each_entry_rcu(node, &node_hash[r], hlist) {
 277                if (node->nodeid == nodeid)
 278                        return node;
 279        }
 280
 281        return NULL;
 282}
 283
 284static void dlm_mhandle_release(struct rcu_head *rcu)
 285{
 286        struct dlm_mhandle *mh = container_of(rcu, struct dlm_mhandle, rcu);
 287
 288        dlm_lowcomms_put_msg(mh->msg);
 289        dlm_free_mhandle(mh);
 290}
 291
/* Remove @mh from the send queue of @node.
 *
 * The callers in this file hold node->send_queue_lock. The handle is
 * unlinked with list_del_rcu(), the queue counter is decremented and the
 * actual release is deferred to dlm_mhandle_release() via call_rcu().
 */
static void dlm_mhandle_delete(struct midcomms_node *node,
			       struct dlm_mhandle *mh)
{
	list_del_rcu(&mh->list);
	atomic_dec(&node->send_queue_cnt);
	call_rcu(&mh->rcu, dlm_mhandle_release);
}
 299
 300static void dlm_send_queue_flush(struct midcomms_node *node)
 301{
 302        struct dlm_mhandle *mh;
 303
 304        pr_debug("flush midcomms send queue of node %d\n", node->nodeid);
 305
 306        rcu_read_lock();
 307        spin_lock(&node->send_queue_lock);
 308        list_for_each_entry_rcu(mh, &node->send_queue, list) {
 309                dlm_mhandle_delete(node, mh);
 310        }
 311        spin_unlock(&node->send_queue_lock);
 312        rcu_read_unlock();
 313}
 314
 315static void midcomms_node_reset(struct midcomms_node *node)
 316{
 317        pr_debug("reset node %d\n", node->nodeid);
 318
 319        node->seq_next = DLM_SEQ_INIT;
 320        node->seq_send = DLM_SEQ_INIT;
 321        node->version = DLM_VERSION_NOT_SET;
 322        node->flags = 0;
 323
 324        dlm_send_queue_flush(node);
 325        node->state = DLM_CLOSED;
 326        wake_up(&node->shutdown_wait);
 327}
 328
 329static struct midcomms_node *nodeid2node(int nodeid, gfp_t alloc)
 330{
 331        struct midcomms_node *node, *tmp;
 332        int r = nodeid_hash(nodeid);
 333
 334        node = __find_node(nodeid, r);
 335        if (node || !alloc)
 336                return node;
 337
 338        node = kmalloc(sizeof(*node), alloc);
 339        if (!node)
 340                return NULL;
 341
 342        node->nodeid = nodeid;
 343        spin_lock_init(&node->state_lock);
 344        spin_lock_init(&node->send_queue_lock);
 345        atomic_set(&node->send_queue_cnt, 0);
 346        INIT_LIST_HEAD(&node->send_queue);
 347        init_waitqueue_head(&node->shutdown_wait);
 348        node->users = 0;
 349        midcomms_node_reset(node);
 350
 351        spin_lock(&nodes_lock);
 352        /* check again if there was somebody else
 353         * earlier here to add the node
 354         */
 355        tmp = __find_node(nodeid, r);
 356        if (tmp) {
 357                spin_unlock(&nodes_lock);
 358                kfree(node);
 359                return tmp;
 360        }
 361
 362        hlist_add_head_rcu(&node->hlist, &node_hash[r]);
 363        spin_unlock(&nodes_lock);
 364
 365        node->debugfs = dlm_create_debug_comms_file(nodeid, node);
 366        return node;
 367}
 368
 369static int dlm_send_ack(int nodeid, uint32_t seq)
 370{
 371        int mb_len = sizeof(struct dlm_header);
 372        struct dlm_header *m_header;
 373        struct dlm_msg *msg;
 374        char *ppc;
 375
 376        msg = dlm_lowcomms_new_msg(nodeid, mb_len, GFP_NOFS, &ppc,
 377                                   NULL, NULL);
 378        if (!msg)
 379                return -ENOMEM;
 380
 381        m_header = (struct dlm_header *)ppc;
 382
 383        m_header->h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 384        m_header->h_nodeid = dlm_our_nodeid();
 385        m_header->h_length = mb_len;
 386        m_header->h_cmd = DLM_ACK;
 387        m_header->u.h_seq = seq;
 388
 389        header_out(m_header);
 390        dlm_lowcomms_commit_msg(msg);
 391        dlm_lowcomms_put_msg(msg);
 392
 393        return 0;
 394}
 395
 396static int dlm_send_fin(struct midcomms_node *node,
 397                        void (*ack_rcv)(struct midcomms_node *node))
 398{
 399        int mb_len = sizeof(struct dlm_header);
 400        struct dlm_header *m_header;
 401        struct dlm_mhandle *mh;
 402        char *ppc;
 403
 404        mh = dlm_midcomms_get_mhandle(node->nodeid, mb_len, GFP_NOFS, &ppc);
 405        if (!mh)
 406                return -ENOMEM;
 407
 408        mh->ack_rcv = ack_rcv;
 409
 410        m_header = (struct dlm_header *)ppc;
 411
 412        m_header->h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
 413        m_header->h_nodeid = dlm_our_nodeid();
 414        m_header->h_length = mb_len;
 415        m_header->h_cmd = DLM_FIN;
 416
 417        header_out(m_header);
 418
 419        pr_debug("sending fin msg to node %d\n", node->nodeid);
 420        dlm_midcomms_commit_mhandle(mh);
 421        set_bit(DLM_NODE_FLAG_STOP_TX, &node->flags);
 422
 423        return 0;
 424}
 425
 426static void dlm_receive_ack(struct midcomms_node *node, uint32_t seq)
 427{
 428        struct dlm_mhandle *mh;
 429
 430        rcu_read_lock();
 431        list_for_each_entry_rcu(mh, &node->send_queue, list) {
 432                if (before(mh->seq, seq)) {
 433                        if (mh->ack_rcv)
 434                                mh->ack_rcv(node);
 435                } else {
 436                        /* send queue should be ordered */
 437                        break;
 438                }
 439        }
 440
 441        spin_lock(&node->send_queue_lock);
 442        list_for_each_entry_rcu(mh, &node->send_queue, list) {
 443                if (before(mh->seq, seq)) {
 444                        dlm_mhandle_delete(node, mh);
 445                } else {
 446                        /* send queue should be ordered */
 447                        break;
 448                }
 449        }
 450        spin_unlock(&node->send_queue_lock);
 451        rcu_read_unlock();
 452}
 453
 454static void dlm_pas_fin_ack_rcv(struct midcomms_node *node)
 455{
 456        spin_lock(&node->state_lock);
 457        pr_debug("receive passive fin ack from node %d with state %s\n",
 458                 node->nodeid, dlm_state_str(node->state));
 459
 460        switch (node->state) {
 461        case DLM_LAST_ACK:
 462                /* DLM_CLOSED */
 463                midcomms_node_reset(node);
 464                break;
 465        case DLM_CLOSED:
 466                /* not valid but somehow we got what we want */
 467                wake_up(&node->shutdown_wait);
 468                break;
 469        default:
 470                spin_unlock(&node->state_lock);
 471                log_print("%s: unexpected state: %d\n",
 472                          __func__, node->state);
 473                WARN_ON(1);
 474                return;
 475        }
 476        spin_unlock(&node->state_lock);
 477}
 478
 479static void dlm_midcomms_receive_buffer(union dlm_packet *p,
 480                                        struct midcomms_node *node,
 481                                        uint32_t seq)
 482{
 483        if (seq == node->seq_next) {
 484                node->seq_next++;
 485
 486                switch (p->header.h_cmd) {
 487                case DLM_FIN:
 488                        /* send ack before fin */
 489                        dlm_send_ack(node->nodeid, node->seq_next);
 490
 491                        spin_lock(&node->state_lock);
 492                        pr_debug("receive fin msg from node %d with state %s\n",
 493                                 node->nodeid, dlm_state_str(node->state));
 494
 495                        switch (node->state) {
 496                        case DLM_ESTABLISHED:
 497                                node->state = DLM_CLOSE_WAIT;
 498                                pr_debug("switch node %d to state %s\n",
 499                                         node->nodeid, dlm_state_str(node->state));
 500                                /* passive shutdown DLM_LAST_ACK case 1
 501                                 * additional we check if the node is used by
 502                                 * cluster manager events at all.
 503                                 */
 504                                if (node->users == 0) {
 505                                        node->state = DLM_LAST_ACK;
 506                                        pr_debug("switch node %d to state %s case 1\n",
 507                                                 node->nodeid, dlm_state_str(node->state));
 508                                        spin_unlock(&node->state_lock);
 509                                        goto send_fin;
 510                                }
 511                                break;
 512                        case DLM_FIN_WAIT1:
 513                                node->state = DLM_CLOSING;
 514                                pr_debug("switch node %d to state %s\n",
 515                                         node->nodeid, dlm_state_str(node->state));
 516                                break;
 517                        case DLM_FIN_WAIT2:
 518                                midcomms_node_reset(node);
 519                                pr_debug("switch node %d to state %s\n",
 520                                         node->nodeid, dlm_state_str(node->state));
 521                                wake_up(&node->shutdown_wait);
 522                                break;
 523                        case DLM_LAST_ACK:
 524                                /* probably remove_member caught it, do nothing */
 525                                break;
 526                        default:
 527                                spin_unlock(&node->state_lock);
 528                                log_print("%s: unexpected state: %d\n",
 529                                          __func__, node->state);
 530                                WARN_ON(1);
 531                                return;
 532                        }
 533                        spin_unlock(&node->state_lock);
 534
 535                        set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
 536                        break;
 537                default:
 538                        WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
 539                        dlm_receive_buffer(p, node->nodeid);
 540                        set_bit(DLM_NODE_ULP_DELIVERED, &node->flags);
 541                        break;
 542                }
 543        } else {
 544                /* retry to ack message which we already have by sending back
 545                 * current node->seq_next number as ack.
 546                 */
 547                if (seq < node->seq_next)
 548                        dlm_send_ack(node->nodeid, node->seq_next);
 549
 550                log_print_ratelimited("ignore dlm msg because seq mismatch, seq: %u, expected: %u, nodeid: %d",
 551                                      seq, node->seq_next, node->nodeid);
 552        }
 553
 554        return;
 555
 556send_fin:
 557        set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
 558        dlm_send_fin(node, dlm_pas_fin_ack_rcv);
 559}
 560
 561static struct midcomms_node *
 562dlm_midcomms_recv_node_lookup(int nodeid, const union dlm_packet *p,
 563                              uint16_t msglen, int (*cb)(struct midcomms_node *node))
 564{
 565        struct midcomms_node *node = NULL;
 566        gfp_t allocation = 0;
 567        int ret;
 568
 569        switch (p->header.h_cmd) {
 570        case DLM_RCOM:
 571                if (msglen < sizeof(struct dlm_rcom)) {
 572                        log_print("rcom msg too small: %u, will skip this message from node %d",
 573                                  msglen, nodeid);
 574                        return NULL;
 575                }
 576
 577                switch (le32_to_cpu(p->rcom.rc_type)) {
 578                case DLM_RCOM_NAMES:
 579                        fallthrough;
 580                case DLM_RCOM_NAMES_REPLY:
 581                        fallthrough;
 582                case DLM_RCOM_STATUS:
 583                        fallthrough;
 584                case DLM_RCOM_STATUS_REPLY:
 585                        node = nodeid2node(nodeid, 0);
 586                        if (node) {
 587                                spin_lock(&node->state_lock);
 588                                if (node->state != DLM_ESTABLISHED)
 589                                        pr_debug("receive begin RCOM msg from node %d with state %s\n",
 590                                                 node->nodeid, dlm_state_str(node->state));
 591
 592                                switch (node->state) {
 593                                case DLM_CLOSED:
 594                                        node->state = DLM_ESTABLISHED;
 595                                        pr_debug("switch node %d to state %s\n",
 596                                                 node->nodeid, dlm_state_str(node->state));
 597                                        break;
 598                                case DLM_ESTABLISHED:
 599                                        break;
 600                                default:
 601                                        /* some invalid state passive shutdown
 602                                         * was failed, we try to reset and
 603                                         * hope it will go on.
 604                                         */
 605                                        log_print("reset node %d because shutdown stuck",
 606                                                  node->nodeid);
 607
 608                                        midcomms_node_reset(node);
 609                                        node->state = DLM_ESTABLISHED;
 610                                        break;
 611                                }
 612                                spin_unlock(&node->state_lock);
 613                        }
 614
 615                        allocation = GFP_NOFS;
 616                        break;
 617                default:
 618                        break;
 619                }
 620
 621                break;
 622        default:
 623                break;
 624        }
 625
 626        node = nodeid2node(nodeid, allocation);
 627        if (!node) {
 628                switch (p->header.h_cmd) {
 629                case DLM_OPTS:
 630                        if (msglen < sizeof(struct dlm_opts)) {
 631                                log_print("opts msg too small: %u, will skip this message from node %d",
 632                                          msglen, nodeid);
 633                                return NULL;
 634                        }
 635
 636                        log_print_ratelimited("received dlm opts message nextcmd %d from node %d in an invalid sequence",
 637                                              p->opts.o_nextcmd, nodeid);
 638                        break;
 639                default:
 640                        log_print_ratelimited("received dlm message cmd %d from node %d in an invalid sequence",
 641                                              p->header.h_cmd, nodeid);
 642                        break;
 643                }
 644
 645                return NULL;
 646        }
 647
 648        ret = cb(node);
 649        if (ret < 0)
 650                return NULL;
 651
 652        return node;
 653}
 654
 655static int dlm_midcomms_version_check_3_2(struct midcomms_node *node)
 656{
 657        switch (node->version) {
 658        case DLM_VERSION_NOT_SET:
 659                node->version = DLM_VERSION_3_2;
 660                log_print("version 0x%08x for node %d detected", DLM_VERSION_3_2,
 661                          node->nodeid);
 662                break;
 663        case DLM_VERSION_3_2:
 664                break;
 665        default:
 666                log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
 667                                      DLM_VERSION_3_2, node->nodeid, node->version);
 668                return -1;
 669        }
 670
 671        return 0;
 672}
 673
 674static int dlm_opts_check_msglen(union dlm_packet *p, uint16_t msglen, int nodeid)
 675{
 676        int len = msglen;
 677
 678        /* we only trust outer header msglen because
 679         * it's checked against receive buffer length.
 680         */
 681        if (len < sizeof(struct dlm_opts))
 682                return -1;
 683        len -= sizeof(struct dlm_opts);
 684
 685        if (len < le16_to_cpu(p->opts.o_optlen))
 686                return -1;
 687        len -= le16_to_cpu(p->opts.o_optlen);
 688
 689        switch (p->opts.o_nextcmd) {
 690        case DLM_FIN:
 691                if (len < sizeof(struct dlm_header)) {
 692                        log_print("fin too small: %d, will skip this message from node %d",
 693                                  len, nodeid);
 694                        return -1;
 695                }
 696
 697                break;
 698        case DLM_MSG:
 699                if (len < sizeof(struct dlm_message)) {
 700                        log_print("msg too small: %d, will skip this message from node %d",
 701                                  msglen, nodeid);
 702                        return -1;
 703                }
 704
 705                break;
 706        case DLM_RCOM:
 707                if (len < sizeof(struct dlm_rcom)) {
 708                        log_print("rcom msg too small: %d, will skip this message from node %d",
 709                                  len, nodeid);
 710                        return -1;
 711                }
 712
 713                break;
 714        default:
 715                log_print("unsupported o_nextcmd received: %u, will skip this message from node %d",
 716                          p->opts.o_nextcmd, nodeid);
 717                return -1;
 718        }
 719
 720        return 0;
 721}
 722
 723static void dlm_midcomms_receive_buffer_3_2(union dlm_packet *p, int nodeid)
 724{
 725        uint16_t msglen = le16_to_cpu(p->header.h_length);
 726        struct midcomms_node *node;
 727        uint32_t seq;
 728        int ret, idx;
 729
 730        idx = srcu_read_lock(&nodes_srcu);
 731        node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
 732                                             dlm_midcomms_version_check_3_2);
 733        if (!node)
 734                goto out;
 735
 736        switch (p->header.h_cmd) {
 737        case DLM_RCOM:
 738                /* these rcom message we use to determine version.
 739                 * they have their own retransmission handling and
 740                 * are the first messages of dlm.
 741                 *
 742                 * length already checked.
 743                 */
 744                switch (le32_to_cpu(p->rcom.rc_type)) {
 745                case DLM_RCOM_NAMES:
 746                        fallthrough;
 747                case DLM_RCOM_NAMES_REPLY:
 748                        fallthrough;
 749                case DLM_RCOM_STATUS:
 750                        fallthrough;
 751                case DLM_RCOM_STATUS_REPLY:
 752                        break;
 753                default:
 754                        log_print("unsupported rcom type received: %u, will skip this message from node %d",
 755                                  le32_to_cpu(p->rcom.rc_type), nodeid);
 756                        goto out;
 757                }
 758
 759                WARN_ON(test_bit(DLM_NODE_FLAG_STOP_RX, &node->flags));
 760                dlm_receive_buffer(p, nodeid);
 761                break;
 762        case DLM_OPTS:
 763                seq = le32_to_cpu(p->header.u.h_seq);
 764
 765                ret = dlm_opts_check_msglen(p, msglen, nodeid);
 766                if (ret < 0) {
 767                        log_print("opts msg too small: %u, will skip this message from node %d",
 768                                  msglen, nodeid);
 769                        goto out;
 770                }
 771
 772                p = (union dlm_packet *)((unsigned char *)p->opts.o_opts +
 773                                         le16_to_cpu(p->opts.o_optlen));
 774
 775                /* recheck inner msglen just if it's not garbage */
 776                msglen = le16_to_cpu(p->header.h_length);
 777                switch (p->header.h_cmd) {
 778                case DLM_RCOM:
 779                        if (msglen < sizeof(struct dlm_rcom)) {
 780                                log_print("inner rcom msg too small: %u, will skip this message from node %d",
 781                                          msglen, nodeid);
 782                                goto out;
 783                        }
 784
 785                        break;
 786                case DLM_MSG:
 787                        if (msglen < sizeof(struct dlm_message)) {
 788                                log_print("inner msg too small: %u, will skip this message from node %d",
 789                                          msglen, nodeid);
 790                                goto out;
 791                        }
 792
 793                        break;
 794                case DLM_FIN:
 795                        if (msglen < sizeof(struct dlm_header)) {
 796                                log_print("inner fin too small: %u, will skip this message from node %d",
 797                                          msglen, nodeid);
 798                                goto out;
 799                        }
 800
 801                        break;
 802                default:
 803                        log_print("unsupported inner h_cmd received: %u, will skip this message from node %d",
 804                                  msglen, nodeid);
 805                        goto out;
 806                }
 807
 808                dlm_midcomms_receive_buffer(p, node, seq);
 809                break;
 810        case DLM_ACK:
 811                seq = le32_to_cpu(p->header.u.h_seq);
 812                dlm_receive_ack(node, seq);
 813                break;
 814        default:
 815                log_print("unsupported h_cmd received: %u, will skip this message from node %d",
 816                          p->header.h_cmd, nodeid);
 817                break;
 818        }
 819
 820out:
 821        srcu_read_unlock(&nodes_srcu, idx);
 822}
 823
 824static int dlm_midcomms_version_check_3_1(struct midcomms_node *node)
 825{
 826        switch (node->version) {
 827        case DLM_VERSION_NOT_SET:
 828                node->version = DLM_VERSION_3_1;
 829                log_print("version 0x%08x for node %d detected", DLM_VERSION_3_1,
 830                          node->nodeid);
 831                break;
 832        case DLM_VERSION_3_1:
 833                break;
 834        default:
 835                log_print_ratelimited("version mismatch detected, assumed 0x%08x but node %d has 0x%08x",
 836                                      DLM_VERSION_3_1, node->nodeid, node->version);
 837                return -1;
 838        }
 839
 840        return 0;
 841}
 842
 843static void dlm_midcomms_receive_buffer_3_1(union dlm_packet *p, int nodeid)
 844{
 845        uint16_t msglen = le16_to_cpu(p->header.h_length);
 846        struct midcomms_node *node;
 847        int idx;
 848
 849        idx = srcu_read_lock(&nodes_srcu);
 850        node = dlm_midcomms_recv_node_lookup(nodeid, p, msglen,
 851                                             dlm_midcomms_version_check_3_1);
 852        if (!node) {
 853                srcu_read_unlock(&nodes_srcu, idx);
 854                return;
 855        }
 856        srcu_read_unlock(&nodes_srcu, idx);
 857
 858        switch (p->header.h_cmd) {
 859        case DLM_RCOM:
 860                /* length already checked */
 861                break;
 862        case DLM_MSG:
 863                if (msglen < sizeof(struct dlm_message)) {
 864                        log_print("msg too small: %u, will skip this message from node %d",
 865                                  msglen, nodeid);
 866                        return;
 867                }
 868
 869                break;
 870        default:
 871                log_print("unsupported h_cmd received: %u, will skip this message from node %d",
 872                          p->header.h_cmd, nodeid);
 873                return;
 874        }
 875
 876        dlm_receive_buffer(p, nodeid);
 877}
 878
 879/*
 880 * Called from the low-level comms layer to process a buffer of
 881 * commands.
 882 */
 883
 884int dlm_process_incoming_buffer(int nodeid, unsigned char *buf, int len)
 885{
 886        const unsigned char *ptr = buf;
 887        const struct dlm_header *hd;
 888        uint16_t msglen;
 889        int ret = 0;
 890
 891        while (len >= sizeof(struct dlm_header)) {
 892                hd = (struct dlm_header *)ptr;
 893
 894                /* no message should be more than DLM_MAX_SOCKET_BUFSIZE or
 895                 * less than dlm_header size.
 896                 *
 897                 * Some messages does not have a 8 byte length boundary yet
 898                 * which can occur in a unaligned memory access of some dlm
 899                 * messages. However this problem need to be fixed at the
 900                 * sending side, for now it seems nobody run into architecture
 901                 * related issues yet but it slows down some processing.
 902                 * Fixing this issue should be scheduled in future by doing
 903                 * the next major version bump.
 904                 */
 905                msglen = le16_to_cpu(hd->h_length);
 906                if (msglen > DLM_MAX_SOCKET_BUFSIZE ||
 907                    msglen < sizeof(struct dlm_header)) {
 908                        log_print("received invalid length header: %u from node %d, will abort message parsing",
 909                                  msglen, nodeid);
 910                        return -EBADMSG;
 911                }
 912
 913                /* caller will take care that leftover
 914                 * will be parsed next call with more data
 915                 */
 916                if (msglen > len)
 917                        break;
 918
 919                switch (hd->h_version) {
 920                case cpu_to_le32(DLM_VERSION_3_1):
 921                        dlm_midcomms_receive_buffer_3_1((union dlm_packet *)ptr, nodeid);
 922                        break;
 923                case cpu_to_le32(DLM_VERSION_3_2):
 924                        dlm_midcomms_receive_buffer_3_2((union dlm_packet *)ptr, nodeid);
 925                        break;
 926                default:
 927                        log_print("received invalid version header: %u from node %d, will skip this message",
 928                                  le32_to_cpu(hd->h_version), nodeid);
 929                        break;
 930                }
 931
 932                ret += msglen;
 933                len -= msglen;
 934                ptr += msglen;
 935        }
 936
 937        return ret;
 938}
 939
 940void dlm_midcomms_receive_done(int nodeid)
 941{
 942        struct midcomms_node *node;
 943        int idx;
 944
 945        idx = srcu_read_lock(&nodes_srcu);
 946        node = nodeid2node(nodeid, 0);
 947        if (!node) {
 948                srcu_read_unlock(&nodes_srcu, idx);
 949                return;
 950        }
 951
 952        /* old protocol, we do nothing */
 953        switch (node->version) {
 954        case DLM_VERSION_3_2:
 955                break;
 956        default:
 957                srcu_read_unlock(&nodes_srcu, idx);
 958                return;
 959        }
 960
 961        /* do nothing if we didn't delivered stateful to ulp */
 962        if (!test_and_clear_bit(DLM_NODE_ULP_DELIVERED,
 963                                &node->flags)) {
 964                srcu_read_unlock(&nodes_srcu, idx);
 965                return;
 966        }
 967
 968        spin_lock(&node->state_lock);
 969        /* we only ack if state is ESTABLISHED */
 970        switch (node->state) {
 971        case DLM_ESTABLISHED:
 972                spin_unlock(&node->state_lock);
 973                dlm_send_ack(node->nodeid, node->seq_next);
 974                break;
 975        default:
 976                spin_unlock(&node->state_lock);
 977                /* do nothing FIN has it's own ack send */
 978                break;
 979        }
 980        srcu_read_unlock(&nodes_srcu, idx);
 981}
 982
 983void dlm_midcomms_unack_msg_resend(int nodeid)
 984{
 985        struct midcomms_node *node;
 986        struct dlm_mhandle *mh;
 987        int idx, ret;
 988
 989        idx = srcu_read_lock(&nodes_srcu);
 990        node = nodeid2node(nodeid, 0);
 991        if (!node) {
 992                srcu_read_unlock(&nodes_srcu, idx);
 993                return;
 994        }
 995
 996        /* old protocol, we don't support to retransmit on failure */
 997        switch (node->version) {
 998        case DLM_VERSION_3_2:
 999                break;
1000        default:
1001                srcu_read_unlock(&nodes_srcu, idx);
1002                return;
1003        }
1004
1005        rcu_read_lock();
1006        list_for_each_entry_rcu(mh, &node->send_queue, list) {
1007                if (!mh->committed)
1008                        continue;
1009
1010                ret = dlm_lowcomms_resend_msg(mh->msg);
1011                if (!ret)
1012                        log_print_ratelimited("retransmit dlm msg, seq %u, nodeid %d",
1013                                              mh->seq, node->nodeid);
1014        }
1015        rcu_read_unlock();
1016        srcu_read_unlock(&nodes_srcu, idx);
1017}
1018
1019static void dlm_fill_opts_header(struct dlm_opts *opts, uint16_t inner_len,
1020                                 uint32_t seq)
1021{
1022        opts->o_header.h_cmd = DLM_OPTS;
1023        opts->o_header.h_version = (DLM_HEADER_MAJOR | DLM_HEADER_MINOR);
1024        opts->o_header.h_nodeid = dlm_our_nodeid();
1025        opts->o_header.h_length = DLM_MIDCOMMS_OPT_LEN + inner_len;
1026        opts->o_header.u.h_seq = seq;
1027        header_out(&opts->o_header);
1028}
1029
1030static void midcomms_new_msg_cb(void *data)
1031{
1032        struct dlm_mhandle *mh = data;
1033
1034        atomic_inc(&mh->node->send_queue_cnt);
1035
1036        spin_lock(&mh->node->send_queue_lock);
1037        list_add_tail_rcu(&mh->list, &mh->node->send_queue);
1038        spin_unlock(&mh->node->send_queue_lock);
1039
1040        mh->seq = mh->node->seq_send++;
1041}
1042
1043static struct dlm_msg *dlm_midcomms_get_msg_3_2(struct dlm_mhandle *mh, int nodeid,
1044                                                int len, gfp_t allocation, char **ppc)
1045{
1046        struct dlm_opts *opts;
1047        struct dlm_msg *msg;
1048
1049        msg = dlm_lowcomms_new_msg(nodeid, len + DLM_MIDCOMMS_OPT_LEN,
1050                                   allocation, ppc, midcomms_new_msg_cb, mh);
1051        if (!msg)
1052                return NULL;
1053
1054        opts = (struct dlm_opts *)*ppc;
1055        mh->opts = opts;
1056
1057        /* add possible options here */
1058        dlm_fill_opts_header(opts, len, mh->seq);
1059
1060        *ppc += sizeof(*opts);
1061        mh->inner_hd = (const struct dlm_header *)*ppc;
1062        return msg;
1063}
1064
1065struct dlm_mhandle *dlm_midcomms_get_mhandle(int nodeid, int len,
1066                                             gfp_t allocation, char **ppc)
1067{
1068        struct midcomms_node *node;
1069        struct dlm_mhandle *mh;
1070        struct dlm_msg *msg;
1071        int idx;
1072
1073        idx = srcu_read_lock(&nodes_srcu);
1074        node = nodeid2node(nodeid, 0);
1075        if (!node) {
1076                WARN_ON_ONCE(1);
1077                goto err;
1078        }
1079
1080        /* this is a bug, however we going on and hope it will be resolved */
1081        WARN_ON(test_bit(DLM_NODE_FLAG_STOP_TX, &node->flags));
1082
1083        mh = dlm_allocate_mhandle();
1084        if (!mh)
1085                goto err;
1086
1087        mh->committed = false;
1088        mh->ack_rcv = NULL;
1089        mh->idx = idx;
1090        mh->node = node;
1091
1092        switch (node->version) {
1093        case DLM_VERSION_3_1:
1094                msg = dlm_lowcomms_new_msg(nodeid, len, allocation, ppc,
1095                                           NULL, NULL);
1096                if (!msg) {
1097                        dlm_free_mhandle(mh);
1098                        goto err;
1099                }
1100
1101                break;
1102        case DLM_VERSION_3_2:
1103                msg = dlm_midcomms_get_msg_3_2(mh, nodeid, len, allocation,
1104                                               ppc);
1105                if (!msg) {
1106                        dlm_free_mhandle(mh);
1107                        goto err;
1108                }
1109
1110                break;
1111        default:
1112                dlm_free_mhandle(mh);
1113                WARN_ON(1);
1114                goto err;
1115        }
1116
1117        mh->msg = msg;
1118
1119        /* keep in mind that is a must to call
1120         * dlm_midcomms_commit_msg() which releases
1121         * nodes_srcu using mh->idx which is assumed
1122         * here that the application will call it.
1123         */
1124        return mh;
1125
1126err:
1127        srcu_read_unlock(&nodes_srcu, idx);
1128        return NULL;
1129}
1130
1131static void dlm_midcomms_commit_msg_3_2(struct dlm_mhandle *mh)
1132{
1133        /* nexthdr chain for fast lookup */
1134        mh->opts->o_nextcmd = mh->inner_hd->h_cmd;
1135        mh->committed = true;
1136        dlm_lowcomms_commit_msg(mh->msg);
1137}
1138
1139void dlm_midcomms_commit_mhandle(struct dlm_mhandle *mh)
1140{
1141        switch (mh->node->version) {
1142        case DLM_VERSION_3_1:
1143                srcu_read_unlock(&nodes_srcu, mh->idx);
1144
1145                dlm_lowcomms_commit_msg(mh->msg);
1146                dlm_lowcomms_put_msg(mh->msg);
1147                /* mh is not part of rcu list in this case */
1148                dlm_free_mhandle(mh);
1149                break;
1150        case DLM_VERSION_3_2:
1151                dlm_midcomms_commit_msg_3_2(mh);
1152                srcu_read_unlock(&nodes_srcu, mh->idx);
1153                break;
1154        default:
1155                srcu_read_unlock(&nodes_srcu, mh->idx);
1156                WARN_ON(1);
1157                break;
1158        }
1159}
1160
1161int dlm_midcomms_start(void)
1162{
1163        int i;
1164
1165        for (i = 0; i < CONN_HASH_SIZE; i++)
1166                INIT_HLIST_HEAD(&node_hash[i]);
1167
1168        return dlm_lowcomms_start();
1169}
1170
1171static void dlm_act_fin_ack_rcv(struct midcomms_node *node)
1172{
1173        spin_lock(&node->state_lock);
1174        pr_debug("receive active fin ack from node %d with state %s\n",
1175                 node->nodeid, dlm_state_str(node->state));
1176
1177        switch (node->state) {
1178        case DLM_FIN_WAIT1:
1179                node->state = DLM_FIN_WAIT2;
1180                pr_debug("switch node %d to state %s\n",
1181                         node->nodeid, dlm_state_str(node->state));
1182                break;
1183        case DLM_CLOSING:
1184                midcomms_node_reset(node);
1185                pr_debug("switch node %d to state %s\n",
1186                         node->nodeid, dlm_state_str(node->state));
1187                wake_up(&node->shutdown_wait);
1188                break;
1189        case DLM_CLOSED:
1190                /* not valid but somehow we got what we want */
1191                wake_up(&node->shutdown_wait);
1192                break;
1193        default:
1194                spin_unlock(&node->state_lock);
1195                log_print("%s: unexpected state: %d\n",
1196                          __func__, node->state);
1197                WARN_ON(1);
1198                return;
1199        }
1200        spin_unlock(&node->state_lock);
1201}
1202
1203void dlm_midcomms_add_member(int nodeid)
1204{
1205        struct midcomms_node *node;
1206        int idx;
1207
1208        if (nodeid == dlm_our_nodeid())
1209                return;
1210
1211        idx = srcu_read_lock(&nodes_srcu);
1212        node = nodeid2node(nodeid, GFP_NOFS);
1213        if (!node) {
1214                srcu_read_unlock(&nodes_srcu, idx);
1215                return;
1216        }
1217
1218        spin_lock(&node->state_lock);
1219        if (!node->users) {
1220                pr_debug("receive add member from node %d with state %s\n",
1221                         node->nodeid, dlm_state_str(node->state));
1222                switch (node->state) {
1223                case DLM_ESTABLISHED:
1224                        break;
1225                case DLM_CLOSED:
1226                        node->state = DLM_ESTABLISHED;
1227                        pr_debug("switch node %d to state %s\n",
1228                                 node->nodeid, dlm_state_str(node->state));
1229                        break;
1230                default:
1231                        /* some invalid state passive shutdown
1232                         * was failed, we try to reset and
1233                         * hope it will go on.
1234                         */
1235                        log_print("reset node %d because shutdown stuck",
1236                                  node->nodeid);
1237
1238                        midcomms_node_reset(node);
1239                        node->state = DLM_ESTABLISHED;
1240                        break;
1241                }
1242        }
1243
1244        node->users++;
1245        pr_debug("node %d users inc count %d\n", nodeid, node->users);
1246        spin_unlock(&node->state_lock);
1247
1248        srcu_read_unlock(&nodes_srcu, idx);
1249}
1250
1251void dlm_midcomms_remove_member(int nodeid)
1252{
1253        struct midcomms_node *node;
1254        int idx;
1255
1256        if (nodeid == dlm_our_nodeid())
1257                return;
1258
1259        idx = srcu_read_lock(&nodes_srcu);
1260        node = nodeid2node(nodeid, 0);
1261        if (!node) {
1262                srcu_read_unlock(&nodes_srcu, idx);
1263                return;
1264        }
1265
1266        spin_lock(&node->state_lock);
1267        node->users--;
1268        pr_debug("node %d users dec count %d\n", nodeid, node->users);
1269
1270        /* hitting users count to zero means the
1271         * other side is running dlm_midcomms_stop()
1272         * we meet us to have a clean disconnect.
1273         */
1274        if (node->users == 0) {
1275                pr_debug("receive remove member from node %d with state %s\n",
1276                         node->nodeid, dlm_state_str(node->state));
1277                switch (node->state) {
1278                case DLM_ESTABLISHED:
1279                        break;
1280                case DLM_CLOSE_WAIT:
1281                        /* passive shutdown DLM_LAST_ACK case 2 */
1282                        node->state = DLM_LAST_ACK;
1283                        spin_unlock(&node->state_lock);
1284
1285                        pr_debug("switch node %d to state %s case 2\n",
1286                                 node->nodeid, dlm_state_str(node->state));
1287                        goto send_fin;
1288                case DLM_LAST_ACK:
1289                        /* probably receive fin caught it, do nothing */
1290                        break;
1291                case DLM_CLOSED:
1292                        /* already gone, do nothing */
1293                        break;
1294                default:
1295                        log_print("%s: unexpected state: %d\n",
1296                                  __func__, node->state);
1297                        break;
1298                }
1299        }
1300        spin_unlock(&node->state_lock);
1301
1302        srcu_read_unlock(&nodes_srcu, idx);
1303        return;
1304
1305send_fin:
1306        set_bit(DLM_NODE_FLAG_STOP_RX, &node->flags);
1307        dlm_send_fin(node, dlm_pas_fin_ack_rcv);
1308        srcu_read_unlock(&nodes_srcu, idx);
1309}
1310
1311static void midcomms_node_release(struct rcu_head *rcu)
1312{
1313        struct midcomms_node *node = container_of(rcu, struct midcomms_node, rcu);
1314
1315        WARN_ON(atomic_read(&node->send_queue_cnt));
1316        kfree(node);
1317}
1318
1319static void midcomms_shutdown(struct midcomms_node *node)
1320{
1321        int ret;
1322
1323        /* old protocol, we don't wait for pending operations */
1324        switch (node->version) {
1325        case DLM_VERSION_3_2:
1326                break;
1327        default:
1328                return;
1329        }
1330
1331        spin_lock(&node->state_lock);
1332        pr_debug("receive active shutdown for node %d with state %s\n",
1333                 node->nodeid, dlm_state_str(node->state));
1334        switch (node->state) {
1335        case DLM_ESTABLISHED:
1336                node->state = DLM_FIN_WAIT1;
1337                pr_debug("switch node %d to state %s case 2\n",
1338                         node->nodeid, dlm_state_str(node->state));
1339                break;
1340        case DLM_CLOSED:
1341                /* we have what we want */
1342                spin_unlock(&node->state_lock);
1343                return;
1344        default:
1345                /* busy to enter DLM_FIN_WAIT1, wait until passive
1346                 * done in shutdown_wait to enter DLM_CLOSED.
1347                 */
1348                break;
1349        }
1350        spin_unlock(&node->state_lock);
1351
1352        if (node->state == DLM_FIN_WAIT1) {
1353                dlm_send_fin(node, dlm_act_fin_ack_rcv);
1354
1355                if (DLM_DEBUG_FENCE_TERMINATION)
1356                        msleep(5000);
1357        }
1358
1359        /* wait for other side dlm + fin */
1360        ret = wait_event_timeout(node->shutdown_wait,
1361                                 node->state == DLM_CLOSED ||
1362                                 test_bit(DLM_NODE_FLAG_CLOSE, &node->flags),
1363                                 DLM_SHUTDOWN_TIMEOUT);
1364        if (!ret || test_bit(DLM_NODE_FLAG_CLOSE, &node->flags)) {
1365                pr_debug("active shutdown timed out for node %d with state %s\n",
1366                         node->nodeid, dlm_state_str(node->state));
1367                midcomms_node_reset(node);
1368                return;
1369        }
1370
1371        pr_debug("active shutdown done for node %d with state %s\n",
1372                 node->nodeid, dlm_state_str(node->state));
1373}
1374
1375void dlm_midcomms_shutdown(void)
1376{
1377        struct midcomms_node *node;
1378        int i, idx;
1379
1380        mutex_lock(&close_lock);
1381        idx = srcu_read_lock(&nodes_srcu);
1382        for (i = 0; i < CONN_HASH_SIZE; i++) {
1383                hlist_for_each_entry_rcu(node, &node_hash[i], hlist) {
1384                        midcomms_shutdown(node);
1385
1386                        dlm_delete_debug_comms_file(node->debugfs);
1387
1388                        spin_lock(&nodes_lock);
1389                        hlist_del_rcu(&node->hlist);
1390                        spin_unlock(&nodes_lock);
1391
1392                        call_srcu(&nodes_srcu, &node->rcu, midcomms_node_release);
1393                }
1394        }
1395        srcu_read_unlock(&nodes_srcu, idx);
1396        mutex_unlock(&close_lock);
1397
1398        dlm_lowcomms_shutdown();
1399}
1400
1401int dlm_midcomms_close(int nodeid)
1402{
1403        struct midcomms_node *node;
1404        int idx, ret;
1405
1406        if (nodeid == dlm_our_nodeid())
1407                return 0;
1408
1409        idx = srcu_read_lock(&nodes_srcu);
1410        /* Abort pending close/remove operation */
1411        node = nodeid2node(nodeid, 0);
1412        if (node) {
1413                /* let shutdown waiters leave */
1414                set_bit(DLM_NODE_FLAG_CLOSE, &node->flags);
1415                wake_up(&node->shutdown_wait);
1416        }
1417        srcu_read_unlock(&nodes_srcu, idx);
1418
1419        synchronize_srcu(&nodes_srcu);
1420
1421        idx = srcu_read_lock(&nodes_srcu);
1422        mutex_lock(&close_lock);
1423        node = nodeid2node(nodeid, 0);
1424        if (!node) {
1425                mutex_unlock(&close_lock);
1426                srcu_read_unlock(&nodes_srcu, idx);
1427                return dlm_lowcomms_close(nodeid);
1428        }
1429
1430        ret = dlm_lowcomms_close(nodeid);
1431        spin_lock(&node->state_lock);
1432        midcomms_node_reset(node);
1433        spin_unlock(&node->state_lock);
1434        srcu_read_unlock(&nodes_srcu, idx);
1435        mutex_unlock(&close_lock);
1436
1437        return ret;
1438}
1439
/* debug functionality to send raw dlm msg from user space
 *
 * Context handed to midcomms_new_rawmsg_cb() through the callback data
 * pointer of dlm_lowcomms_new_msg().
 */
struct dlm_rawmsg_data {
        /* node whose seq_send counter the callback may consume */
        struct midcomms_node *node;
        /* raw message to send; the callback reads it as a struct dlm_header */
        void *buf;
};
1445
1446static void midcomms_new_rawmsg_cb(void *data)
1447{
1448        struct dlm_rawmsg_data *rd = data;
1449        struct dlm_header *h = rd->buf;
1450
1451        switch (h->h_version) {
1452        case cpu_to_le32(DLM_VERSION_3_1):
1453                break;
1454        default:
1455                switch (h->h_cmd) {
1456                case DLM_OPTS:
1457                        if (!h->u.h_seq)
1458                                h->u.h_seq = rd->node->seq_send++;
1459                        break;
1460                default:
1461                        break;
1462                }
1463                break;
1464        }
1465}
1466
1467int dlm_midcomms_rawmsg_send(struct midcomms_node *node, void *buf,
1468                             int buflen)
1469{
1470        struct dlm_rawmsg_data rd;
1471        struct dlm_msg *msg;
1472        char *msgbuf;
1473
1474        rd.node = node;
1475        rd.buf = buf;
1476
1477        msg = dlm_lowcomms_new_msg(node->nodeid, buflen, GFP_NOFS,
1478                                   &msgbuf, midcomms_new_rawmsg_cb, &rd);
1479        if (!msg)
1480                return -ENOMEM;
1481
1482        memcpy(msgbuf, buf, buflen);
1483        dlm_lowcomms_commit_msg(msg);
1484        return 0;
1485}
1486
1487