/*
 * net/tipc/bcast.c: TIPC broadcast code
 *
 * Copyright (c) 2004-2006, 2014-2015, Ericsson AB
 * Copyright (c) 2004, Intel Corporation.
 * Copyright (c) 2005, 2010-2011, Wind River Systems
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include "socket.h"
#include "msg.h"
#include "bcast.h"
#include "name_distr.h"
#include "core.h"

#define MAX_PKT_DEFAULT_MCAST   1500    /* bcast link max packet size (fixed) */
#define BCLINK_WIN_DEFAULT      20      /* bcast link window size (default) */

const char tipc_bclink_name[] = "broadcast-link";

static void tipc_nmap_diff(struct tipc_node_map *nm_a,
                           struct tipc_node_map *nm_b,
                           struct tipc_node_map *nm_diff);
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);

static void tipc_bclink_lock(struct net *net)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        spin_lock_bh(&tn->bclink->lock);
}

static void tipc_bclink_unlock(struct net *net)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_node *node = NULL;

        if (likely(!tn->bclink->flags)) {
                spin_unlock_bh(&tn->bclink->lock);
                return;
        }

        if (tn->bclink->flags & TIPC_BCLINK_RESET) {
                tn->bclink->flags &= ~TIPC_BCLINK_RESET;
                node = tipc_bclink_retransmit_to(net);
        }
        spin_unlock_bh(&tn->bclink->lock);

        if (node)
                tipc_link_reset_all(node);
}

void tipc_bclink_input(struct net *net)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
}

uint tipc_bclink_get_mtu(void)
{
        return MAX_PKT_DEFAULT_MCAST;
}

void tipc_bclink_set_flags(struct net *net, unsigned int flags)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        tn->bclink->flags |= flags;
}

static u32 bcbuf_acks(struct sk_buff *buf)
{
        return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
}

static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
{
        TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
}

static void bcbuf_decr_acks(struct sk_buff *buf)
{
        bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
}
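
/*
 * Illustrative sketch (assumed usage, not part of the original file):
 * the helpers above reuse the skb control block's 'handle' field as a
 * plain counter of outstanding acknowledgements:
 *
 *      bcbuf_set_acks(skb, n_targets);  // n_targets peers still owe an ACK
 *      ...
 *      bcbuf_decr_acks(skb);            // one more peer has acknowledged
 *      if (bcbuf_acks(skb) == 0)
 *              kfree_skb(skb);          // all peers have it; safe to free
 */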

void tipc_bclink_add_node(struct net *net, u32 addr)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        tipc_bclink_lock(net);
        tipc_nmap_add(&tn->bclink->bcast_nodes, addr);
        tipc_bclink_unlock(net);
}

void tipc_bclink_remove_node(struct net *net, u32 addr)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        tipc_bclink_lock(net);
        tipc_nmap_remove(&tn->bclink->bcast_nodes, addr);
        tipc_bclink_unlock(net);
}

static void bclink_set_last_sent(struct net *net)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_link *bcl = tn->bcl;

        if (bcl->next_out)
                bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
        else
                bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
}

u32 tipc_bclink_get_last_sent(struct net *net)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        return tn->bcl->fsm_msg_cnt;
}

static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
{
        node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
                                                seqno : node->bclink.last_sent;
}

/**
 * tipc_bclink_retransmit_to - get most recent node to request retransmission
 *
 * Called with bclink_lock locked
 */
struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        return tn->bclink->retransmit_to;
}

/**
 * bclink_retransmit_pkt - retransmit broadcast packets
 * @after: sequence number of last packet to *not* retransmit
 * @to: sequence number of last packet to retransmit
 *
 * Called with bclink_lock locked
 */
static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
{
        struct sk_buff *skb;
        struct tipc_link *bcl = tn->bcl;

        skb_queue_walk(&bcl->outqueue, skb) {
                if (more(buf_seqno(skb), after)) {
                        tipc_link_retransmit(bcl, skb, mod(to - after));
                        break;
                }
        }
}

/**
 * tipc_bclink_wakeup_users - wake up pending users
 *
 * Called with no locks taken
 */
void tipc_bclink_wakeup_users(struct net *net)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
}

/**
 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
 * @n_ptr: node that sent acknowledgement info
 * @acked: broadcast sequence # that has been acknowledged
 *
 * Node is locked, bclink_lock unlocked.
 */
void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
{
        struct sk_buff *skb, *tmp;
        struct sk_buff *next;
        unsigned int released = 0;
        struct net *net = n_ptr->net;
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        tipc_bclink_lock(net);
        /* Bail out if tx queue is empty (no cleanup is required) */
        skb = skb_peek(&tn->bcl->outqueue);
        if (!skb)
                goto exit;

        /* Determine which messages need to be acknowledged */
        if (acked == INVALID_LINK_SEQ) {
                /*
                 * Contact with specified node has been lost, so need to
                 * acknowledge sent messages only (if other nodes still exist)
                 * or both sent and unsent messages (otherwise)
                 */
                if (tn->bclink->bcast_nodes.count)
                        acked = tn->bcl->fsm_msg_cnt;
                else
                        acked = tn->bcl->next_out_no;
        } else {
                /*
                 * Bail out if specified sequence number does not correspond
                 * to a message that has been sent and not yet acknowledged
                 */
                if (less(acked, buf_seqno(skb)) ||
                    less(tn->bcl->fsm_msg_cnt, acked) ||
                    less_eq(acked, n_ptr->bclink.acked))
                        goto exit;
        }

        /* Skip over packets that node has previously acknowledged */
        skb_queue_walk(&tn->bcl->outqueue, skb) {
                if (more(buf_seqno(skb), n_ptr->bclink.acked))
                        break;
        }

        /* Update packets that node is now acknowledging */
        skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) {
                if (more(buf_seqno(skb), acked))
                        break;

                next = tipc_skb_queue_next(&tn->bcl->outqueue, skb);
                if (skb != tn->bcl->next_out) {
                        bcbuf_decr_acks(skb);
                } else {
                        bcbuf_set_acks(skb, 0);
                        tn->bcl->next_out = next;
                        bclink_set_last_sent(net);
                }

                if (bcbuf_acks(skb) == 0) {
                        __skb_unlink(skb, &tn->bcl->outqueue);
                        kfree_skb(skb);
                        released = 1;
                }
        }
        n_ptr->bclink.acked = acked;

        /* Try resolving broadcast link congestion, if necessary */
        if (unlikely(tn->bcl->next_out)) {
                tipc_link_push_packets(tn->bcl);
                bclink_set_last_sent(net);
        }
        if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
                n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
exit:
        tipc_bclink_unlock(net);
}
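
/*
 * Worked example for the validity checks above: if the first queued
 * packet has seqno 100 and the last packet sent is 110, only an 'acked'
 * value in [100, 110] that also exceeds this node's previous ack is
 * processed; anything else bails out as stale or out of range.
 */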

/**
 * tipc_bclink_update_link_state - update broadcast link state
 *
 * RCU and node lock set
 */
void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
                                   u32 last_sent)
{
        struct sk_buff *buf;
        struct net *net = n_ptr->net;
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        /* Ignore "stale" link state info */
        if (less_eq(last_sent, n_ptr->bclink.last_in))
                return;

        /* Update link synchronization state; quit if in sync */
        bclink_update_last_sent(n_ptr, last_sent);

        if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
                return;

        /* Update out-of-sync state; quit if loss is still unconfirmed */
        if ((++n_ptr->bclink.oos_state) == 1) {
                if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
                        return;
                n_ptr->bclink.oos_state++;
        }

        /* Don't NACK if one has been recently sent (or seen) */
        if (n_ptr->bclink.oos_state & 0x1)
                return;

        /* Send NACK */
        buf = tipc_buf_acquire(INT_H_SIZE);
        if (buf) {
                struct tipc_msg *msg = buf_msg(buf);
                struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
                u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;

                tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
                              INT_H_SIZE, n_ptr->addr);
                msg_set_non_seq(msg, 1);
                msg_set_mc_netid(msg, tn->net_id);
                msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
                msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
                msg_set_bcgap_to(msg, to);

                tipc_bclink_lock(net);
                tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
                tn->bcl->stats.sent_nacks++;
                tipc_bclink_unlock(net);
                kfree_skb(buf);

                n_ptr->bclink.oos_state++;
        }
}
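
/*
 * Note on the oos_state parity scheme above: state 0 means "in sync";
 * the first out-of-sync observation bumps the counter to 1 (loss still
 * unconfirmed), unless a sufficiently large deferred queue confirms the
 * loss immediately. Once the counter is even a NACK may be sent, and
 * the post-send increment makes it odd again, which suppresses
 * back-to-back NACKs until further evidence of loss arrives.
 */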

/**
 * bclink_peek_nack - monitor retransmission requests sent by other nodes
 *
 * Delay any upcoming NACK by this node if another node has already
 * requested the first message this node is going to ask for.
 */
static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
{
        struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));

        if (unlikely(!n_ptr))
                return;

        tipc_node_lock(n_ptr);

        if (n_ptr->bclink.recv_permitted &&
            (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
            (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
                n_ptr->bclink.oos_state = 2;

        tipc_node_unlock(n_ptr);
}

/**
 * tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
 *                    and to identified node local sockets
 * @net: the applicable net namespace
 * @list: chain of buffers containing message
 *
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 on success, otherwise errno: -ELINKCONG, -EHOSTUNREACH, -EMSGSIZE
 */
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_link *bcl = tn->bcl;
        struct tipc_bclink *bclink = tn->bclink;
        int rc = 0;
        int bc = 0;
        struct sk_buff *skb;
        struct sk_buff_head arrvq;
        struct sk_buff_head inputq;

        /* Prepare clone of message for local node */
        skb = tipc_msg_reassemble(list);
        if (unlikely(!skb)) {
                __skb_queue_purge(list);
                return -EHOSTUNREACH;
        }

        /* Broadcast to all nodes */
        if (likely(bclink)) {
                tipc_bclink_lock(net);
                if (likely(bclink->bcast_nodes.count)) {
                        rc = __tipc_link_xmit(net, bcl, list);
                        if (likely(!rc)) {
                                u32 len = skb_queue_len(&bcl->outqueue);

                                bclink_set_last_sent(net);
                                bcl->stats.queue_sz_counts++;
                                bcl->stats.accu_queue_sz += len;
                        }
                        bc = 1;
                }
                tipc_bclink_unlock(net);
        }

        if (unlikely(!bc))
                __skb_queue_purge(list);

        if (unlikely(rc)) {
                kfree_skb(skb);
                return rc;
        }
        /* Deliver message clone */
        __skb_queue_head_init(&arrvq);
        skb_queue_head_init(&inputq);
        __skb_queue_tail(&arrvq, skb);
        tipc_sk_mcast_rcv(net, &arrvq, &inputq);
        return rc;
}
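
/*
 * Caller sketch (illustrative; the construction of 'list' is assumed):
 * the caller hands off the whole buffer chain in one call and retains
 * ownership only on congestion, per the contract documented above:
 *
 *      int rc = tipc_bclink_xmit(net, &list);
 *
 *      if (rc == -ELINKCONG)
 *              ; // chain not consumed; retry after link wakeup
 */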

/**
 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
 *
 * Called with both sending node's lock and bclink_lock taken.
 */
static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
{
        struct tipc_net *tn = net_generic(node->net, tipc_net_id);

        bclink_update_last_sent(node, seqno);
        node->bclink.last_in = seqno;
        node->bclink.oos_state = 0;
        tn->bcl->stats.recv_info++;

        /*
         * Unicast an ACK periodically, ensuring that
         * all nodes in the cluster don't ACK at the same time
         */
        if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
                tipc_link_proto_xmit(node->active_links[node->addr & 1],
                                     STATE_MSG, 0, 0, 0, 0, 0);
                tn->bcl->stats.sent_acks++;
        }
}
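
/*
 * Example of the ACK staggering above: a node ACKs whenever
 * (seqno - own_addr) is a multiple of TIPC_MIN_LINK_WIN, so every node
 * still ACKs once per window of packets, but nodes whose addresses
 * differ (mod the window size) never ACK on the same seqno.
 */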

/**
 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
 *
 * RCU is locked, no other locks set
 */
void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_link *bcl = tn->bcl;
        struct tipc_msg *msg = buf_msg(buf);
        struct tipc_node *node;
        u32 next_in;
        u32 seqno;
        int deferred = 0;
        int pos = 0;
        struct sk_buff *iskb;
        struct sk_buff_head *arrvq, *inputq;

        /* Screen out unwanted broadcast messages */
        if (msg_mc_netid(msg) != tn->net_id)
                goto exit;

        node = tipc_node_find(net, msg_prevnode(msg));
        if (unlikely(!node))
                goto exit;

        tipc_node_lock(node);
        if (unlikely(!node->bclink.recv_permitted))
                goto unlock;

        /* Handle broadcast protocol message */
        if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
                if (msg_type(msg) != STATE_MSG)
                        goto unlock;
                if (msg_destnode(msg) == tn->own_addr) {
                        tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
                        tipc_node_unlock(node);
                        tipc_bclink_lock(net);
                        bcl->stats.recv_nacks++;
                        tn->bclink->retransmit_to = node;
                        bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
                                              msg_bcgap_to(msg));
                        tipc_bclink_unlock(net);
                } else {
                        tipc_node_unlock(node);
                        bclink_peek_nack(net, msg);
                }
                goto exit;
        }

        /* Handle in-sequence broadcast message */
        seqno = msg_seqno(msg);
        next_in = mod(node->bclink.last_in + 1);
        arrvq = &tn->bclink->arrvq;
        inputq = &tn->bclink->inputq;

        if (likely(seqno == next_in)) {
receive:
                /* Deliver message to destination */
                if (likely(msg_isdata(msg))) {
                        tipc_bclink_lock(net);
                        bclink_accept_pkt(node, seqno);
                        spin_lock_bh(&inputq->lock);
                        __skb_queue_tail(arrvq, buf);
                        spin_unlock_bh(&inputq->lock);
                        node->action_flags |= TIPC_BCAST_MSG_EVT;
                        tipc_bclink_unlock(net);
                        tipc_node_unlock(node);
                } else if (msg_user(msg) == MSG_BUNDLER) {
                        tipc_bclink_lock(net);
                        bclink_accept_pkt(node, seqno);
                        bcl->stats.recv_bundles++;
                        bcl->stats.recv_bundled += msg_msgcnt(msg);
                        pos = 0;
                        while (tipc_msg_extract(buf, &iskb, &pos)) {
                                spin_lock_bh(&inputq->lock);
                                __skb_queue_tail(arrvq, iskb);
                                spin_unlock_bh(&inputq->lock);
                        }
                        node->action_flags |= TIPC_BCAST_MSG_EVT;
                        tipc_bclink_unlock(net);
                        tipc_node_unlock(node);
                } else if (msg_user(msg) == MSG_FRAGMENTER) {
                        tipc_buf_append(&node->bclink.reasm_buf, &buf);
                        if (unlikely(!buf && !node->bclink.reasm_buf))
                                goto unlock;
                        tipc_bclink_lock(net);
                        bclink_accept_pkt(node, seqno);
                        bcl->stats.recv_fragments++;
                        if (buf) {
                                bcl->stats.recv_fragmented++;
                                msg = buf_msg(buf);
                                tipc_bclink_unlock(net);
                                goto receive;
                        }
                        tipc_bclink_unlock(net);
                        tipc_node_unlock(node);
                } else {
                        tipc_bclink_lock(net);
                        bclink_accept_pkt(node, seqno);
                        tipc_bclink_unlock(net);
                        tipc_node_unlock(node);
                        kfree_skb(buf);
                }
                buf = NULL;

                /* Determine new synchronization state */
                tipc_node_lock(node);
                if (unlikely(!tipc_node_is_up(node)))
                        goto unlock;

                if (node->bclink.last_in == node->bclink.last_sent)
                        goto unlock;

                if (skb_queue_empty(&node->bclink.deferred_queue)) {
                        node->bclink.oos_state = 1;
                        goto unlock;
                }

                msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
                seqno = msg_seqno(msg);
                next_in = mod(next_in + 1);
                if (seqno != next_in)
                        goto unlock;

                /* Take in-sequence message from deferred queue & deliver it */
                buf = __skb_dequeue(&node->bclink.deferred_queue);
                goto receive;
        }

        /* Handle out-of-sequence broadcast message */
        if (less(next_in, seqno)) {
                deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
                                               buf);
                bclink_update_last_sent(node, seqno);
                buf = NULL;
        }

        tipc_bclink_lock(net);

        if (deferred)
                bcl->stats.deferred_recv++;
        else
                bcl->stats.duplicates++;

        tipc_bclink_unlock(net);

unlock:
        tipc_node_unlock(node);
exit:
        kfree_skb(buf);
}
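
/*
 * Receive-path summary (descriptive only): an in-sequence packet is
 * accepted and delivered according to its user type, after which the
 * deferred queue is polled in case the arrival closed a gap; an
 * out-of-sequence packet is either parked in the deferred queue
 * (counted as deferred_recv) or dropped as a duplicate.
 */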

u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
{
        return (n_ptr->bclink.recv_permitted &&
                (tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
}

/**
 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
 *
 * Send packet over as many bearers as necessary to reach all nodes
 * that have joined the broadcast link.
 *
 * Returns 0 (packet sent successfully) under all circumstances,
 * since the broadcast link's pseudo-bearer never blocks
 */
static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
                              struct tipc_bearer *unused1,
                              struct tipc_media_addr *unused2)
{
        int bp_index;
        struct tipc_msg *msg = buf_msg(buf);
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_bcbearer *bcbearer = tn->bcbearer;
        struct tipc_bclink *bclink = tn->bclink;

        /* Prepare broadcast link message for reliable transmission,
         * if this is the first attempt to send it; preparation is
         * skipped for broadcast link protocol messages, since they
         * are sent in an unreliable manner and don't need it
         */
        if (likely(!msg_non_seq(buf_msg(buf)))) {
                bcbuf_set_acks(buf, bclink->bcast_nodes.count);
                msg_set_non_seq(msg, 1);
                msg_set_mc_netid(msg, tn->net_id);
                tn->bcl->stats.sent_info++;

                if (WARN_ON(!bclink->bcast_nodes.count)) {
                        dump_stack();
                        return 0;
                }
        }

        /* Send buffer over bearers until all targets reached */
        bcbearer->remains = bclink->bcast_nodes;

        for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
                struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
                struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
                struct tipc_bearer *bp[2] = {p, s};
                struct tipc_bearer *b = bp[msg_link_selector(msg)];
                struct sk_buff *tbuf;

                if (!p)
                        break; /* No more bearers to try */
                if (!b)
                        b = p;
                tipc_nmap_diff(&bcbearer->remains, &b->nodes,
                               &bcbearer->remains_new);
                if (bcbearer->remains_new.count == bcbearer->remains.count)
                        continue; /* Nothing added by bearer pair */

                if (bp_index == 0) {
                        /* Use original buffer for first bearer */
                        tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
                } else {
                        /* Avoid concurrent buffer access */
                        tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
                        if (!tbuf)
                                break;
                        tipc_bearer_send(net, b->identity, tbuf,
                                         &b->bcast_addr);
                        kfree_skb(tbuf); /* Bearer keeps a clone */
                }
                if (bcbearer->remains_new.count == 0)
                        break; /* All targets reached */

                bcbearer->remains = bcbearer->remains_new;
        }

        return 0;
}
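
/*
 * Example of the coverage loop above: if 'remains' holds nodes {A, B, C}
 * and the first bearer pair reaches {A, B}, tipc_nmap_diff() leaves
 * remains_new = {C}, so the next iteration tries a lower-priority pair
 * for C alone instead of re-sending to every node.
 */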

/**
 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
 */
void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
                        u32 node, bool action)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_bcbearer *bcbearer = tn->bcbearer;
        struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
        struct tipc_bcbearer_pair *bp_curr;
        struct tipc_bearer *b;
        int b_index;
        int pri;

        tipc_bclink_lock(net);

        if (action)
                tipc_nmap_add(nm_ptr, node);
        else
                tipc_nmap_remove(nm_ptr, node);

        /* Group bearers by priority (can assume max of two per priority) */
        memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));

        rcu_read_lock();
        for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
                b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
                if (!b || !b->nodes.count)
                        continue;

                if (!bp_temp[b->priority].primary)
                        bp_temp[b->priority].primary = b;
                else
                        bp_temp[b->priority].secondary = b;
        }
        rcu_read_unlock();

        /* Create array of bearer pairs for broadcasting */
        bp_curr = bcbearer->bpairs;
        memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));

        for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
                if (!bp_temp[pri].primary)
                        continue;

                bp_curr->primary = bp_temp[pri].primary;

                if (bp_temp[pri].secondary) {
                        if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
                                            &bp_temp[pri].secondary->nodes)) {
                                bp_curr->secondary = bp_temp[pri].secondary;
                        } else {
                                bp_curr++;
                                bp_curr->primary = bp_temp[pri].secondary;
                        }
                }

                bp_curr++;
        }

        tipc_bclink_unlock(net);
}
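
/*
 * Pairing example for the loop above: two bearers at the same priority
 * covering identical node sets become one primary/secondary pair (the
 * secondary serves as an alternate path); if their node sets differ,
 * each bearer becomes the primary of its own pair so neither set is
 * missed.
 */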

static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
                                      struct tipc_stats *stats)
{
        int i;
        struct nlattr *nest;

        struct nla_map {
                __u32 key;
                __u32 val;
        };

        struct nla_map map[] = {
                {TIPC_NLA_STATS_RX_INFO, stats->recv_info},
                {TIPC_NLA_STATS_RX_FRAGMENTS, stats->recv_fragments},
                {TIPC_NLA_STATS_RX_FRAGMENTED, stats->recv_fragmented},
                {TIPC_NLA_STATS_RX_BUNDLES, stats->recv_bundles},
                {TIPC_NLA_STATS_RX_BUNDLED, stats->recv_bundled},
                {TIPC_NLA_STATS_TX_INFO, stats->sent_info},
                {TIPC_NLA_STATS_TX_FRAGMENTS, stats->sent_fragments},
                {TIPC_NLA_STATS_TX_FRAGMENTED, stats->sent_fragmented},
                {TIPC_NLA_STATS_TX_BUNDLES, stats->sent_bundles},
                {TIPC_NLA_STATS_TX_BUNDLED, stats->sent_bundled},
                {TIPC_NLA_STATS_RX_NACKS, stats->recv_nacks},
                {TIPC_NLA_STATS_RX_DEFERRED, stats->deferred_recv},
                {TIPC_NLA_STATS_TX_NACKS, stats->sent_nacks},
                {TIPC_NLA_STATS_TX_ACKS, stats->sent_acks},
                {TIPC_NLA_STATS_RETRANSMITTED, stats->retransmitted},
                {TIPC_NLA_STATS_DUPLICATES, stats->duplicates},
                {TIPC_NLA_STATS_LINK_CONGS, stats->link_congs},
                {TIPC_NLA_STATS_MAX_QUEUE, stats->max_queue_sz},
                {TIPC_NLA_STATS_AVG_QUEUE, stats->queue_sz_counts ?
                        (stats->accu_queue_sz / stats->queue_sz_counts) : 0}
        };

        nest = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
        if (!nest)
                return -EMSGSIZE;

        for (i = 0; i < ARRAY_SIZE(map); i++)
                if (nla_put_u32(skb, map[i].key, map[i].val))
                        goto msg_full;

        nla_nest_end(skb, nest);

        return 0;
msg_full:
        nla_nest_cancel(skb, nest);

        return -EMSGSIZE;
}

int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
{
        int err;
        void *hdr;
        struct nlattr *attrs;
        struct nlattr *prop;
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_link *bcl = tn->bcl;

        if (!bcl)
                return 0;

        tipc_bclink_lock(net);

        hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
                          NLM_F_MULTI, TIPC_NL_LINK_GET);
        if (!hdr) {
                /* Don't leak bclink_lock on the failure path */
                tipc_bclink_unlock(net);
                return -EMSGSIZE;
        }

        attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
        if (!attrs)
                goto msg_full;

        /* The broadcast link is always up */
        if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
                goto attr_msg_full;

        if (nla_put_flag(msg->skb, TIPC_NLA_LINK_BROADCAST))
                goto attr_msg_full;
        if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, bcl->name))
                goto attr_msg_full;
        if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, bcl->next_in_no))
                goto attr_msg_full;
        if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, bcl->next_out_no))
                goto attr_msg_full;

        prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
        if (!prop)
                goto attr_msg_full;
        if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN, bcl->queue_limit[0]))
                goto prop_msg_full;
        nla_nest_end(msg->skb, prop);

        err = __tipc_nl_add_bc_link_stat(msg->skb, &bcl->stats);
        if (err)
                goto attr_msg_full;

        tipc_bclink_unlock(net);
        nla_nest_end(msg->skb, attrs);
        genlmsg_end(msg->skb, hdr);

        return 0;

prop_msg_full:
        nla_nest_cancel(msg->skb, prop);
attr_msg_full:
        nla_nest_cancel(msg->skb, attrs);
msg_full:
        tipc_bclink_unlock(net);
        genlmsg_cancel(msg->skb, hdr);

        return -EMSGSIZE;
}

int tipc_bclink_reset_stats(struct net *net)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_link *bcl = tn->bcl;

        if (!bcl)
                return -ENOPROTOOPT;

        tipc_bclink_lock(net);
        memset(&bcl->stats, 0, sizeof(bcl->stats));
        tipc_bclink_unlock(net);
        return 0;
}

int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_link *bcl = tn->bcl;

        if (!bcl)
                return -ENOPROTOOPT;
        if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
                return -EINVAL;

        tipc_bclink_lock(net);
        tipc_link_set_queue_limits(bcl, limit);
        tipc_bclink_unlock(net);
        return 0;
}
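
/*
 * Usage sketch (illustrative; 'new_win' is a hypothetical value coming
 * from the management path):
 *
 *      int err = tipc_bclink_set_queue_limits(net, new_win);
 *
 *      if (err == -EINVAL)
 *              ; // new_win outside [TIPC_MIN_LINK_WIN, TIPC_MAX_LINK_WIN]
 */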

int tipc_bclink_init(struct net *net)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);
        struct tipc_bcbearer *bcbearer;
        struct tipc_bclink *bclink;
        struct tipc_link *bcl;

        bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
        if (!bcbearer)
                return -ENOMEM;

        bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
        if (!bclink) {
                kfree(bcbearer);
                return -ENOMEM;
        }

        bcl = &bclink->link;
        bcbearer->bearer.media = &bcbearer->media;
        bcbearer->media.send_msg = tipc_bcbearer_send;
        sprintf(bcbearer->media.name, "tipc-broadcast");

        spin_lock_init(&bclink->lock);
        __skb_queue_head_init(&bcl->outqueue);
        __skb_queue_head_init(&bcl->deferred_queue);
        skb_queue_head_init(&bcl->wakeupq);
        bcl->next_out_no = 1;
        spin_lock_init(&bclink->node.lock);
        __skb_queue_head_init(&bclink->arrvq);
        skb_queue_head_init(&bclink->inputq);
        bcl->owner = &bclink->node;
        bcl->owner->net = net;
        bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
        tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
        bcl->bearer_id = MAX_BEARERS;
        rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
        bcl->state = WORKING_WORKING;
        bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
        msg_set_prevnode(bcl->pmsg, tn->own_addr);
        strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
        tn->bcbearer = bcbearer;
        tn->bclink = bclink;
        tn->bcl = bcl;
        return 0;
}

void tipc_bclink_stop(struct net *net)
{
        struct tipc_net *tn = net_generic(net, tipc_net_id);

        tipc_bclink_lock(net);
        tipc_link_purge_queues(tn->bcl);
        tipc_bclink_unlock(net);

        RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
        synchronize_net();
        kfree(tn->bcbearer);
        kfree(tn->bclink);
}

/**
 * tipc_nmap_add - add a node to a node map
 */
static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
{
        int n = tipc_node(node);
        int w = n / WSIZE;
        u32 mask = (1 << (n % WSIZE));

        if ((nm_ptr->map[w] & mask) == 0) {
                nm_ptr->count++;
                nm_ptr->map[w] |= mask;
        }
}
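
/*
 * Bitmap layout example: node n maps to word n / WSIZE, bit n % WSIZE;
 * assuming WSIZE is 32, node 70 lives in map[2] at bit 6. 'count' is
 * updated on every membership edge, so the map's size can be read in
 * O(1) rather than by rescanning the bits.
 */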

/**
 * tipc_nmap_remove - remove a node from a node map
 */
static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
{
        int n = tipc_node(node);
        int w = n / WSIZE;
        u32 mask = (1 << (n % WSIZE));

        if ((nm_ptr->map[w] & mask) != 0) {
                nm_ptr->map[w] &= ~mask;
                nm_ptr->count--;
        }
}

/**
 * tipc_nmap_diff - find differences between node maps
 * @nm_a: input node map A
 * @nm_b: input node map B
 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
 */
static void tipc_nmap_diff(struct tipc_node_map *nm_a,
                           struct tipc_node_map *nm_b,
                           struct tipc_node_map *nm_diff)
{
        int stop = ARRAY_SIZE(nm_a->map);
        int w;
        int b;
        u32 map;

        memset(nm_diff, 0, sizeof(*nm_diff));
        for (w = 0; w < stop; w++) {
                map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
                nm_diff->map[w] = map;
                if (map != 0) {
                        for (b = 0; b < WSIZE; b++) {
                                if (map & (1 << b))
                                        nm_diff->count++;
                        }
                }
        }
}