linux/net/tipc/node.c
<<
>>
Prefs
   1/*
   2 * net/tipc/node.c: TIPC node management routines
   3 *
   4 * Copyright (c) 2000-2006, 2012-2015, Ericsson AB
   5 * Copyright (c) 2005-2006, 2010-2014, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "link.h"
  39#include "node.h"
  40#include "name_distr.h"
  41#include "socket.h"
  42#include "bcast.h"
  43#include "discover.h"
  44#include "netlink.h"
  45
  /* Initial value of tipc_node::signature, assigned by tipc_node_create()
   * before any peer signature has been accepted.
   * NOTE(review): presumably chosen so that it can never equal a real peer
   * signature - confirm against the discovery message format.
   */
  #define INVALID_NODE_SIG        0x10000
  47
  /* Flags stored in tipc_node::action_flags; they are acted upon by
   * tipc_node_write_unlock() after the node lock has been released.
   * TIPC_NOTIFY_NODE_DOWN: notify publications that node is down
   * TIPC_NOTIFY_NODE_UP: notify name distributor that node is up
   * TIPC_NOTIFY_LINK_UP: publish link state name for the new link
   * TIPC_NOTIFY_LINK_DOWN: withdraw link state name for the lost link
   */
  enum {
          TIPC_NOTIFY_NODE_DOWN           = (1 << 3),
          TIPC_NOTIFY_NODE_UP             = (1 << 4),
          TIPC_NOTIFY_LINK_UP             = (1 << 6),
          TIPC_NOTIFY_LINK_DOWN           = (1 << 7)
  };
  59
  60struct tipc_link_entry {
  61        struct tipc_link *link;
  62        spinlock_t lock; /* per link */
  63        u32 mtu;
  64        struct sk_buff_head inputq;
  65        struct tipc_media_addr maddr;
  66};
  67
  68struct tipc_bclink_entry {
  69        struct tipc_link *link;
  70        struct sk_buff_head inputq1;
  71        struct sk_buff_head arrvq;
  72        struct sk_buff_head inputq2;
  73        struct sk_buff_head namedq;
  74};
  75
  76/**
  77 * struct tipc_node - TIPC node structure
  78 * @addr: network address of node
  79 * @ref: reference counter to node object
  80 * @lock: rwlock governing access to structure
  81 * @net: the applicable net namespace
  82 * @hash: links to adjacent nodes in unsorted hash chain
  83 * @inputq: pointer to input queue containing messages for msg event
  84 * @namedq: pointer to name table input queue with name table messages
  85 * @active_links: bearer ids of active links, used as index into links[] array
  86 * @links: array containing references to all links to node
  87 * @action_flags: bit mask of different types of node actions
  88 * @state: connectivity state vs peer node
  89 * @sync_point: sequence number where synch/failover is finished
  90 * @list: links to adjacent nodes in sorted list of cluster's nodes
  91 * @working_links: number of working links to node (both active and standby)
  92 * @link_cnt: number of links to node
  93 * @capabilities: bitmap, indicating peer node's functional capabilities
  94 * @signature: node instance identifier
  95 * @link_id: local and remote bearer ids of changing link, if any
  96 * @publ_list: list of publications
  97 * @rcu: rcu struct for tipc_node
  98 */
  99struct tipc_node {
 100        u32 addr;
 101        struct kref kref;
 102        rwlock_t lock;
 103        struct net *net;
 104        struct hlist_node hash;
 105        int active_links[2];
 106        struct tipc_link_entry links[MAX_BEARERS];
 107        struct tipc_bclink_entry bc_entry;
 108        int action_flags;
 109        struct list_head list;
 110        int state;
 111        u16 sync_point;
 112        int link_cnt;
 113        u16 working_links;
 114        u16 capabilities;
 115        u32 signature;
 116        u32 link_id;
 117        struct list_head publ_list;
 118        struct list_head conn_sks;
 119        unsigned long keepalive_intv;
 120        struct timer_list timer;
 121        struct rcu_head rcu;
 122};
 123
 /* Node FSM states, stored in tipc_node::state.
  * A new node starts out in SELF_DOWN_PEER_LEAVING (see tipc_node_create()).
  */
 enum {
         SELF_DOWN_PEER_DOWN    = 0xdd,
         SELF_UP_PEER_UP        = 0xaa,
         SELF_DOWN_PEER_LEAVING = 0xd1,
         SELF_UP_PEER_COMING    = 0xac,
         SELF_COMING_PEER_UP    = 0xca,
         SELF_LEAVING_PEER_DOWN = 0x1d,
         NODE_FAILINGOVER       = 0xf0,
         NODE_SYNCHING          = 0xcc
 };
 136
 /* Node FSM events, fed to tipc_node_fsm_evt() */
 enum {
         SELF_ESTABL_CONTACT_EVT = 0xece,
         SELF_LOST_CONTACT_EVT   = 0x1ce,
         PEER_ESTABL_CONTACT_EVT = 0x9ece,
         PEER_LOST_CONTACT_EVT   = 0x91ce,
         NODE_FAILOVER_BEGIN_EVT = 0xfbe,
         NODE_FAILOVER_END_EVT   = 0xfee,
         NODE_SYNCH_BEGIN_EVT    = 0xcbe,
         NODE_SYNCH_END_EVT      = 0xcee
 };
 147
 148static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
 149                                  struct sk_buff_head *xmitq,
 150                                  struct tipc_media_addr **maddr);
 151static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
 152                                bool delete);
 153static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
 154static void tipc_node_delete(struct tipc_node *node);
 155static void tipc_node_timeout(unsigned long data);
 156static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
 157static struct tipc_node *tipc_node_find(struct net *net, u32 addr);
 158static void tipc_node_put(struct tipc_node *node);
 159static bool tipc_node_is_up(struct tipc_node *n);
 160
 161struct tipc_sock_conn {
 162        u32 port;
 163        u32 peer_port;
 164        u32 peer_node;
 165        struct list_head list;
 166};
 167
 168static struct tipc_link *node_active_link(struct tipc_node *n, int sel)
 169{
 170        int bearer_id = n->active_links[sel & 1];
 171
 172        if (unlikely(bearer_id == INVALID_BEARER_ID))
 173                return NULL;
 174
 175        return n->links[bearer_id].link;
 176}
 177
 178int tipc_node_get_mtu(struct net *net, u32 addr, u32 sel)
 179{
 180        struct tipc_node *n;
 181        int bearer_id;
 182        unsigned int mtu = MAX_MSG_SIZE;
 183
 184        n = tipc_node_find(net, addr);
 185        if (unlikely(!n))
 186                return mtu;
 187
 188        bearer_id = n->active_links[sel & 1];
 189        if (likely(bearer_id != INVALID_BEARER_ID))
 190                mtu = n->links[bearer_id].mtu;
 191        tipc_node_put(n);
 192        return mtu;
 193}
 194/*
 195 * A trivial power-of-two bitmask technique is used for speed, since this
 196 * operation is done for every incoming TIPC packet. The number of hash table
 197 * entries has been chosen so that no hash chain exceeds 8 nodes and will
 198 * usually be much smaller (typically only a single node).
 199 */
 200static unsigned int tipc_hashfn(u32 addr)
 201{
 202        return addr & (NODE_HTABLE_SIZE - 1);
 203}
 204
 205static void tipc_node_kref_release(struct kref *kref)
 206{
 207        struct tipc_node *n = container_of(kref, struct tipc_node, kref);
 208
 209        kfree(n->bc_entry.link);
 210        kfree_rcu(n, rcu);
 211}
 212
 213static void tipc_node_put(struct tipc_node *node)
 214{
 215        kref_put(&node->kref, tipc_node_kref_release);
 216}
 217
 218static void tipc_node_get(struct tipc_node *node)
 219{
 220        kref_get(&node->kref);
 221}
 222
 223/*
 224 * tipc_node_find - locate specified node object, if it exists
 225 */
 226static struct tipc_node *tipc_node_find(struct net *net, u32 addr)
 227{
 228        struct tipc_net *tn = tipc_net(net);
 229        struct tipc_node *node;
 230        unsigned int thash = tipc_hashfn(addr);
 231
 232        if (unlikely(!in_own_cluster_exact(net, addr)))
 233                return NULL;
 234
 235        rcu_read_lock();
 236        hlist_for_each_entry_rcu(node, &tn->node_htable[thash], hash) {
 237                if (node->addr != addr)
 238                        continue;
 239                if (!kref_get_unless_zero(&node->kref))
 240                        node = NULL;
 241                break;
 242        }
 243        rcu_read_unlock();
 244        return node;
 245}
 246
 247static void tipc_node_read_lock(struct tipc_node *n)
 248{
 249        read_lock_bh(&n->lock);
 250}
 251
 252static void tipc_node_read_unlock(struct tipc_node *n)
 253{
 254        read_unlock_bh(&n->lock);
 255}
 256
 257static void tipc_node_write_lock(struct tipc_node *n)
 258{
 259        write_lock_bh(&n->lock);
 260}
 261
 262static void tipc_node_write_unlock(struct tipc_node *n)
 263{
 264        struct net *net = n->net;
 265        u32 addr = 0;
 266        u32 flags = n->action_flags;
 267        u32 link_id = 0;
 268        struct list_head *publ_list;
 269
 270        if (likely(!flags)) {
 271                write_unlock_bh(&n->lock);
 272                return;
 273        }
 274
 275        addr = n->addr;
 276        link_id = n->link_id;
 277        publ_list = &n->publ_list;
 278
 279        n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
 280                             TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
 281
 282        write_unlock_bh(&n->lock);
 283
 284        if (flags & TIPC_NOTIFY_NODE_DOWN)
 285                tipc_publ_notify(net, publ_list, addr);
 286
 287        if (flags & TIPC_NOTIFY_NODE_UP)
 288                tipc_named_node_up(net, addr);
 289
 290        if (flags & TIPC_NOTIFY_LINK_UP)
 291                tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
 292                                     TIPC_NODE_SCOPE, link_id, addr);
 293
 294        if (flags & TIPC_NOTIFY_LINK_DOWN)
 295                tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
 296                                      link_id, addr);
 297}
 298
 299struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
 300{
 301        struct tipc_net *tn = net_generic(net, tipc_net_id);
 302        struct tipc_node *n, *temp_node;
 303        int i;
 304
 305        spin_lock_bh(&tn->node_list_lock);
 306        n = tipc_node_find(net, addr);
 307        if (n)
 308                goto exit;
 309        n = kzalloc(sizeof(*n), GFP_ATOMIC);
 310        if (!n) {
 311                pr_warn("Node creation failed, no memory\n");
 312                goto exit;
 313        }
 314        n->addr = addr;
 315        n->net = net;
 316        n->capabilities = capabilities;
 317        kref_init(&n->kref);
 318        rwlock_init(&n->lock);
 319        INIT_HLIST_NODE(&n->hash);
 320        INIT_LIST_HEAD(&n->list);
 321        INIT_LIST_HEAD(&n->publ_list);
 322        INIT_LIST_HEAD(&n->conn_sks);
 323        skb_queue_head_init(&n->bc_entry.namedq);
 324        skb_queue_head_init(&n->bc_entry.inputq1);
 325        __skb_queue_head_init(&n->bc_entry.arrvq);
 326        skb_queue_head_init(&n->bc_entry.inputq2);
 327        for (i = 0; i < MAX_BEARERS; i++)
 328                spin_lock_init(&n->links[i].lock);
 329        n->state = SELF_DOWN_PEER_LEAVING;
 330        n->signature = INVALID_NODE_SIG;
 331        n->active_links[0] = INVALID_BEARER_ID;
 332        n->active_links[1] = INVALID_BEARER_ID;
 333        if (!tipc_link_bc_create(net, tipc_own_addr(net), n->addr,
 334                                 U16_MAX,
 335                                 tipc_link_window(tipc_bc_sndlink(net)),
 336                                 n->capabilities,
 337                                 &n->bc_entry.inputq1,
 338                                 &n->bc_entry.namedq,
 339                                 tipc_bc_sndlink(net),
 340                                 &n->bc_entry.link)) {
 341                pr_warn("Broadcast rcv link creation failed, no memory\n");
 342                kfree(n);
 343                n = NULL;
 344                goto exit;
 345        }
 346        tipc_node_get(n);
 347        setup_timer(&n->timer, tipc_node_timeout, (unsigned long)n);
 348        n->keepalive_intv = U32_MAX;
 349        hlist_add_head_rcu(&n->hash, &tn->node_htable[tipc_hashfn(addr)]);
 350        list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
 351                if (n->addr < temp_node->addr)
 352                        break;
 353        }
 354        list_add_tail_rcu(&n->list, &temp_node->list);
 355exit:
 356        spin_unlock_bh(&tn->node_list_lock);
 357        return n;
 358}
 359
 360static void tipc_node_calculate_timer(struct tipc_node *n, struct tipc_link *l)
 361{
 362        unsigned long tol = tipc_link_tolerance(l);
 363        unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
 364        unsigned long keepalive_intv = msecs_to_jiffies(intv);
 365
 366        /* Link with lowest tolerance determines timer interval */
 367        if (keepalive_intv < n->keepalive_intv)
 368                n->keepalive_intv = keepalive_intv;
 369
 370        /* Ensure link's abort limit corresponds to current interval */
 371        tipc_link_set_abort_limit(l, tol / jiffies_to_msecs(n->keepalive_intv));
 372}
 373
 374static void tipc_node_delete(struct tipc_node *node)
 375{
 376        list_del_rcu(&node->list);
 377        hlist_del_rcu(&node->hash);
 378        tipc_node_put(node);
 379
 380        del_timer_sync(&node->timer);
 381        tipc_node_put(node);
 382}
 383
 384void tipc_node_stop(struct net *net)
 385{
 386        struct tipc_net *tn = tipc_net(net);
 387        struct tipc_node *node, *t_node;
 388
 389        spin_lock_bh(&tn->node_list_lock);
 390        list_for_each_entry_safe(node, t_node, &tn->node_list, list)
 391                tipc_node_delete(node);
 392        spin_unlock_bh(&tn->node_list_lock);
 393}
 394
 395void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr)
 396{
 397        struct tipc_node *n;
 398
 399        if (in_own_node(net, addr))
 400                return;
 401
 402        n = tipc_node_find(net, addr);
 403        if (!n) {
 404                pr_warn("Node subscribe rejected, unknown node 0x%x\n", addr);
 405                return;
 406        }
 407        tipc_node_write_lock(n);
 408        list_add_tail(subscr, &n->publ_list);
 409        tipc_node_write_unlock(n);
 410        tipc_node_put(n);
 411}
 412
 413void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr)
 414{
 415        struct tipc_node *n;
 416
 417        if (in_own_node(net, addr))
 418                return;
 419
 420        n = tipc_node_find(net, addr);
 421        if (!n) {
 422                pr_warn("Node unsubscribe rejected, unknown node 0x%x\n", addr);
 423                return;
 424        }
 425        tipc_node_write_lock(n);
 426        list_del_init(subscr);
 427        tipc_node_write_unlock(n);
 428        tipc_node_put(n);
 429}
 430
 431int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
 432{
 433        struct tipc_node *node;
 434        struct tipc_sock_conn *conn;
 435        int err = 0;
 436
 437        if (in_own_node(net, dnode))
 438                return 0;
 439
 440        node = tipc_node_find(net, dnode);
 441        if (!node) {
 442                pr_warn("Connecting sock to node 0x%x failed\n", dnode);
 443                return -EHOSTUNREACH;
 444        }
 445        conn = kmalloc(sizeof(*conn), GFP_ATOMIC);
 446        if (!conn) {
 447                err = -EHOSTUNREACH;
 448                goto exit;
 449        }
 450        conn->peer_node = dnode;
 451        conn->port = port;
 452        conn->peer_port = peer_port;
 453
 454        tipc_node_write_lock(node);
 455        list_add_tail(&conn->list, &node->conn_sks);
 456        tipc_node_write_unlock(node);
 457exit:
 458        tipc_node_put(node);
 459        return err;
 460}
 461
 462void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
 463{
 464        struct tipc_node *node;
 465        struct tipc_sock_conn *conn, *safe;
 466
 467        if (in_own_node(net, dnode))
 468                return;
 469
 470        node = tipc_node_find(net, dnode);
 471        if (!node)
 472                return;
 473
 474        tipc_node_write_lock(node);
 475        list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
 476                if (port != conn->port)
 477                        continue;
 478                list_del(&conn->list);
 479                kfree(conn);
 480        }
 481        tipc_node_write_unlock(node);
 482        tipc_node_put(node);
 483}
 484
 485/* tipc_node_timeout - handle expiration of node timer
 486 */
 487static void tipc_node_timeout(unsigned long data)
 488{
 489        struct tipc_node *n = (struct tipc_node *)data;
 490        struct tipc_link_entry *le;
 491        struct sk_buff_head xmitq;
 492        int bearer_id;
 493        int rc = 0;
 494
 495        __skb_queue_head_init(&xmitq);
 496
 497        for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
 498                tipc_node_read_lock(n);
 499                le = &n->links[bearer_id];
 500                spin_lock_bh(&le->lock);
 501                if (le->link) {
 502                        /* Link tolerance may change asynchronously: */
 503                        tipc_node_calculate_timer(n, le->link);
 504                        rc = tipc_link_timeout(le->link, &xmitq);
 505                }
 506                spin_unlock_bh(&le->lock);
 507                tipc_node_read_unlock(n);
 508                tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
 509                if (rc & TIPC_LINK_DOWN_EVT)
 510                        tipc_node_link_down(n, bearer_id, false);
 511        }
 512        mod_timer(&n->timer, jiffies + n->keepalive_intv);
 513}
 514
 515/**
 516 * __tipc_node_link_up - handle addition of link
 517 * Node lock must be held by caller
 518 * Link becomes active (alone or shared) or standby, depending on its priority.
 519 */
 520static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
 521                                struct sk_buff_head *xmitq)
 522{
 523        int *slot0 = &n->active_links[0];
 524        int *slot1 = &n->active_links[1];
 525        struct tipc_link *ol = node_active_link(n, 0);
 526        struct tipc_link *nl = n->links[bearer_id].link;
 527
 528        if (!nl)
 529                return;
 530
 531        tipc_link_fsm_evt(nl, LINK_ESTABLISH_EVT);
 532        if (!tipc_link_is_up(nl))
 533                return;
 534
 535        n->working_links++;
 536        n->action_flags |= TIPC_NOTIFY_LINK_UP;
 537        n->link_id = tipc_link_id(nl);
 538
 539        /* Leave room for tunnel header when returning 'mtu' to users: */
 540        n->links[bearer_id].mtu = tipc_link_mtu(nl) - INT_H_SIZE;
 541
 542        tipc_bearer_add_dest(n->net, bearer_id, n->addr);
 543        tipc_bcast_inc_bearer_dst_cnt(n->net, bearer_id);
 544
 545        pr_debug("Established link <%s> on network plane %c\n",
 546                 tipc_link_name(nl), tipc_link_plane(nl));
 547
 548        /* First link? => give it both slots */
 549        if (!ol) {
 550                *slot0 = bearer_id;
 551                *slot1 = bearer_id;
 552                tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
 553                n->action_flags |= TIPC_NOTIFY_NODE_UP;
 554                tipc_bcast_add_peer(n->net, nl, xmitq);
 555                return;
 556        }
 557
 558        /* Second link => redistribute slots */
 559        if (tipc_link_prio(nl) > tipc_link_prio(ol)) {
 560                pr_debug("Old link <%s> becomes standby\n", tipc_link_name(ol));
 561                *slot0 = bearer_id;
 562                *slot1 = bearer_id;
 563                tipc_link_set_active(nl, true);
 564                tipc_link_set_active(ol, false);
 565        } else if (tipc_link_prio(nl) == tipc_link_prio(ol)) {
 566                tipc_link_set_active(nl, true);
 567                *slot1 = bearer_id;
 568        } else {
 569                pr_debug("New link <%s> is standby\n", tipc_link_name(nl));
 570        }
 571
 572        /* Prepare synchronization with first link */
 573        tipc_link_tnl_prepare(ol, nl, SYNCH_MSG, xmitq);
 574}
 575
 /**
  * tipc_node_link_up - handle addition of link
  * @n: node owning the link
  * @bearer_id: bearer slot of the link that is coming up
  * @xmitq: queue receiving messages to be sent by the caller
  *
  * Locked wrapper around __tipc_node_link_up(): takes the node write lock,
  * and releases it again through tipc_node_write_unlock().
  * Link becomes active (alone or shared) or standby, depending on its priority.
  */
 static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
                               struct sk_buff_head *xmitq)
 {
         tipc_node_write_lock(n);

         __tipc_node_link_up(n, bearer_id, xmitq);

         tipc_node_write_unlock(n);
 }
 588
 589/**
 590 * __tipc_node_link_down - handle loss of link
 591 */
 592static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
 593                                  struct sk_buff_head *xmitq,
 594                                  struct tipc_media_addr **maddr)
 595{
 596        struct tipc_link_entry *le = &n->links[*bearer_id];
 597        int *slot0 = &n->active_links[0];
 598        int *slot1 = &n->active_links[1];
 599        int i, highest = 0, prio;
 600        struct tipc_link *l, *_l, *tnl;
 601
 602        l = n->links[*bearer_id].link;
 603        if (!l || tipc_link_is_reset(l))
 604                return;
 605
 606        n->working_links--;
 607        n->action_flags |= TIPC_NOTIFY_LINK_DOWN;
 608        n->link_id = tipc_link_id(l);
 609
 610        tipc_bearer_remove_dest(n->net, *bearer_id, n->addr);
 611
 612        pr_debug("Lost link <%s> on network plane %c\n",
 613                 tipc_link_name(l), tipc_link_plane(l));
 614
 615        /* Select new active link if any available */
 616        *slot0 = INVALID_BEARER_ID;
 617        *slot1 = INVALID_BEARER_ID;
 618        for (i = 0; i < MAX_BEARERS; i++) {
 619                _l = n->links[i].link;
 620                if (!_l || !tipc_link_is_up(_l))
 621                        continue;
 622                if (_l == l)
 623                        continue;
 624                prio = tipc_link_prio(_l);
 625                if (prio < highest)
 626                        continue;
 627                if (prio > highest) {
 628                        highest = prio;
 629                        *slot0 = i;
 630                        *slot1 = i;
 631                        continue;
 632                }
 633                *slot1 = i;
 634        }
 635
 636        if (!tipc_node_is_up(n)) {
 637                if (tipc_link_peer_is_down(l))
 638                        tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
 639                tipc_node_fsm_evt(n, SELF_LOST_CONTACT_EVT);
 640                tipc_link_fsm_evt(l, LINK_RESET_EVT);
 641                tipc_link_reset(l);
 642                tipc_link_build_reset_msg(l, xmitq);
 643                *maddr = &n->links[*bearer_id].maddr;
 644                node_lost_contact(n, &le->inputq);
 645                tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
 646                return;
 647        }
 648        tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
 649
 650        /* There is still a working link => initiate failover */
 651        *bearer_id = n->active_links[0];
 652        tnl = n->links[*bearer_id].link;
 653        tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT);
 654        tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
 655        n->sync_point = tipc_link_rcv_nxt(tnl) + (U16_MAX / 2 - 1);
 656        tipc_link_tnl_prepare(l, tnl, FAILOVER_MSG, xmitq);
 657        tipc_link_reset(l);
 658        tipc_link_fsm_evt(l, LINK_RESET_EVT);
 659        tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
 660        tipc_node_fsm_evt(n, NODE_FAILOVER_BEGIN_EVT);
 661        *maddr = &n->links[*bearer_id].maddr;
 662}
 663
 664static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
 665{
 666        struct tipc_link_entry *le = &n->links[bearer_id];
 667        struct tipc_link *l = le->link;
 668        struct tipc_media_addr *maddr;
 669        struct sk_buff_head xmitq;
 670
 671        if (!l)
 672                return;
 673
 674        __skb_queue_head_init(&xmitq);
 675
 676        tipc_node_write_lock(n);
 677        if (!tipc_link_is_establishing(l)) {
 678                __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
 679                if (delete) {
 680                        kfree(l);
 681                        le->link = NULL;
 682                        n->link_cnt--;
 683                }
 684        } else {
 685                /* Defuse pending tipc_node_link_up() */
 686                tipc_link_fsm_evt(l, LINK_RESET_EVT);
 687        }
 688        tipc_node_write_unlock(n);
 689        tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
 690        tipc_sk_rcv(n->net, &le->inputq);
 691}
 692
 693static bool tipc_node_is_up(struct tipc_node *n)
 694{
 695        return n->active_links[0] != INVALID_BEARER_ID;
 696}
 697
 698void tipc_node_check_dest(struct net *net, u32 onode,
 699                          struct tipc_bearer *b,
 700                          u16 capabilities, u32 signature,
 701                          struct tipc_media_addr *maddr,
 702                          bool *respond, bool *dupl_addr)
 703{
 704        struct tipc_node *n;
 705        struct tipc_link *l;
 706        struct tipc_link_entry *le;
 707        bool addr_match = false;
 708        bool sign_match = false;
 709        bool link_up = false;
 710        bool accept_addr = false;
 711        bool reset = true;
 712        char *if_name;
 713
 714        *dupl_addr = false;
 715        *respond = false;
 716
 717        n = tipc_node_create(net, onode, capabilities);
 718        if (!n)
 719                return;
 720
 721        tipc_node_write_lock(n);
 722
 723        le = &n->links[b->identity];
 724
 725        /* Prepare to validate requesting node's signature and media address */
 726        l = le->link;
 727        link_up = l && tipc_link_is_up(l);
 728        addr_match = l && !memcmp(&le->maddr, maddr, sizeof(*maddr));
 729        sign_match = (signature == n->signature);
 730
 731        /* These three flags give us eight permutations: */
 732
 733        if (sign_match && addr_match && link_up) {
 734                /* All is fine. Do nothing. */
 735                reset = false;
 736        } else if (sign_match && addr_match && !link_up) {
 737                /* Respond. The link will come up in due time */
 738                *respond = true;
 739        } else if (sign_match && !addr_match && link_up) {
 740                /* Peer has changed i/f address without rebooting.
 741                 * If so, the link will reset soon, and the next
 742                 * discovery will be accepted. So we can ignore it.
 743                 * It may also be an cloned or malicious peer having
 744                 * chosen the same node address and signature as an
 745                 * existing one.
 746                 * Ignore requests until the link goes down, if ever.
 747                 */
 748                *dupl_addr = true;
 749        } else if (sign_match && !addr_match && !link_up) {
 750                /* Peer link has changed i/f address without rebooting.
 751                 * It may also be a cloned or malicious peer; we can't
 752                 * distinguish between the two.
 753                 * The signature is correct, so we must accept.
 754                 */
 755                accept_addr = true;
 756                *respond = true;
 757        } else if (!sign_match && addr_match && link_up) {
 758                /* Peer node rebooted. Two possibilities:
 759                 *  - Delayed re-discovery; this link endpoint has already
 760                 *    reset and re-established contact with the peer, before
 761                 *    receiving a discovery message from that node.
 762                 *    (The peer happened to receive one from this node first).
 763                 *  - The peer came back so fast that our side has not
 764                 *    discovered it yet. Probing from this side will soon
 765                 *    reset the link, since there can be no working link
 766                 *    endpoint at the peer end, and the link will re-establish.
 767                 *  Accept the signature, since it comes from a known peer.
 768                 */
 769                n->signature = signature;
 770        } else if (!sign_match && addr_match && !link_up) {
 771                /*  The peer node has rebooted.
 772                 *  Accept signature, since it is a known peer.
 773                 */
 774                n->signature = signature;
 775                *respond = true;
 776        } else if (!sign_match && !addr_match && link_up) {
 777                /* Peer rebooted with new address, or a new/duplicate peer.
 778                 * Ignore until the link goes down, if ever.
 779                 */
 780                *dupl_addr = true;
 781        } else if (!sign_match && !addr_match && !link_up) {
 782                /* Peer rebooted with new address, or it is a new peer.
 783                 * Accept signature and address.
 784                 */
 785                n->signature = signature;
 786                accept_addr = true;
 787                *respond = true;
 788        }
 789
 790        if (!accept_addr)
 791                goto exit;
 792
 793        /* Now create new link if not already existing */
 794        if (!l) {
 795                if (n->link_cnt == 2) {
 796                        pr_warn("Cannot establish 3rd link to %x\n", n->addr);
 797                        goto exit;
 798                }
 799                if_name = strchr(b->name, ':') + 1;
 800                if (!tipc_link_create(net, if_name, b->identity, b->tolerance,
 801                                      b->net_plane, b->mtu, b->priority,
 802                                      b->window, mod(tipc_net(net)->random),
 803                                      tipc_own_addr(net), onode,
 804                                      n->capabilities,
 805                                      tipc_bc_sndlink(n->net), n->bc_entry.link,
 806                                      &le->inputq,
 807                                      &n->bc_entry.namedq, &l)) {
 808                        *respond = false;
 809                        goto exit;
 810                }
 811                tipc_link_reset(l);
 812                tipc_link_fsm_evt(l, LINK_RESET_EVT);
 813                if (n->state == NODE_FAILINGOVER)
 814                        tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
 815                le->link = l;
 816                n->link_cnt++;
 817                tipc_node_calculate_timer(n, l);
 818                if (n->link_cnt == 1)
 819                        if (!mod_timer(&n->timer, jiffies + n->keepalive_intv))
 820                                tipc_node_get(n);
 821        }
 822        memcpy(&le->maddr, maddr, sizeof(*maddr));
 823exit:
 824        tipc_node_write_unlock(n);
 825        if (reset && l && !tipc_link_is_reset(l))
 826                tipc_node_link_down(n, b->identity, false);
 827        tipc_node_put(n);
 828}
 829
 830void tipc_node_delete_links(struct net *net, int bearer_id)
 831{
 832        struct tipc_net *tn = net_generic(net, tipc_net_id);
 833        struct tipc_node *n;
 834
 835        rcu_read_lock();
 836        list_for_each_entry_rcu(n, &tn->node_list, list) {
 837                tipc_node_link_down(n, bearer_id, true);
 838        }
 839        rcu_read_unlock();
 840}
 841
 842static void tipc_node_reset_links(struct tipc_node *n)
 843{
 844        char addr_string[16];
 845        int i;
 846
 847        pr_warn("Resetting all links to %s\n",
 848                tipc_addr_string_fill(addr_string, n->addr));
 849
 850        for (i = 0; i < MAX_BEARERS; i++) {
 851                tipc_node_link_down(n, i, false);
 852        }
 853}
 854
 855/* tipc_node_fsm_evt - node finite state machine
 856 * Determines when contact is allowed with peer node
 857 */
 858static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
 859{
 860        int state = n->state;
 861
 862        switch (state) {
 863        case SELF_DOWN_PEER_DOWN:
 864                switch (evt) {
 865                case SELF_ESTABL_CONTACT_EVT:
 866                        state = SELF_UP_PEER_COMING;
 867                        break;
 868                case PEER_ESTABL_CONTACT_EVT:
 869                        state = SELF_COMING_PEER_UP;
 870                        break;
 871                case SELF_LOST_CONTACT_EVT:
 872                case PEER_LOST_CONTACT_EVT:
 873                        break;
 874                case NODE_SYNCH_END_EVT:
 875                case NODE_SYNCH_BEGIN_EVT:
 876                case NODE_FAILOVER_BEGIN_EVT:
 877                case NODE_FAILOVER_END_EVT:
 878                default:
 879                        goto illegal_evt;
 880                }
 881                break;
 882        case SELF_UP_PEER_UP:
 883                switch (evt) {
 884                case SELF_LOST_CONTACT_EVT:
 885                        state = SELF_DOWN_PEER_LEAVING;
 886                        break;
 887                case PEER_LOST_CONTACT_EVT:
 888                        state = SELF_LEAVING_PEER_DOWN;
 889                        break;
 890                case NODE_SYNCH_BEGIN_EVT:
 891                        state = NODE_SYNCHING;
 892                        break;
 893                case NODE_FAILOVER_BEGIN_EVT:
 894                        state = NODE_FAILINGOVER;
 895                        break;
 896                case SELF_ESTABL_CONTACT_EVT:
 897                case PEER_ESTABL_CONTACT_EVT:
 898                case NODE_SYNCH_END_EVT:
 899                case NODE_FAILOVER_END_EVT:
 900                        break;
 901                default:
 902                        goto illegal_evt;
 903                }
 904                break;
 905        case SELF_DOWN_PEER_LEAVING:
 906                switch (evt) {
 907                case PEER_LOST_CONTACT_EVT:
 908                        state = SELF_DOWN_PEER_DOWN;
 909                        break;
 910                case SELF_ESTABL_CONTACT_EVT:
 911                case PEER_ESTABL_CONTACT_EVT:
 912                case SELF_LOST_CONTACT_EVT:
 913                        break;
 914                case NODE_SYNCH_END_EVT:
 915                case NODE_SYNCH_BEGIN_EVT:
 916                case NODE_FAILOVER_BEGIN_EVT:
 917                case NODE_FAILOVER_END_EVT:
 918                default:
 919                        goto illegal_evt;
 920                }
 921                break;
 922        case SELF_UP_PEER_COMING:
 923                switch (evt) {
 924                case PEER_ESTABL_CONTACT_EVT:
 925                        state = SELF_UP_PEER_UP;
 926                        break;
 927                case SELF_LOST_CONTACT_EVT:
 928                        state = SELF_DOWN_PEER_LEAVING;
 929                        break;
 930                case SELF_ESTABL_CONTACT_EVT:
 931                case PEER_LOST_CONTACT_EVT:
 932                case NODE_SYNCH_END_EVT:
 933                case NODE_FAILOVER_BEGIN_EVT:
 934                        break;
 935                case NODE_SYNCH_BEGIN_EVT:
 936                case NODE_FAILOVER_END_EVT:
 937                default:
 938                        goto illegal_evt;
 939                }
 940                break;
 941        case SELF_COMING_PEER_UP:
 942                switch (evt) {
 943                case SELF_ESTABL_CONTACT_EVT:
 944                        state = SELF_UP_PEER_UP;
 945                        break;
 946                case PEER_LOST_CONTACT_EVT:
 947                        state = SELF_LEAVING_PEER_DOWN;
 948                        break;
 949                case SELF_LOST_CONTACT_EVT:
 950                case PEER_ESTABL_CONTACT_EVT:
 951                        break;
 952                case NODE_SYNCH_END_EVT:
 953                case NODE_SYNCH_BEGIN_EVT:
 954                case NODE_FAILOVER_BEGIN_EVT:
 955                case NODE_FAILOVER_END_EVT:
 956                default:
 957                        goto illegal_evt;
 958                }
 959                break;
 960        case SELF_LEAVING_PEER_DOWN:
 961                switch (evt) {
 962                case SELF_LOST_CONTACT_EVT:
 963                        state = SELF_DOWN_PEER_DOWN;
 964                        break;
 965                case SELF_ESTABL_CONTACT_EVT:
 966                case PEER_ESTABL_CONTACT_EVT:
 967                case PEER_LOST_CONTACT_EVT:
 968                        break;
 969                case NODE_SYNCH_END_EVT:
 970                case NODE_SYNCH_BEGIN_EVT:
 971                case NODE_FAILOVER_BEGIN_EVT:
 972                case NODE_FAILOVER_END_EVT:
 973                default:
 974                        goto illegal_evt;
 975                }
 976                break;
 977        case NODE_FAILINGOVER:
 978                switch (evt) {
 979                case SELF_LOST_CONTACT_EVT:
 980                        state = SELF_DOWN_PEER_LEAVING;
 981                        break;
 982                case PEER_LOST_CONTACT_EVT:
 983                        state = SELF_LEAVING_PEER_DOWN;
 984                        break;
 985                case NODE_FAILOVER_END_EVT:
 986                        state = SELF_UP_PEER_UP;
 987                        break;
 988                case NODE_FAILOVER_BEGIN_EVT:
 989                case SELF_ESTABL_CONTACT_EVT:
 990                case PEER_ESTABL_CONTACT_EVT:
 991                        break;
 992                case NODE_SYNCH_BEGIN_EVT:
 993                case NODE_SYNCH_END_EVT:
 994                default:
 995                        goto illegal_evt;
 996                }
 997                break;
 998        case NODE_SYNCHING:
 999                switch (evt) {
1000                case SELF_LOST_CONTACT_EVT:
1001                        state = SELF_DOWN_PEER_LEAVING;
1002                        break;
1003                case PEER_LOST_CONTACT_EVT:
1004                        state = SELF_LEAVING_PEER_DOWN;
1005                        break;
1006                case NODE_SYNCH_END_EVT:
1007                        state = SELF_UP_PEER_UP;
1008                        break;
1009                case NODE_FAILOVER_BEGIN_EVT:
1010                        state = NODE_FAILINGOVER;
1011                        break;
1012                case NODE_SYNCH_BEGIN_EVT:
1013                case SELF_ESTABL_CONTACT_EVT:
1014                case PEER_ESTABL_CONTACT_EVT:
1015                        break;
1016                case NODE_FAILOVER_END_EVT:
1017                default:
1018                        goto illegal_evt;
1019                }
1020                break;
1021        default:
1022                pr_err("Unknown node fsm state %x\n", state);
1023                break;
1024        }
1025        n->state = state;
1026        return;
1027
1028illegal_evt:
1029        pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
1030}
1031
1032static void node_lost_contact(struct tipc_node *n,
1033                              struct sk_buff_head *inputq)
1034{
1035        char addr_string[16];
1036        struct tipc_sock_conn *conn, *safe;
1037        struct tipc_link *l;
1038        struct list_head *conns = &n->conn_sks;
1039        struct sk_buff *skb;
1040        uint i;
1041
1042        pr_debug("Lost contact with %s\n",
1043                 tipc_addr_string_fill(addr_string, n->addr));
1044
1045        /* Clean up broadcast state */
1046        tipc_bcast_remove_peer(n->net, n->bc_entry.link);
1047
1048        /* Abort any ongoing link failover */
1049        for (i = 0; i < MAX_BEARERS; i++) {
1050                l = n->links[i].link;
1051                if (l)
1052                        tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
1053        }
1054
1055        /* Notify publications from this node */
1056        n->action_flags |= TIPC_NOTIFY_NODE_DOWN;
1057
1058        /* Notify sockets connected to node */
1059        list_for_each_entry_safe(conn, safe, conns, list) {
1060                skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
1061                                      SHORT_H_SIZE, 0, tipc_own_addr(n->net),
1062                                      conn->peer_node, conn->port,
1063                                      conn->peer_port, TIPC_ERR_NO_NODE);
1064                if (likely(skb))
1065                        skb_queue_tail(inputq, skb);
1066                list_del(&conn->list);
1067                kfree(conn);
1068        }
1069}
1070
1071/**
1072 * tipc_node_get_linkname - get the name of a link
1073 *
1074 * @bearer_id: id of the bearer
1075 * @node: peer node address
1076 * @linkname: link name output buffer
1077 *
1078 * Returns 0 on success
1079 */
1080int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
1081                           char *linkname, size_t len)
1082{
1083        struct tipc_link *link;
1084        int err = -EINVAL;
1085        struct tipc_node *node = tipc_node_find(net, addr);
1086
1087        if (!node)
1088                return err;
1089
1090        if (bearer_id >= MAX_BEARERS)
1091                goto exit;
1092
1093        tipc_node_read_lock(node);
1094        link = node->links[bearer_id].link;
1095        if (link) {
1096                strncpy(linkname, tipc_link_name(link), len);
1097                err = 0;
1098        }
1099exit:
1100        tipc_node_read_unlock(node);
1101        tipc_node_put(node);
1102        return err;
1103}
1104
1105/* Caller should hold node lock for the passed node */
1106static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
1107{
1108        void *hdr;
1109        struct nlattr *attrs;
1110
1111        hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
1112                          NLM_F_MULTI, TIPC_NL_NODE_GET);
1113        if (!hdr)
1114                return -EMSGSIZE;
1115
1116        attrs = nla_nest_start(msg->skb, TIPC_NLA_NODE);
1117        if (!attrs)
1118                goto msg_full;
1119
1120        if (nla_put_u32(msg->skb, TIPC_NLA_NODE_ADDR, node->addr))
1121                goto attr_msg_full;
1122        if (tipc_node_is_up(node))
1123                if (nla_put_flag(msg->skb, TIPC_NLA_NODE_UP))
1124                        goto attr_msg_full;
1125
1126        nla_nest_end(msg->skb, attrs);
1127        genlmsg_end(msg->skb, hdr);
1128
1129        return 0;
1130
1131attr_msg_full:
1132        nla_nest_cancel(msg->skb, attrs);
1133msg_full:
1134        genlmsg_cancel(msg->skb, hdr);
1135
1136        return -EMSGSIZE;
1137}
1138
1139/**
1140 * tipc_node_xmit() is the general link level function for message sending
1141 * @net: the applicable net namespace
1142 * @list: chain of buffers containing message
1143 * @dnode: address of destination node
1144 * @selector: a number used for deterministic link selection
1145 * Consumes the buffer chain, except when returning -ELINKCONG
1146 * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF
1147 */
1148int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1149                   u32 dnode, int selector)
1150{
1151        struct tipc_link_entry *le = NULL;
1152        struct tipc_node *n;
1153        struct sk_buff_head xmitq;
1154        int bearer_id;
1155        int rc;
1156
1157        if (in_own_node(net, dnode)) {
1158                tipc_sk_rcv(net, list);
1159                return 0;
1160        }
1161
1162        n = tipc_node_find(net, dnode);
1163        if (unlikely(!n)) {
1164                skb_queue_purge(list);
1165                return -EHOSTUNREACH;
1166        }
1167
1168        tipc_node_read_lock(n);
1169        bearer_id = n->active_links[selector & 1];
1170        if (unlikely(bearer_id == INVALID_BEARER_ID)) {
1171                tipc_node_read_unlock(n);
1172                tipc_node_put(n);
1173                skb_queue_purge(list);
1174                return -EHOSTUNREACH;
1175        }
1176
1177        __skb_queue_head_init(&xmitq);
1178        le = &n->links[bearer_id];
1179        spin_lock_bh(&le->lock);
1180        rc = tipc_link_xmit(le->link, list, &xmitq);
1181        spin_unlock_bh(&le->lock);
1182        tipc_node_read_unlock(n);
1183
1184        if (likely(rc == 0))
1185                tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1186        else if (rc == -ENOBUFS)
1187                tipc_node_link_down(n, bearer_id, false);
1188
1189        tipc_node_put(n);
1190
1191        return rc;
1192}
1193
1194/* tipc_node_xmit_skb(): send single buffer to destination
1195 * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
1196 * messages, which will not be rejected
1197 * The only exception is datagram messages rerouted after secondary
1198 * lookup, which are rare and safe to dispose of anyway.
1199 * TODO: Return real return value, and let callers use
1200 * tipc_wait_for_sendpkt() where applicable
1201 */
1202int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1203                       u32 selector)
1204{
1205        struct sk_buff_head head;
1206        int rc;
1207
1208        skb_queue_head_init(&head);
1209        __skb_queue_tail(&head, skb);
1210        rc = tipc_node_xmit(net, &head, dnode, selector);
1211        if (rc == -ELINKCONG)
1212                kfree_skb(skb);
1213        return 0;
1214}
1215
1216void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
1217{
1218        struct sk_buff *txskb;
1219        struct tipc_node *n;
1220        u32 dst;
1221
1222        rcu_read_lock();
1223        list_for_each_entry_rcu(n, tipc_nodes(net), list) {
1224                dst = n->addr;
1225                if (in_own_node(net, dst))
1226                        continue;
1227                if (!tipc_node_is_up(n))
1228                        continue;
1229                txskb = pskb_copy(skb, GFP_ATOMIC);
1230                if (!txskb)
1231                        break;
1232                msg_set_destnode(buf_msg(txskb), dst);
1233                tipc_node_xmit_skb(net, txskb, dst, 0);
1234        }
1235        rcu_read_unlock();
1236
1237        kfree_skb(skb);
1238}
1239
1240/**
1241 * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
1242 * @net: the applicable net namespace
1243 * @skb: TIPC packet
1244 * @bearer_id: id of bearer message arrived on
1245 *
1246 * Invoked with no locks held.
1247 */
1248static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id)
1249{
1250        int rc;
1251        struct sk_buff_head xmitq;
1252        struct tipc_bclink_entry *be;
1253        struct tipc_link_entry *le;
1254        struct tipc_msg *hdr = buf_msg(skb);
1255        int usr = msg_user(hdr);
1256        u32 dnode = msg_destnode(hdr);
1257        struct tipc_node *n;
1258
1259        __skb_queue_head_init(&xmitq);
1260
1261        /* If NACK for other node, let rcv link for that node peek into it */
1262        if ((usr == BCAST_PROTOCOL) && (dnode != tipc_own_addr(net)))
1263                n = tipc_node_find(net, dnode);
1264        else
1265                n = tipc_node_find(net, msg_prevnode(hdr));
1266        if (!n) {
1267                kfree_skb(skb);
1268                return;
1269        }
1270        be = &n->bc_entry;
1271        le = &n->links[bearer_id];
1272
1273        rc = tipc_bcast_rcv(net, be->link, skb);
1274
1275        /* Broadcast link reset may happen at reassembly failure */
1276        if (rc & TIPC_LINK_DOWN_EVT)
1277                tipc_node_reset_links(n);
1278
1279        /* Broadcast ACKs are sent on a unicast link */
1280        if (rc & TIPC_LINK_SND_BC_ACK) {
1281                tipc_node_read_lock(n);
1282                tipc_link_build_ack_msg(le->link, &xmitq);
1283                tipc_node_read_unlock(n);
1284        }
1285
1286        if (!skb_queue_empty(&xmitq))
1287                tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1288
1289        /* Deliver. 'arrvq' is under inputq2's lock protection */
1290        if (!skb_queue_empty(&be->inputq1)) {
1291                spin_lock_bh(&be->inputq2.lock);
1292                spin_lock_bh(&be->inputq1.lock);
1293                skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1294                spin_unlock_bh(&be->inputq1.lock);
1295                spin_unlock_bh(&be->inputq2.lock);
1296                tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
1297        }
1298        tipc_node_put(n);
1299}
1300
1301/**
1302 * tipc_node_check_state - check and if necessary update node state
     * @n: node the packet belongs to; n->state and n->sync_point may be updated
1303 * @skb: TIPC packet
1304 * @bearer_id: identity of bearer delivering the packet
     * @xmitq: queue passed on to __tipc_node_link_down()/__tipc_node_link_up()
     *
     * The caller in this file (tipc_rcv) invokes this with the node write
     * lock held.
     *
1305 * Returns true if the packet may be handed to the link, otherwise false.
     * The buffer is not freed here; when false is returned the caller still
     * owns it and is expected to dispose of it.
1306 */
1307static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
1308                                  int bearer_id, struct sk_buff_head *xmitq)
1309{
1310        struct tipc_msg *hdr = buf_msg(skb);
1311        int usr = msg_user(hdr);
1312        int mtyp = msg_type(hdr);
1313        u16 oseqno = msg_seqno(hdr);
            /* Sequence number of the wrapped message; read for every packet
             * but only used in the SYNCH_MSG branch below
             */
1314        u16 iseqno = msg_seqno(msg_get_wrapped(hdr));
1315        u16 exp_pkts = msg_msgcnt(hdr);
1316        u16 rcv_nxt, syncpt, dlv_nxt, inputq_len;
            /* Snapshot of the node state on entry. It is NOT refreshed after
             * tipc_node_fsm_evt() calls; later checks that need the updated
             * value read n->state directly.
             */
1317        int state = n->state;
1318        struct tipc_link *l, *tnl, *pl = NULL;
            /* Only passed by address to __tipc_node_link_down(); never read here */
1319        struct tipc_media_addr *maddr;
1320        int pb_id;
1321
1322        l = n->links[bearer_id].link;
1323        if (!l)
1324                return false;
1325        rcv_nxt = tipc_link_rcv_nxt(l);
1326
1327
            /* Common case: node fully up and not a tunnel packet */
1328        if (likely((state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL)))
1329                return true;
1330
1331        /* Find parallel link, if any */
            /* Picks the first link found on a bearer other than bearer_id;
             * pb_id is left at that bearer's index
             */
1332        for (pb_id = 0; pb_id < MAX_BEARERS; pb_id++) {
1333                if ((pb_id != bearer_id) && n->links[pb_id].link) {
1334                        pl = n->links[pb_id].link;
1335                        break;
1336                }
1337        }
1338
1339        /* Check and update node accessibility if applicable */
1340        if (state == SELF_UP_PEER_COMING) {
1341                if (!tipc_link_is_up(l))
1342                        return true;
1343                if (!msg_peer_link_is_up(hdr))
1344                        return true;
                    /* Own link is up and the header reports the peer's link
                     * as up: register peer contact and continue below
                     */
1345                tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
1346        }
1347
1348        if (state == SELF_DOWN_PEER_LEAVING) {
                    /* Reject packets while the header still reports the peer
                     * node as up
                     */
1349                if (msg_peer_node_is_up(hdr))
1350                        return false;
1351                tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
1352                return true;
1353        }
1354
1355        if (state == SELF_LEAVING_PEER_DOWN)
1356                return false;
1357
1358        /* Ignore duplicate packets */
            /* "Ignore" here means no state handling: true is returned and the
             * packet is still passed on to the link
             */
1359        if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
1360                return true;
1361
1362        /* Initiate or update failover mode if applicable */
1363        if ((usr == TUNNEL_PROTOCOL) && (mtyp == FAILOVER_MSG)) {
1364                syncpt = oseqno + exp_pkts - 1;
1365                if (pl && tipc_link_is_up(pl)) {
1366                        __tipc_node_link_down(n, &pb_id, xmitq, &maddr);
1367                        tipc_skb_queue_splice_tail_init(tipc_link_inputq(pl),
1368                                                        tipc_link_inputq(l));
1369                }
1370                /* If pkts arrive out of order, use lowest calculated syncpt */
1371                if (less(syncpt, n->sync_point))
1372                        n->sync_point = syncpt;
1373        }
1374
1375        /* Open parallel link when tunnel link reaches synch point */
1376        if ((n->state == NODE_FAILINGOVER) && tipc_link_is_up(l)) {
                    /* Stay in failover until rcv_nxt has passed the synch point */
1377                if (!more(rcv_nxt, n->sync_point))
1378                        return true;
1379                tipc_node_fsm_evt(n, NODE_FAILOVER_END_EVT);
1380                if (pl)
1381                        tipc_link_fsm_evt(pl, LINK_FAILOVER_END_EVT);
1382                return true;
1383        }
1384
1385        /* No synching needed if only one link */
1386        if (!pl || !tipc_link_is_up(pl))
1387                return true;
1388
1389        /* Initiate synch mode if applicable */
1390        if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG) && (oseqno == 1)) {
1391                syncpt = iseqno + exp_pkts - 1;
1392                if (!tipc_link_is_up(l)) {
1393                        tipc_link_fsm_evt(l, LINK_ESTABLISH_EVT);
1394                        __tipc_node_link_up(n, bearer_id, xmitq);
1395                }
1396                if (n->state == SELF_UP_PEER_UP) {
1397                        n->sync_point = syncpt;
1398                        tipc_link_fsm_evt(l, LINK_SYNCH_BEGIN_EVT);
1399                        tipc_node_fsm_evt(n, NODE_SYNCH_BEGIN_EVT);
1400                }
1401        }
1402
1403        /* Open tunnel link when parallel link reaches synch point */
1404        if (n->state == NODE_SYNCHING) {
                    /* tnl is whichever of the two links is synching; pl is
                     * the other one
                     */
1405                if (tipc_link_is_synching(l)) {
1406                        tnl = l;
1407                } else {
1408                        tnl = pl;
1409                        pl = l;
1410                }
                    /* pl's next expected sequence number minus the packets
                     * still waiting in its input queue
                     */
1411                inputq_len = skb_queue_len(tipc_link_inputq(pl));
1412                dlv_nxt = tipc_link_rcv_nxt(pl) - inputq_len;
1413                if (more(dlv_nxt, n->sync_point)) {
1414                        tipc_link_fsm_evt(tnl, LINK_SYNCH_END_EVT);
1415                        tipc_node_fsm_evt(n, NODE_SYNCH_END_EVT);
1416                        return true;
1417                }
                    /* Still synching: accept packets on pl, SYNCH_MSG tunnel
                     * packets and link protocol packets; reject the rest
                     */
1418                if (l == pl)
1419                        return true;
1420                if ((usr == TUNNEL_PROTOCOL) && (mtyp == SYNCH_MSG))
1421                        return true;
1422                if (usr == LINK_PROTOCOL)
1423                        return true;
1424                return false;
1425        }
1426        return true;
1427}
1428
1429/**
1430 * tipc_rcv - process TIPC packets/messages arriving from off-node
1431 * @net: the applicable net namespace
1432 * @skb: TIPC packet
1433 * @bearer: pointer to bearer message arrived on
1434 *
1435 * Invoked with no locks held. Bearer pointer must point to a valid bearer
1436 * structure (i.e. cannot be NULL), but bearer can be inactive.
1437 */
1438void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1439{
1440        struct sk_buff_head xmitq;
1441        struct tipc_node *n;
1442        struct tipc_msg *hdr = buf_msg(skb);
1443        int usr = msg_user(hdr);
1444        int bearer_id = b->identity;
1445        struct tipc_link_entry *le;
1446        u16 bc_ack = msg_bcast_ack(hdr);
1447        u32 self = tipc_own_addr(net);
1448        int rc = 0;
1449
1450        __skb_queue_head_init(&xmitq);
1451
1452        /* Ensure message is well-formed */
1453        if (unlikely(!tipc_msg_validate(skb)))
1454                goto discard;
1455
1456        /* Handle arrival of discovery or broadcast packet */
1457        if (unlikely(msg_non_seq(hdr))) {
1458                if (unlikely(usr == LINK_CONFIG))
1459                        return tipc_disc_rcv(net, skb, b);
1460                else
1461                        return tipc_node_bc_rcv(net, skb, bearer_id);
1462        }
1463
1464        /* Discard unicast link messages destined for another node */
1465        if (unlikely(!msg_short(hdr) && (msg_destnode(hdr) != self)))
1466                goto discard;
1467
1468        /* Locate neighboring node that sent packet */
1469        n = tipc_node_find(net, msg_prevnode(hdr));
1470        if (unlikely(!n))
1471                goto discard;
1472        le = &n->links[bearer_id];
1473
1474        /* Ensure broadcast reception is in synch with peer's send state */
1475        if (unlikely(usr == LINK_PROTOCOL))
1476                tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr);
1477        else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack))
1478                tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
1479
1480        /* Receive packet directly if conditions permit */
1481        tipc_node_read_lock(n);
1482        if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
1483                spin_lock_bh(&le->lock);
1484                if (le->link) {
1485                        rc = tipc_link_rcv(le->link, skb, &xmitq);
1486                        skb = NULL;
1487                }
1488                spin_unlock_bh(&le->lock);
1489        }
1490        tipc_node_read_unlock(n);
1491
1492        /* Check/update node state before receiving */
1493        if (unlikely(skb)) {
1494                tipc_node_write_lock(n);
1495                if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
1496                        if (le->link) {
1497                                rc = tipc_link_rcv(le->link, skb, &xmitq);
1498                                skb = NULL;
1499                        }
1500                }
1501                tipc_node_write_unlock(n);
1502        }
1503
1504        if (unlikely(rc & TIPC_LINK_UP_EVT))
1505                tipc_node_link_up(n, bearer_id, &xmitq);
1506
1507        if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1508                tipc_node_link_down(n, bearer_id, false);
1509
1510        if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
1511                tipc_named_rcv(net, &n->bc_entry.namedq);
1512
1513        if (!skb_queue_empty(&le->inputq))
1514                tipc_sk_rcv(net, &le->inputq);
1515
1516        if (!skb_queue_empty(&xmitq))
1517                tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1518
1519        tipc_node_put(n);
1520discard:
1521        kfree_skb(skb);
1522}
1523
1524int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
1525{
1526        int err;
1527        struct net *net = sock_net(skb->sk);
1528        struct tipc_net *tn = net_generic(net, tipc_net_id);
1529        int done = cb->args[0];
1530        int last_addr = cb->args[1];
1531        struct tipc_node *node;
1532        struct tipc_nl_msg msg;
1533
1534        if (done)
1535                return 0;
1536
1537        msg.skb = skb;
1538        msg.portid = NETLINK_CB(cb->skb).portid;
1539        msg.seq = cb->nlh->nlmsg_seq;
1540
1541        rcu_read_lock();
1542        if (last_addr) {
1543                node = tipc_node_find(net, last_addr);
1544                if (!node) {
1545                        rcu_read_unlock();
1546                        /* We never set seq or call nl_dump_check_consistent()
1547                         * this means that setting prev_seq here will cause the
1548                         * consistence check to fail in the netlink callback
1549                         * handler. Resulting in the NLMSG_DONE message having
1550                         * the NLM_F_DUMP_INTR flag set if the node state
1551                         * changed while we released the lock.
1552                         */
1553                        cb->prev_seq = 1;
1554                        return -EPIPE;
1555                }
1556                tipc_node_put(node);
1557        }
1558
1559        list_for_each_entry_rcu(node, &tn->node_list, list) {
1560                if (last_addr) {
1561                        if (node->addr == last_addr)
1562                                last_addr = 0;
1563                        else
1564                                continue;
1565                }
1566
1567                tipc_node_read_lock(node);
1568                err = __tipc_nl_add_node(&msg, node);
1569                if (err) {
1570                        last_addr = node->addr;
1571                        tipc_node_read_unlock(node);
1572                        goto out;
1573                }
1574
1575                tipc_node_read_unlock(node);
1576        }
1577        done = 1;
1578out:
1579        cb->args[0] = done;
1580        cb->args[1] = last_addr;
1581        rcu_read_unlock();
1582
1583        return skb->len;
1584}
1585
1586/* tipc_node_find_by_name - locate owner node of link by link's name
1587 * @net: the applicable net namespace
1588 * @name: pointer to link name string
1589 * @bearer_id: pointer to index in 'node->links' array where the link was found.
1590 *
1591 * Returns pointer to node owning the link, or 0 if no matching link is found.
1592 */
1593static struct tipc_node *tipc_node_find_by_name(struct net *net,
1594                                                const char *link_name,
1595                                                unsigned int *bearer_id)
1596{
1597        struct tipc_net *tn = net_generic(net, tipc_net_id);
1598        struct tipc_link *l;
1599        struct tipc_node *n;
1600        struct tipc_node *found_node = NULL;
1601        int i;
1602
1603        *bearer_id = 0;
1604        rcu_read_lock();
1605        list_for_each_entry_rcu(n, &tn->node_list, list) {
1606                tipc_node_read_lock(n);
1607                for (i = 0; i < MAX_BEARERS; i++) {
1608                        l = n->links[i].link;
1609                        if (l && !strcmp(tipc_link_name(l), link_name)) {
1610                                *bearer_id = i;
1611                                found_node = n;
1612                                break;
1613                        }
1614                }
1615                tipc_node_read_unlock(n);
1616                if (found_node)
1617                        break;
1618        }
1619        rcu_read_unlock();
1620
1621        return found_node;
1622}
1623
1624int tipc_nl_node_set_link(struct sk_buff *skb, struct genl_info *info)
1625{
1626        int err;
1627        int res = 0;
1628        int bearer_id;
1629        char *name;
1630        struct tipc_link *link;
1631        struct tipc_node *node;
1632        struct sk_buff_head xmitq;
1633        struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
1634        struct net *net = sock_net(skb->sk);
1635
1636        __skb_queue_head_init(&xmitq);
1637
1638        if (!info->attrs[TIPC_NLA_LINK])
1639                return -EINVAL;
1640
1641        err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
1642                               info->attrs[TIPC_NLA_LINK],
1643                               tipc_nl_link_policy);
1644        if (err)
1645                return err;
1646
1647        if (!attrs[TIPC_NLA_LINK_NAME])
1648                return -EINVAL;
1649
1650        name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
1651
1652        if (strcmp(name, tipc_bclink_name) == 0)
1653                return tipc_nl_bc_link_set(net, attrs);
1654
1655        node = tipc_node_find_by_name(net, name, &bearer_id);
1656        if (!node)
1657                return -EINVAL;
1658
1659        tipc_node_read_lock(node);
1660
1661        link = node->links[bearer_id].link;
1662        if (!link) {
1663                res = -EINVAL;
1664                goto out;
1665        }
1666
1667        if (attrs[TIPC_NLA_LINK_PROP]) {
1668                struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
1669
1670                err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
1671                                              props);
1672                if (err) {
1673                        res = err;
1674                        goto out;
1675                }
1676
1677                if (props[TIPC_NLA_PROP_TOL]) {
1678                        u32 tol;
1679
1680                        tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
1681                        tipc_link_set_tolerance(link, tol, &xmitq);
1682                }
1683                if (props[TIPC_NLA_PROP_PRIO]) {
1684                        u32 prio;
1685
1686                        prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
1687                        tipc_link_set_prio(link, prio, &xmitq);
1688                }
1689                if (props[TIPC_NLA_PROP_WIN]) {
1690                        u32 win;
1691
1692                        win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
1693                        tipc_link_set_queue_limits(link, win);
1694                }
1695        }
1696
1697out:
1698        tipc_node_read_unlock(node);
1699        tipc_bearer_xmit(net, bearer_id, &xmitq, &node->links[bearer_id].maddr);
1700        return res;
1701}
1702
1703int tipc_nl_node_get_link(struct sk_buff *skb, struct genl_info *info)
1704{
1705        struct net *net = genl_info_net(info);
1706        struct tipc_nl_msg msg;
1707        char *name;
1708        int err;
1709
1710        msg.portid = info->snd_portid;
1711        msg.seq = info->snd_seq;
1712
1713        if (!info->attrs[TIPC_NLA_LINK_NAME])
1714                return -EINVAL;
1715        name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
1716
1717        msg.skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
1718        if (!msg.skb)
1719                return -ENOMEM;
1720
1721        if (strcmp(name, tipc_bclink_name) == 0) {
1722                err = tipc_nl_add_bc_link(net, &msg);
1723                if (err) {
1724                        nlmsg_free(msg.skb);
1725                        return err;
1726                }
1727        } else {
1728                int bearer_id;
1729                struct tipc_node *node;
1730                struct tipc_link *link;
1731
1732                node = tipc_node_find_by_name(net, name, &bearer_id);
1733                if (!node)
1734                        return -EINVAL;
1735
1736                tipc_node_read_lock(node);
1737                link = node->links[bearer_id].link;
1738                if (!link) {
1739                        tipc_node_read_unlock(node);
1740                        nlmsg_free(msg.skb);
1741                        return -EINVAL;
1742                }
1743
1744                err = __tipc_nl_add_link(net, &msg, link, 0);
1745                tipc_node_read_unlock(node);
1746                if (err) {
1747                        nlmsg_free(msg.skb);
1748                        return err;
1749                }
1750        }
1751
1752        return genlmsg_reply(msg.skb, info);
1753}
1754
1755int tipc_nl_node_reset_link_stats(struct sk_buff *skb, struct genl_info *info)
1756{
1757        int err;
1758        char *link_name;
1759        unsigned int bearer_id;
1760        struct tipc_link *link;
1761        struct tipc_node *node;
1762        struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
1763        struct net *net = sock_net(skb->sk);
1764        struct tipc_link_entry *le;
1765
1766        if (!info->attrs[TIPC_NLA_LINK])
1767                return -EINVAL;
1768
1769        err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
1770                               info->attrs[TIPC_NLA_LINK],
1771                               tipc_nl_link_policy);
1772        if (err)
1773                return err;
1774
1775        if (!attrs[TIPC_NLA_LINK_NAME])
1776                return -EINVAL;
1777
1778        link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
1779
1780        if (strcmp(link_name, tipc_bclink_name) == 0) {
1781                err = tipc_bclink_reset_stats(net);
1782                if (err)
1783                        return err;
1784                return 0;
1785        }
1786
1787        node = tipc_node_find_by_name(net, link_name, &bearer_id);
1788        if (!node)
1789                return -EINVAL;
1790
1791        le = &node->links[bearer_id];
1792        tipc_node_read_lock(node);
1793        spin_lock_bh(&le->lock);
1794        link = node->links[bearer_id].link;
1795        if (!link) {
1796                spin_unlock_bh(&le->lock);
1797                tipc_node_read_unlock(node);
1798                return -EINVAL;
1799        }
1800        tipc_link_reset_stats(link);
1801        spin_unlock_bh(&le->lock);
1802        tipc_node_read_unlock(node);
1803        return 0;
1804}
1805
1806/* Caller should hold node lock  */
1807static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
1808                                    struct tipc_node *node, u32 *prev_link)
1809{
1810        u32 i;
1811        int err;
1812
1813        for (i = *prev_link; i < MAX_BEARERS; i++) {
1814                *prev_link = i;
1815
1816                if (!node->links[i].link)
1817                        continue;
1818
1819                err = __tipc_nl_add_link(net, msg,
1820                                         node->links[i].link, NLM_F_MULTI);
1821                if (err)
1822                        return err;
1823        }
1824        *prev_link = 0;
1825
1826        return 0;
1827}
1828
1829int tipc_nl_node_dump_link(struct sk_buff *skb, struct netlink_callback *cb)
1830{
1831        struct net *net = sock_net(skb->sk);
1832        struct tipc_net *tn = net_generic(net, tipc_net_id);
1833        struct tipc_node *node;
1834        struct tipc_nl_msg msg;
1835        u32 prev_node = cb->args[0];
1836        u32 prev_link = cb->args[1];
1837        int done = cb->args[2];
1838        int err;
1839
1840        if (done)
1841                return 0;
1842
1843        msg.skb = skb;
1844        msg.portid = NETLINK_CB(cb->skb).portid;
1845        msg.seq = cb->nlh->nlmsg_seq;
1846
1847        rcu_read_lock();
1848        if (prev_node) {
1849                node = tipc_node_find(net, prev_node);
1850                if (!node) {
1851                        /* We never set seq or call nl_dump_check_consistent()
1852                         * this means that setting prev_seq here will cause the
1853                         * consistence check to fail in the netlink callback
1854                         * handler. Resulting in the last NLMSG_DONE message
1855                         * having the NLM_F_DUMP_INTR flag set.
1856                         */
1857                        cb->prev_seq = 1;
1858                        goto out;
1859                }
1860                tipc_node_put(node);
1861
1862                list_for_each_entry_continue_rcu(node, &tn->node_list,
1863                                                 list) {
1864                        tipc_node_read_lock(node);
1865                        err = __tipc_nl_add_node_links(net, &msg, node,
1866                                                       &prev_link);
1867                        tipc_node_read_unlock(node);
1868                        if (err)
1869                                goto out;
1870
1871                        prev_node = node->addr;
1872                }
1873        } else {
1874                err = tipc_nl_add_bc_link(net, &msg);
1875                if (err)
1876                        goto out;
1877
1878                list_for_each_entry_rcu(node, &tn->node_list, list) {
1879                        tipc_node_read_lock(node);
1880                        err = __tipc_nl_add_node_links(net, &msg, node,
1881                                                       &prev_link);
1882                        tipc_node_read_unlock(node);
1883                        if (err)
1884                                goto out;
1885
1886                        prev_node = node->addr;
1887                }
1888        }
1889        done = 1;
1890out:
1891        rcu_read_unlock();
1892
1893        cb->args[0] = prev_node;
1894        cb->args[1] = prev_link;
1895        cb->args[2] = done;
1896
1897        return skb->len;
1898}
1899