linux/net/tipc/link.c
<<
>>
Prefs
   1/*
   2 * net/tipc/link.c: TIPC link code
   3 *
   4 * Copyright (c) 1996-2007, 2012-2015, Ericsson AB
   5 * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "subscr.h"
  39#include "link.h"
  40#include "bcast.h"
  41#include "socket.h"
  42#include "name_distr.h"
  43#include "discover.h"
  44#include "netlink.h"
  45
  46#include <linux/pkt_sched.h>
  47
  48/*
  49 * Error message prefixes
     *
     * Passed as the leading "%s" argument of log calls in this file, e.g.
     * link_rst_msg in the link-reset messages and link_unk_evt in the
     * "unknown event" messages of link_state_event().
  50 */
  51static const char *link_co_err = "Link changeover error, ";
  52static const char *link_rst_msg = "Resetting link ";
  53static const char *link_unk_evt = "Unknown link event ";
  54
  55static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
            /* One entry per TIPC_NLA_LINK_* attribute: its expected netlink type */
  56        [TIPC_NLA_LINK_UNSPEC]          = { .type = NLA_UNSPEC },
  57        [TIPC_NLA_LINK_NAME] = {
  58                .type = NLA_STRING,
  59                .len = TIPC_MAX_LINK_NAME
  60        },
  61        [TIPC_NLA_LINK_MTU]             = { .type = NLA_U32 },
  62        [TIPC_NLA_LINK_BROADCAST]       = { .type = NLA_FLAG },
  63        [TIPC_NLA_LINK_UP]              = { .type = NLA_FLAG },
  64        [TIPC_NLA_LINK_ACTIVE]          = { .type = NLA_FLAG },
  65        [TIPC_NLA_LINK_PROP]            = { .type = NLA_NESTED },
  66        [TIPC_NLA_LINK_STATS]           = { .type = NLA_NESTED },
  67        [TIPC_NLA_LINK_RX]              = { .type = NLA_U32 },
  68        [TIPC_NLA_LINK_TX]              = { .type = NLA_U32 }
  69};
  70
  71/* Properties valid for media, bearer and link */
  72static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
  73        [TIPC_NLA_PROP_UNSPEC]          = { .type = NLA_UNSPEC },
  74        [TIPC_NLA_PROP_PRIO]            = { .type = NLA_U32 },
  75        [TIPC_NLA_PROP_TOL]             = { .type = NLA_U32 },
  76        [TIPC_NLA_PROP_WIN]             = { .type = NLA_U32 }
  77};
  78
  79/*
  80 * Out-of-range value for link session numbers
     * (session numbers written by this file are masked with 0xffff, so
     * 0x10000 can never equal one of them)
  81 */
  82#define INVALID_SESSION 0x10000
  83
  84/*
  85 * Link state events:
     * (link_state_event() also handles the RESET_MSG and ACTIVATE_MSG
     * message types as events, in addition to the values below)
  86 */
  87#define  STARTING_EVT    856384768      /* link processing trigger */
  88#define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? - NOTE(review): raiser not visible in this part of the file; confirm meaning */
  89#define  SILENCE_EVT     560817u        /* timer discovered silence from peer */
  90
  91/*
  92 * State value stored in 'failover_pkts'
     * (tipc_link_reset() stores it in the parallel link when a failover starts)
  93 */
  94#define FIRST_FAILOVER 0xffffu
  95
    /*
     * Forward declarations of file-local (static) helpers, so they can be
     * referenced before their definitions further down in this file.
     */
  96static void link_handle_out_of_seq_msg(struct tipc_link *link,
  97                                       struct sk_buff *skb);
  98static void tipc_link_proto_rcv(struct tipc_link *link,
  99                                struct sk_buff *skb);
 100static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);
 101static void link_state_event(struct tipc_link *l_ptr, u32 event);
 102static void link_reset_statistics(struct tipc_link *l_ptr);
 103static void link_print(struct tipc_link *l_ptr, const char *str);
 104static void tipc_link_sync_xmit(struct tipc_link *l);
 105static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
 106static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
 107static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
 108static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
 109static void link_set_timer(struct tipc_link *link, unsigned long time);
 110/*
 111 *  Simple link routines
 112 */
 /* Round @i up to the next multiple of four; multiples of four are unchanged. */
 static unsigned int align(unsigned int i)
 {
         const unsigned int mask = 3u;

         return (i + mask) & ~mask;
 }
 117
 118static void tipc_link_release(struct kref *kref)
 119{
 120        kfree(container_of(kref, struct tipc_link, ref));
 121}
 122
 123static void tipc_link_get(struct tipc_link *l_ptr)
 124{
 125        kref_get(&l_ptr->ref);
 126}
 127
 128static void tipc_link_put(struct tipc_link *l_ptr)
 129{
 130        kref_put(&l_ptr->ref, tipc_link_release);
 131}
 132
 133static struct tipc_link *tipc_parallel_link(struct tipc_link *l)
 134{
 135        if (l->owner->active_links[0] != l)
 136                return l->owner->active_links[0];
 137        return l->owner->active_links[1];
 138}
 139
 140/*
 141 *  Simple non-static link routines (i.e. referenced outside this file)
 142 */
 /*
  * Return non-zero when @l_ptr is in one of the two working states
  * (working/working or working/unknown); a NULL link counts as down.
  */
 int tipc_link_is_up(struct tipc_link *l_ptr)
 {
         int up = 0;

         if (l_ptr)
                 up = link_working_working(l_ptr) ||
                      link_working_unknown(l_ptr);
         return up;
 }
 149
 150int tipc_link_is_active(struct tipc_link *l_ptr)
 151{
 152        return  (l_ptr->owner->active_links[0] == l_ptr) ||
 153                (l_ptr->owner->active_links[1] == l_ptr);
 154}
 155
 156/**
 157 * link_timeout - handle expiration of link timer
 158 * @l_ptr: pointer to link
 159 */
 160static void link_timeout(unsigned long data)
 161{
 162        struct tipc_link *l_ptr = (struct tipc_link *)data;
 163        struct sk_buff *skb;
 164
 165        tipc_node_lock(l_ptr->owner);
 166
 167        /* update counters used in statistical profiling of send traffic */
 168        l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
 169        l_ptr->stats.queue_sz_counts++;
 170
 171        skb = skb_peek(&l_ptr->transmq);
 172        if (skb) {
 173                struct tipc_msg *msg = buf_msg(skb);
 174                u32 length = msg_size(msg);
 175
 176                if ((msg_user(msg) == MSG_FRAGMENTER) &&
 177                    (msg_type(msg) == FIRST_FRAGMENT)) {
 178                        length = msg_size(msg_get_wrapped(msg));
 179                }
 180                if (length) {
 181                        l_ptr->stats.msg_lengths_total += length;
 182                        l_ptr->stats.msg_length_counts++;
 183                        if (length <= 64)
 184                                l_ptr->stats.msg_length_profile[0]++;
 185                        else if (length <= 256)
 186                                l_ptr->stats.msg_length_profile[1]++;
 187                        else if (length <= 1024)
 188                                l_ptr->stats.msg_length_profile[2]++;
 189                        else if (length <= 4096)
 190                                l_ptr->stats.msg_length_profile[3]++;
 191                        else if (length <= 16384)
 192                                l_ptr->stats.msg_length_profile[4]++;
 193                        else if (length <= 32768)
 194                                l_ptr->stats.msg_length_profile[5]++;
 195                        else
 196                                l_ptr->stats.msg_length_profile[6]++;
 197                }
 198        }
 199
 200        /* do all other link processing performed on a periodic basis */
 201        if (l_ptr->silent_intv_cnt || tipc_bclink_acks_missing(l_ptr->owner))
 202                link_state_event(l_ptr, SILENCE_EVT);
 203        l_ptr->silent_intv_cnt++;
 204        if (skb_queue_len(&l_ptr->backlogq))
 205                tipc_link_push_packets(l_ptr);
 206        link_set_timer(l_ptr, l_ptr->keepalive_intv);
 207        tipc_node_unlock(l_ptr->owner);
 208        tipc_link_put(l_ptr);
 209}
 210
 211static void link_set_timer(struct tipc_link *link, unsigned long time)
 212{
 213        if (!mod_timer(&link->timer, jiffies + time))
 214                tipc_link_get(link);
 215}
 216
 217/**
 218 * tipc_link_create - create a new link
 219 * @n_ptr: pointer to associated node
 220 * @b_ptr: pointer to associated bearer
 221 * @media_addr: media address to use when sending messages over link
 222 *
 223 * Returns pointer to link.
 224 */
 225struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 226                                   struct tipc_bearer *b_ptr,
 227                                   const struct tipc_media_addr *media_addr)
 228{
 229        struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
 230        struct tipc_link *l_ptr;
 231        struct tipc_msg *msg;
 232        char *if_name;
 233        char addr_string[16];
 234        u32 peer = n_ptr->addr;
 235
 236        if (n_ptr->link_cnt >= MAX_BEARERS) {
 237                tipc_addr_string_fill(addr_string, n_ptr->addr);
 238                pr_err("Cannot establish %uth link to %s. Max %u allowed.\n",
 239                       n_ptr->link_cnt, addr_string, MAX_BEARERS);
 240                return NULL;
 241        }
 242
 243        if (n_ptr->links[b_ptr->identity]) {
 244                tipc_addr_string_fill(addr_string, n_ptr->addr);
 245                pr_err("Attempt to establish second link on <%s> to %s\n",
 246                       b_ptr->name, addr_string);
 247                return NULL;
 248        }
 249
 250        l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
 251        if (!l_ptr) {
 252                pr_warn("Link creation failed, no memory\n");
 253                return NULL;
 254        }
 255        kref_init(&l_ptr->ref);
 256        l_ptr->addr = peer;
 257        if_name = strchr(b_ptr->name, ':') + 1;
 258        sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
 259                tipc_zone(tn->own_addr), tipc_cluster(tn->own_addr),
 260                tipc_node(tn->own_addr),
 261                if_name,
 262                tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
 263                /* note: peer i/f name is updated by reset/activate message */
 264        memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
 265        l_ptr->owner = n_ptr;
 266        l_ptr->peer_session = INVALID_SESSION;
 267        l_ptr->bearer_id = b_ptr->identity;
 268        link_set_supervision_props(l_ptr, b_ptr->tolerance);
 269        l_ptr->state = RESET_UNKNOWN;
 270
 271        l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
 272        msg = l_ptr->pmsg;
 273        tipc_msg_init(tn->own_addr, msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE,
 274                      l_ptr->addr);
 275        msg_set_size(msg, sizeof(l_ptr->proto_msg));
 276        msg_set_session(msg, (tn->random & 0xffff));
 277        msg_set_bearer_id(msg, b_ptr->identity);
 278        strcpy((char *)msg_data(msg), if_name);
 279        l_ptr->net_plane = b_ptr->net_plane;
 280        l_ptr->advertised_mtu = b_ptr->mtu;
 281        l_ptr->mtu = l_ptr->advertised_mtu;
 282        l_ptr->priority = b_ptr->priority;
 283        tipc_link_set_queue_limits(l_ptr, b_ptr->window);
 284        l_ptr->snd_nxt = 1;
 285        __skb_queue_head_init(&l_ptr->transmq);
 286        __skb_queue_head_init(&l_ptr->backlogq);
 287        __skb_queue_head_init(&l_ptr->deferdq);
 288        skb_queue_head_init(&l_ptr->wakeupq);
 289        skb_queue_head_init(&l_ptr->inputq);
 290        skb_queue_head_init(&l_ptr->namedq);
 291        link_reset_statistics(l_ptr);
 292        tipc_node_attach_link(n_ptr, l_ptr);
 293        setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
 294        link_state_event(l_ptr, STARTING_EVT);
 295
 296        return l_ptr;
 297}
 298
 299/**
 300 * tipc_link_delete - Delete a link
 301 * @l: link to be deleted
 302 */
 303void tipc_link_delete(struct tipc_link *l)
 304{
 305        tipc_link_reset(l);
 306        if (del_timer(&l->timer))
 307                tipc_link_put(l);
 308        l->flags |= LINK_STOPPED;
 309        /* Delete link now, or when timer is finished: */
 310        tipc_link_reset_fragments(l);
 311        tipc_node_detach_link(l->owner, l);
 312        tipc_link_put(l);
 313}
 314
 315void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
 316{
 317        struct tipc_net *tn = net_generic(net, tipc_net_id);
 318        struct tipc_link *link;
 319        struct tipc_node *node;
 320
 321        rcu_read_lock();
 322        list_for_each_entry_rcu(node, &tn->node_list, list) {
 323                tipc_node_lock(node);
 324                link = node->links[bearer_id];
 325                if (link)
 326                        tipc_link_delete(link);
 327                tipc_node_unlock(node);
 328        }
 329        rcu_read_unlock();
 330}
 331
 332/**
 333 * link_schedule_user - schedule a message sender for wakeup after congestion
 334 * @link: congested link
 335 * @list: message that was attempted sent
 336 * Create pseudo msg to send back to user when congestion abates
 337 * Only consumes message if there is an error
 338 */
 339static int link_schedule_user(struct tipc_link *link, struct sk_buff_head *list)
 340{
 341        struct tipc_msg *msg = buf_msg(skb_peek(list));
 342        int imp = msg_importance(msg);
 343        u32 oport = msg_origport(msg);
 344        u32 addr = link_own_addr(link);
 345        struct sk_buff *skb;
 346
 347        /* This really cannot happen...  */
 348        if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
 349                pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
 350                tipc_link_reset(link);
 351                goto err;
 352        }
 353        /* Non-blocking sender: */
 354        if (TIPC_SKB_CB(skb_peek(list))->wakeup_pending)
 355                return -ELINKCONG;
 356
 357        /* Create and schedule wakeup pseudo message */
 358        skb = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
 359                              addr, addr, oport, 0, 0);
 360        if (!skb)
 361                goto err;
 362        TIPC_SKB_CB(skb)->chain_sz = skb_queue_len(list);
 363        TIPC_SKB_CB(skb)->chain_imp = imp;
 364        skb_queue_tail(&link->wakeupq, skb);
 365        link->stats.link_congs++;
 366        return -ELINKCONG;
 367err:
 368        __skb_queue_purge(list);
 369        return -ENOBUFS;
 370}
 371
 372/**
 373 * link_prepare_wakeup - prepare users for wakeup after congestion
 374 * @link: congested link
 375 * Move a number of waiting users, as permitted by available space in
 376 * the send queue, from link wait queue to node wait queue for wakeup
 377 */
 378void link_prepare_wakeup(struct tipc_link *l)
 379{
 380        int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
 381        int imp, lim;
 382        struct sk_buff *skb, *tmp;
 383
 384        skb_queue_walk_safe(&l->wakeupq, skb, tmp) {
 385                imp = TIPC_SKB_CB(skb)->chain_imp;
 386                lim = l->window + l->backlog[imp].limit;
 387                pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
 388                if ((pnd[imp] + l->backlog[imp].len) >= lim)
 389                        break;
 390                skb_unlink(skb, &l->wakeupq);
 391                skb_queue_tail(&l->inputq, skb);
 392                l->owner->inputq = &l->inputq;
 393                l->owner->action_flags |= TIPC_MSG_EVT;
 394        }
 395}
 396
 397/**
 398 * tipc_link_reset_fragments - purge link's inbound message fragments queue
 399 * @l_ptr: pointer to link
 400 */
 401void tipc_link_reset_fragments(struct tipc_link *l_ptr)
 402{
 403        kfree_skb(l_ptr->reasm_buf);
 404        l_ptr->reasm_buf = NULL;
 405}
 406
 407void tipc_link_purge_backlog(struct tipc_link *l)
 408{
 409        __skb_queue_purge(&l->backlogq);
 410        l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
 411        l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
 412        l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
 413        l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
 414        l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
 415}
 416
 417/**
 418 * tipc_link_purge_queues - purge all pkt queues associated with link
 419 * @l_ptr: pointer to link
 420 */
 421void tipc_link_purge_queues(struct tipc_link *l_ptr)
 422{
 423        __skb_queue_purge(&l_ptr->deferdq);
 424        __skb_queue_purge(&l_ptr->transmq);
 425        tipc_link_purge_backlog(l_ptr);
 426        tipc_link_reset_fragments(l_ptr);
 427}
 428
    /**
     * tipc_link_reset - bring a link back to the RESET_UNKNOWN state
     * @l_ptr: pointer to link
     *
     * Always advances the session number in the link's protocol message,
     * invalidates the peer session, restores the advertised MTU and sets the
     * state to RESET_UNKNOWN. If the link was already in a reset state
     * nothing more is done. Otherwise the node and bearer are told the link
     * is down, a failover to the parallel link is prepared when applicable,
     * and the link's queues, counters and statistics are cleared.
     */
 429void tipc_link_reset(struct tipc_link *l_ptr)
 430{
            /* Snapshot taken before the node is told that the link is down */
 431        u32 prev_state = l_ptr->state;
 432        int was_active_link = tipc_link_is_active(l_ptr);
 433        struct tipc_node *owner = l_ptr->owner;
 434        struct tipc_link *pl = tipc_parallel_link(l_ptr);
 435
            /* Next session number, wrapping within 16 bits */
 436        msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
 437
 438        /* Link is down, accept any session */
 439        l_ptr->peer_session = INVALID_SESSION;
 440
 441        /* Prepare for renewed mtu size negotiation */
 442        l_ptr->mtu = l_ptr->advertised_mtu;
 443
 444        l_ptr->state = RESET_UNKNOWN;
 445
            /* Already reset: nothing to tear down */
 446        if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
 447                return;
 448
 449        tipc_node_link_down(l_ptr->owner, l_ptr);
 450        tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);
 451
            /*
             * If this was an active link, the node is still up and a different
             * parallel link exists, start a failover: the parallel link gets
             * the receive checkpoint and the pending reassembly buffer.
             * Otherwise the reassembly buffer is simply freed.
             */
 452        if (was_active_link && tipc_node_is_up(l_ptr->owner) && (pl != l_ptr)) {
 453                l_ptr->flags |= LINK_FAILINGOVER;
 454                l_ptr->failover_checkpt = l_ptr->rcv_nxt;
 455                pl->failover_pkts = FIRST_FAILOVER;
 456                pl->failover_checkpt = l_ptr->rcv_nxt;
 457                pl->failover_skb = l_ptr->reasm_buf;
 458        } else {
 459                kfree_skb(l_ptr->reasm_buf);
 460        }
 461        /* Clean up all queues, except inputq: */
 462        __skb_queue_purge(&l_ptr->transmq);
 463        __skb_queue_purge(&l_ptr->deferdq);
 464        if (!owner->inputq)
 465                owner->inputq = &l_ptr->inputq;
            /* Pending wakeup messages are moved to the node's input queue */
 466        skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
 467        if (!skb_queue_empty(owner->inputq))
 468                owner->action_flags |= TIPC_MSG_EVT;
 469        tipc_link_purge_backlog(l_ptr);
            /* Buffer was either handed to the parallel link or freed above */
 470        l_ptr->reasm_buf = NULL;
 471        l_ptr->rcv_unacked = 0;
 472        l_ptr->snd_nxt = 1;
 473        l_ptr->silent_intv_cnt = 0;
 474        l_ptr->stale_count = 0;
 475        link_reset_statistics(l_ptr);
 476}
 477
 478static void link_activate(struct tipc_link *link)
 479{
 480        struct tipc_node *node = link->owner;
 481
 482        link->rcv_nxt = 1;
 483        link->stats.recv_info = 1;
 484        link->silent_intv_cnt = 0;
 485        tipc_node_link_up(node, link);
 486        tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
 487}
 488
 489/**
 490 * link_state_event - link finite state machine
 491 * @l_ptr: pointer to link
 492 * @event: state machine event to process
     *
     * Dispatches first on the link's current state (WORKING_WORKING,
     * WORKING_UNKNOWN, RESET_UNKNOWN, RESET_RESET) and then on @event
     * (TRAFFIC_MSG_EVT, SILENCE_EVT, STARTING_EVT, or the ACTIVATE_MSG /
     * RESET_MSG message types). Events are ignored while the link is stopped,
     * not yet started (except STARTING_EVT itself), or failing over.
     * Unexpected events or states are only logged.
 493 */
 494static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 495{
 496        struct tipc_link *other;
 497        unsigned long timer_intv = l_ptr->keepalive_intv;
 498
 499        if (l_ptr->flags & LINK_STOPPED)
 500                return;
 501
 502        if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
 503                return;         /* Not yet. */
 504
 505        if (l_ptr->flags & LINK_FAILINGOVER)
 506                return;
 507
 508        switch (l_ptr->state) {
 509        case WORKING_WORKING:
 510                switch (event) {
 511                case TRAFFIC_MSG_EVT:
 512                case ACTIVATE_MSG:
 513                        l_ptr->silent_intv_cnt = 0;
 514                        break;
 515                case SILENCE_EVT:
                            /* No silent interval counted: stay in this state */
 516                        if (!l_ptr->silent_intv_cnt) {
 517                                if (tipc_bclink_acks_missing(l_ptr->owner))
 518                                        tipc_link_proto_xmit(l_ptr, STATE_MSG,
 519                                                             0, 0, 0, 0);
 520                                break;
 521                        }
 522                        l_ptr->state = WORKING_UNKNOWN;
 523                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
 524                        break;
 525                case RESET_MSG:
 526                        pr_debug("%s<%s>, requested by peer\n",
 527                                 link_rst_msg, l_ptr->name);
 528                        tipc_link_reset(l_ptr);
 529                        l_ptr->state = RESET_RESET;
 530                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
 531                                             0, 0, 0, 0);
 532                        break;
 533                default:
 534                        pr_debug("%s%u in WW state\n", link_unk_evt, event);
 535                }
 536                break;
 537        case WORKING_UNKNOWN:
 538                switch (event) {
 539                case TRAFFIC_MSG_EVT:
 540                case ACTIVATE_MSG:
 541                        l_ptr->state = WORKING_WORKING;
 542                        l_ptr->silent_intv_cnt = 0;
 543                        break;
 544                case RESET_MSG:
 545                        pr_debug("%s<%s>, requested by peer while probing\n",
 546                                 link_rst_msg, l_ptr->name);
 547                        tipc_link_reset(l_ptr);
 548                        l_ptr->state = RESET_RESET;
 549                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
 550                                             0, 0, 0, 0);
 551                        break;
 552                case SILENCE_EVT:
                            /*
                             * Three outcomes: silence counter is zero (back
                             * to WORKING_WORKING), still below the abort
                             * limit (send another STATE_MSG), or limit
                             * reached (reset the link).
                             */
 553                        if (!l_ptr->silent_intv_cnt) {
 554                                l_ptr->state = WORKING_WORKING;
 555                                if (tipc_bclink_acks_missing(l_ptr->owner))
 556                                        tipc_link_proto_xmit(l_ptr, STATE_MSG,
 557                                                             0, 0, 0, 0);
 558                        } else if (l_ptr->silent_intv_cnt <
 559                                   l_ptr->abort_limit) {
 560                                tipc_link_proto_xmit(l_ptr, STATE_MSG,
 561                                                     1, 0, 0, 0);
 562                        } else {        /* Link has failed */
 563                                pr_debug("%s<%s>, peer not responding\n",
 564                                         link_rst_msg, l_ptr->name);
 565                                tipc_link_reset(l_ptr);
 566                                l_ptr->state = RESET_UNKNOWN;
 567                                tipc_link_proto_xmit(l_ptr, RESET_MSG,
 568                                                     0, 0, 0, 0);
 569                        }
 570                        break;
 571                default:
 572                        pr_err("%s%u in WU state\n", link_unk_evt, event);
 573                }
 574                break;
 575        case RESET_UNKNOWN:
 576                switch (event) {
 577                case TRAFFIC_MSG_EVT:
 578                        break;
 579                case ACTIVATE_MSG:
                            /* Do not activate while active link 0 is in WORKING_UNKNOWN */
 580                        other = l_ptr->owner->active_links[0];
 581                        if (other && link_working_unknown(other))
 582                                break;
 583                        l_ptr->state = WORKING_WORKING;
 584                        link_activate(l_ptr);
 585                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
                            /* First working link of the node: send broadcast sync */
 586                        if (l_ptr->owner->working_links == 1)
 587                                tipc_link_sync_xmit(l_ptr);
 588                        break;
 589                case RESET_MSG:
 590                        l_ptr->state = RESET_RESET;
 591                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
 592                                             1, 0, 0, 0);
 593                        break;
 594                case STARTING_EVT:
                            /* Mark the link started and arm its timer */
 595                        l_ptr->flags |= LINK_STARTED;
 596                        link_set_timer(l_ptr, timer_intv);
 597                        break;
 598                case SILENCE_EVT:
 599                        tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0);
 600                        break;
 601                default:
 602                        pr_err("%s%u in RU state\n", link_unk_evt, event);
 603                }
 604                break;
 605        case RESET_RESET:
 606                switch (event) {
 607                case TRAFFIC_MSG_EVT:
 608                case ACTIVATE_MSG:
                            /* Same activation sequence as in RESET_UNKNOWN */
 609                        other = l_ptr->owner->active_links[0];
 610                        if (other && link_working_unknown(other))
 611                                break;
 612                        l_ptr->state = WORKING_WORKING;
 613                        link_activate(l_ptr);
 614                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0);
 615                        if (l_ptr->owner->working_links == 1)
 616                                tipc_link_sync_xmit(l_ptr);
 617                        break;
 618                case RESET_MSG:
 619                        break;
 620                case SILENCE_EVT:
 621                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
 622                                             0, 0, 0, 0);
 623                        break;
 624                default:
 625                        pr_err("%s%u in RR state\n", link_unk_evt, event);
 626                }
 627                break;
 628        default:
 629                pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
 630        }
 631}
 632
 633/**
 634 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
     * @net: net namespace handed to tipc_bearer_send()
 635 * @link: link to use
 636 * @list: chain of buffers containing message
 637 *
 638 * Consumes the buffer chain, except when returning -ELINKCONG,
 639 * since the caller then may want to make more send attempts.
 640 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
 641 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
     *
     * The backlog limits are checked before the size check, so a congested
     * link defers to link_schedule_user() even for an oversized message.
     * While the transmit queue has room below the link window, buffers are
     * queued there and sent at once; otherwise they are bundled into the
     * backlog if possible, or appended to the backlog as they are.
 642 */
 643int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 644                     struct sk_buff_head *list)
 645{
 646        struct tipc_msg *msg = buf_msg(skb_peek(list));
 647        unsigned int maxwin = link->window;
 648        unsigned int i, imp = msg_importance(msg);
 649        uint mtu = link->mtu;
 650        u16 ack = mod(link->rcv_nxt - 1);
 651        u16 seqno = link->snd_nxt;
 652        u16 bc_last_in = link->owner->bclink.last_in;
 653        struct tipc_media_addr *addr = &link->media_addr;
 654        struct sk_buff_head *transmq = &link->transmq;
 655        struct sk_buff_head *backlogq = &link->backlogq;
 656        struct sk_buff *skb, *bskb;
 657
 658        /* Match msg importance against this and all higher backlog limits: */
 659        for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
 660                if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
 661                        return link_schedule_user(link, list);
 662        }
            /* Only the first message of the chain is checked against the MTU */
 663        if (unlikely(msg_size(msg) > mtu)) {
 664                __skb_queue_purge(list);
 665                return -EMSGSIZE;
 666        }
 667        /* Prepare each packet for sending, and add to relevant queue: */
 668        while (skb_queue_len(list)) {
 669                skb = skb_peek(list);
 670                msg = buf_msg(skb);
 671                msg_set_seqno(msg, seqno);
 672                msg_set_ack(msg, ack);
 673                msg_set_bcast_ack(msg, bc_last_in);
 674
                    /* Room in the send window: queue, transmit, use up seqno */
 675                if (likely(skb_queue_len(transmq) < maxwin)) {
 676                        __skb_dequeue(list);
 677                        __skb_queue_tail(transmq, skb);
 678                        tipc_bearer_send(net, link->bearer_id, skb, addr);
 679                        link->rcv_unacked = 0;
 680                        seqno++;
 681                        continue;
 682                }
                    /* Try to add the message to the bundle at the backlog tail */
 683                if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) {
 684                        kfree_skb(__skb_dequeue(list));
 685                        link->stats.sent_bundled++;
 686                        continue;
 687                }
                    /* Otherwise try to start a new bundle holding this message */
 688                if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) {
 689                        kfree_skb(__skb_dequeue(list));
 690                        __skb_queue_tail(backlogq, bskb);
 691                        link->backlog[msg_importance(buf_msg(bskb))].len++;
 692                        link->stats.sent_bundled++;
 693                        link->stats.sent_bundles++;
 694                        continue;
 695                }
                    /* No bundling possible: move the rest of the chain to the backlog */
 696                link->backlog[imp].len += skb_queue_len(list);
 697                skb_queue_splice_tail_init(list, backlogq);
 698        }
 699        link->snd_nxt = seqno;
 700        return 0;
 701}
 702
 /*
  * Initialise @list as an empty buffer queue and append @skb to it, giving
  * a one-element chain.
  */
 static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
 {
         skb_queue_head_init(list);
         __skb_queue_tail(list, skb);
 }
 708
 709static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
 710{
 711        struct sk_buff_head head;
 712
 713        skb2list(skb, &head);
 714        return __tipc_link_xmit(link->owner->net, link, &head);
 715}
 716
 717/* tipc_link_xmit_skb(): send single buffer to destination
 718 * Buffers sent via this functon are generally TIPC_SYSTEM_IMPORTANCE
 719 * messages, which will not be rejected
 720 * The only exception is datagram messages rerouted after secondary
 721 * lookup, which are rare and safe to dispose of anyway.
 722 * TODO: Return real return value, and let callers use
 723 * tipc_wait_for_sendpkt() where applicable
 724 */
 725int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 726                       u32 selector)
 727{
 728        struct sk_buff_head head;
 729        int rc;
 730
 731        skb2list(skb, &head);
 732        rc = tipc_link_xmit(net, &head, dnode, selector);
 733        if (rc == -ELINKCONG)
 734                kfree_skb(skb);
 735        return 0;
 736}
 737
 738/**
 739 * tipc_link_xmit() is the general link level function for message sending
 740 * @net: the applicable net namespace
 741 * @list: chain of buffers containing message
 742 * @dsz: amount of user data to be sent
 743 * @dnode: address of destination node
 744 * @selector: a number used for deterministic link selection
 745 * Consumes the buffer chain, except when returning -ELINKCONG
 746 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 747 */
 748int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
 749                   u32 selector)
 750{
 751        struct tipc_link *link = NULL;
 752        struct tipc_node *node;
 753        int rc = -EHOSTUNREACH;
 754
 755        node = tipc_node_find(net, dnode);
 756        if (node) {
 757                tipc_node_lock(node);
 758                link = node->active_links[selector & 1];
 759                if (link)
 760                        rc = __tipc_link_xmit(net, link, list);
 761                tipc_node_unlock(node);
 762                tipc_node_put(node);
 763        }
 764        if (link)
 765                return rc;
 766
 767        if (likely(in_own_node(net, dnode))) {
 768                tipc_sk_rcv(net, list);
 769                return 0;
 770        }
 771
 772        __skb_queue_purge(list);
 773        return rc;
 774}
 775
 776/*
 777 * tipc_link_sync_xmit - synchronize broadcast link endpoints.
 778 *
 779 * Give a newly added peer node the sequence number where it should
 780 * start receiving and acking broadcast packets.
 781 *
 782 * Called with node locked
 783 */
 784static void tipc_link_sync_xmit(struct tipc_link *link)
 785{
 786        struct sk_buff *skb;
 787        struct tipc_msg *msg;
 788
 789        skb = tipc_buf_acquire(INT_H_SIZE);
 790        if (!skb)
 791                return;
 792
 793        msg = buf_msg(skb);
 794        tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG,
 795                      INT_H_SIZE, link->addr);
 796        msg_set_last_bcast(msg, link->owner->bclink.acked);
 797        __tipc_link_xmit_skb(link, skb);
 798}
 799
 800/*
 801 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
 802 * Receive the sequence number where we should start receiving and
 803 * acking broadcast packets from a newly added peer node, and open
 804 * up for reception of such packets.
 805 *
 806 * Called with node locked
 807 */
 808static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 809{
 810        struct tipc_msg *msg = buf_msg(buf);
 811
 812        n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
 813        n->bclink.recv_permitted = true;
 814        kfree_skb(buf);
 815}
 816
 817/*
 818 * tipc_link_push_packets - push unsent packets to bearer
 819 *
 820 * Push out the unsent messages of a link where congestion
 821 * has abated. Node is locked.
 822 *
 823 * Called with node locked
 824 */
 825void tipc_link_push_packets(struct tipc_link *link)
 826{
 827        struct sk_buff *skb;
 828        struct tipc_msg *msg;
 829        u16 seqno = link->snd_nxt;
 830        u16 ack = mod(link->rcv_nxt - 1);
 831
 832        while (skb_queue_len(&link->transmq) < link->window) {
 833                skb = __skb_dequeue(&link->backlogq);
 834                if (!skb)
 835                        break;
 836                msg = buf_msg(skb);
 837                link->backlog[msg_importance(msg)].len--;
 838                msg_set_ack(msg, ack);
 839                msg_set_seqno(msg, seqno);
 840                seqno = mod(seqno + 1);
 841                msg_set_bcast_ack(msg, link->owner->bclink.last_in);
 842                link->rcv_unacked = 0;
 843                __skb_queue_tail(&link->transmq, skb);
 844                tipc_bearer_send(link->owner->net, link->bearer_id,
 845                                 skb, &link->media_addr);
 846        }
 847        link->snd_nxt = seqno;
 848}
 849
 850void tipc_link_reset_all(struct tipc_node *node)
 851{
 852        char addr_string[16];
 853        u32 i;
 854
 855        tipc_node_lock(node);
 856
 857        pr_warn("Resetting all links to %s\n",
 858                tipc_addr_string_fill(addr_string, node->addr));
 859
 860        for (i = 0; i < MAX_BEARERS; i++) {
 861                if (node->links[i]) {
 862                        link_print(node->links[i], "Resetting link\n");
 863                        tipc_link_reset(node->links[i]);
 864                }
 865        }
 866
 867        tipc_node_unlock(node);
 868}
 869
 870static void link_retransmit_failure(struct tipc_link *l_ptr,
 871                                    struct sk_buff *buf)
 872{
 873        struct tipc_msg *msg = buf_msg(buf);
 874        struct net *net = l_ptr->owner->net;
 875
 876        pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
 877
 878        if (l_ptr->addr) {
 879                /* Handle failure on standard link */
 880                link_print(l_ptr, "Resetting link\n");
 881                tipc_link_reset(l_ptr);
 882
 883        } else {
 884                /* Handle failure on broadcast link */
 885                struct tipc_node *n_ptr;
 886                char addr_string[16];
 887
 888                pr_info("Msg seq number: %u,  ", msg_seqno(msg));
 889                pr_cont("Outstanding acks: %lu\n",
 890                        (unsigned long) TIPC_SKB_CB(buf)->handle);
 891
 892                n_ptr = tipc_bclink_retransmit_to(net);
 893
 894                tipc_addr_string_fill(addr_string, n_ptr->addr);
 895                pr_info("Broadcast link info for %s\n", addr_string);
 896                pr_info("Reception permitted: %d,  Acked: %u\n",
 897                        n_ptr->bclink.recv_permitted,
 898                        n_ptr->bclink.acked);
 899                pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
 900                        n_ptr->bclink.last_in,
 901                        n_ptr->bclink.oos_state,
 902                        n_ptr->bclink.last_sent);
 903
 904                n_ptr->action_flags |= TIPC_BCAST_RESET;
 905                l_ptr->stale_count = 0;
 906        }
 907}
 908
 909void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 910                          u32 retransmits)
 911{
 912        struct tipc_msg *msg;
 913
 914        if (!skb)
 915                return;
 916
 917        msg = buf_msg(skb);
 918
 919        /* Detect repeated retransmit failures */
 920        if (l_ptr->last_retransm == msg_seqno(msg)) {
 921                if (++l_ptr->stale_count > 100) {
 922                        link_retransmit_failure(l_ptr, skb);
 923                        return;
 924                }
 925        } else {
 926                l_ptr->last_retransm = msg_seqno(msg);
 927                l_ptr->stale_count = 1;
 928        }
 929
 930        skb_queue_walk_from(&l_ptr->transmq, skb) {
 931                if (!retransmits)
 932                        break;
 933                msg = buf_msg(skb);
 934                msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));
 935                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
 936                tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb,
 937                                 &l_ptr->media_addr);
 938                retransmits--;
 939                l_ptr->stats.retransmitted++;
 940        }
 941}
 942
 943/* link_synch(): check if all packets arrived before the synch
 944 *               point have been consumed
 945 * Returns true if the parallel links are synched, otherwise false
 946 */
 947static bool link_synch(struct tipc_link *l)
 948{
 949        unsigned int post_synch;
 950        struct tipc_link *pl;
 951
 952        pl  = tipc_parallel_link(l);
 953        if (pl == l)
 954                goto synched;
 955
 956        /* Was last pre-synch packet added to input queue ? */
 957        if (less_eq(pl->rcv_nxt, l->synch_point))
 958                return false;
 959
 960        /* Is it still in the input queue ? */
 961        post_synch = mod(pl->rcv_nxt - l->synch_point) - 1;
 962        if (skb_queue_len(&pl->inputq) > post_synch)
 963                return false;
 964synched:
 965        l->flags &= ~LINK_SYNCHING;
 966        return true;
 967}
 968
 969static void link_retrieve_defq(struct tipc_link *link,
 970                               struct sk_buff_head *list)
 971{
 972        u16 seq_no;
 973
 974        if (skb_queue_empty(&link->deferdq))
 975                return;
 976
 977        seq_no = buf_seqno(skb_peek(&link->deferdq));
 978        if (seq_no == link->rcv_nxt)
 979                skb_queue_splice_tail_init(&link->deferdq, list);
 980}
 981
 982/**
 983 * tipc_rcv - process TIPC packets/messages arriving from off-node
 984 * @net: the applicable net namespace
 985 * @skb: TIPC packet
 986 * @b_ptr: pointer to bearer message arrived on
 987 *
 988 * Invoked with no locks held.  Bearer pointer must point to a valid bearer
 989 * structure (i.e. cannot be NULL), but bearer can be inactive.
 990 */
 991void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 992{
 993        struct tipc_net *tn = net_generic(net, tipc_net_id);
 994        struct sk_buff_head head;
 995        struct tipc_node *n_ptr;
 996        struct tipc_link *l_ptr;
 997        struct sk_buff *skb1, *tmp;
 998        struct tipc_msg *msg;
 999        u16 seq_no;
1000        u16 ackd;
1001        u32 released;
1002
1003        skb2list(skb, &head);
1004
1005        while ((skb = __skb_dequeue(&head))) {
1006                /* Ensure message is well-formed */
1007                if (unlikely(!tipc_msg_validate(skb)))
1008                        goto discard;
1009
1010                /* Handle arrival of a non-unicast link message */
1011                msg = buf_msg(skb);
1012                if (unlikely(msg_non_seq(msg))) {
1013                        if (msg_user(msg) ==  LINK_CONFIG)
1014                                tipc_disc_rcv(net, skb, b_ptr);
1015                        else
1016                                tipc_bclink_rcv(net, skb);
1017                        continue;
1018                }
1019
1020                /* Discard unicast link messages destined for another node */
1021                if (unlikely(!msg_short(msg) &&
1022                             (msg_destnode(msg) != tn->own_addr)))
1023                        goto discard;
1024
1025                /* Locate neighboring node that sent message */
1026                n_ptr = tipc_node_find(net, msg_prevnode(msg));
1027                if (unlikely(!n_ptr))
1028                        goto discard;
1029
1030                tipc_node_lock(n_ptr);
1031                /* Locate unicast link endpoint that should handle message */
1032                l_ptr = n_ptr->links[b_ptr->identity];
1033                if (unlikely(!l_ptr))
1034                        goto unlock;
1035
1036                /* Verify that communication with node is currently allowed */
1037                if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
1038                    msg_user(msg) == LINK_PROTOCOL &&
1039                    (msg_type(msg) == RESET_MSG ||
1040                    msg_type(msg) == ACTIVATE_MSG) &&
1041                    !msg_redundant_link(msg))
1042                        n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
1043
1044                if (tipc_node_blocked(n_ptr))
1045                        goto unlock;
1046
1047                /* Validate message sequence number info */
1048                seq_no = msg_seqno(msg);
1049                ackd = msg_ack(msg);
1050
1051                /* Release acked messages */
1052                if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
1053                        tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1054
1055                released = 0;
1056                skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
1057                        if (more(buf_seqno(skb1), ackd))
1058                                break;
1059                         __skb_unlink(skb1, &l_ptr->transmq);
1060                         kfree_skb(skb1);
1061                         released = 1;
1062                }
1063
1064                /* Try sending any messages link endpoint has pending */
1065                if (unlikely(skb_queue_len(&l_ptr->backlogq)))
1066                        tipc_link_push_packets(l_ptr);
1067
1068                if (released && !skb_queue_empty(&l_ptr->wakeupq))
1069                        link_prepare_wakeup(l_ptr);
1070
1071                /* Process the incoming packet */
1072                if (unlikely(!link_working_working(l_ptr))) {
1073                        if (msg_user(msg) == LINK_PROTOCOL) {
1074                                tipc_link_proto_rcv(l_ptr, skb);
1075                                link_retrieve_defq(l_ptr, &head);
1076                                skb = NULL;
1077                                goto unlock;
1078                        }
1079
1080                        /* Traffic message. Conditionally activate link */
1081                        link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1082
1083                        if (link_working_working(l_ptr)) {
1084                                /* Re-insert buffer in front of queue */
1085                                __skb_queue_head(&head, skb);
1086                                skb = NULL;
1087                                goto unlock;
1088                        }
1089                        goto unlock;
1090                }
1091
1092                /* Link is now in state WORKING_WORKING */
1093                if (unlikely(seq_no != l_ptr->rcv_nxt)) {
1094                        link_handle_out_of_seq_msg(l_ptr, skb);
1095                        link_retrieve_defq(l_ptr, &head);
1096                        skb = NULL;
1097                        goto unlock;
1098                }
1099                l_ptr->silent_intv_cnt = 0;
1100
1101                /* Synchronize with parallel link if applicable */
1102                if (unlikely((l_ptr->flags & LINK_SYNCHING) && !msg_dup(msg))) {
1103                        if (!link_synch(l_ptr))
1104                                goto unlock;
1105                }
1106                l_ptr->rcv_nxt++;
1107                if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
1108                        link_retrieve_defq(l_ptr, &head);
1109                if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
1110                        l_ptr->stats.sent_acks++;
1111                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
1112                }
1113                tipc_link_input(l_ptr, skb);
1114                skb = NULL;
1115unlock:
1116                tipc_node_unlock(n_ptr);
1117                tipc_node_put(n_ptr);
1118discard:
1119                if (unlikely(skb))
1120                        kfree_skb(skb);
1121        }
1122}
1123
1124/* tipc_data_input - deliver data and name distr msgs to upper layer
1125 *
1126 * Consumes buffer if message is of right type
1127 * Node lock must be held
1128 */
1129static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
1130{
1131        struct tipc_node *node = link->owner;
1132        struct tipc_msg *msg = buf_msg(skb);
1133        u32 dport = msg_destport(msg);
1134
1135        switch (msg_user(msg)) {
1136        case TIPC_LOW_IMPORTANCE:
1137        case TIPC_MEDIUM_IMPORTANCE:
1138        case TIPC_HIGH_IMPORTANCE:
1139        case TIPC_CRITICAL_IMPORTANCE:
1140        case CONN_MANAGER:
1141                if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
1142                        node->inputq = &link->inputq;
1143                        node->action_flags |= TIPC_MSG_EVT;
1144                }
1145                return true;
1146        case NAME_DISTRIBUTOR:
1147                node->bclink.recv_permitted = true;
1148                node->namedq = &link->namedq;
1149                skb_queue_tail(&link->namedq, skb);
1150                if (skb_queue_len(&link->namedq) == 1)
1151                        node->action_flags |= TIPC_NAMED_MSG_EVT;
1152                return true;
1153        case MSG_BUNDLER:
1154        case TUNNEL_PROTOCOL:
1155        case MSG_FRAGMENTER:
1156        case BCAST_PROTOCOL:
1157                return false;
1158        default:
1159                pr_warn("Dropping received illegal msg type\n");
1160                kfree_skb(skb);
1161                return false;
1162        };
1163}
1164
1165/* tipc_link_input - process packet that has passed link protocol check
1166 *
1167 * Consumes buffer
1168 * Node lock must be held
1169 */
1170static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1171{
1172        struct tipc_node *node = link->owner;
1173        struct tipc_msg *msg = buf_msg(skb);
1174        struct sk_buff *iskb;
1175        int pos = 0;
1176
1177        if (likely(tipc_data_input(link, skb)))
1178                return;
1179
1180        switch (msg_user(msg)) {
1181        case TUNNEL_PROTOCOL:
1182                if (msg_dup(msg)) {
1183                        link->flags |= LINK_SYNCHING;
1184                        link->synch_point = msg_seqno(msg_get_wrapped(msg));
1185                        kfree_skb(skb);
1186                        break;
1187                }
1188                if (!tipc_link_failover_rcv(link, &skb))
1189                        break;
1190                if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
1191                        tipc_data_input(link, skb);
1192                        break;
1193                }
1194        case MSG_BUNDLER:
1195                link->stats.recv_bundles++;
1196                link->stats.recv_bundled += msg_msgcnt(msg);
1197
1198                while (tipc_msg_extract(skb, &iskb, &pos))
1199                        tipc_data_input(link, iskb);
1200                break;
1201        case MSG_FRAGMENTER:
1202                link->stats.recv_fragments++;
1203                if (tipc_buf_append(&link->reasm_buf, &skb)) {
1204                        link->stats.recv_fragmented++;
1205                        tipc_data_input(link, skb);
1206                } else if (!link->reasm_buf) {
1207                        tipc_link_reset(link);
1208                }
1209                break;
1210        case BCAST_PROTOCOL:
1211                tipc_link_sync_rcv(node, skb);
1212                break;
1213        default:
1214                break;
1215        };
1216}
1217
1218/**
1219 * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1220 *
1221 * Returns increase in queue length (i.e. 0 or 1)
1222 */
1223u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
1224{
1225        struct sk_buff *skb1;
1226        u16 seq_no = buf_seqno(skb);
1227
1228        /* Empty queue ? */
1229        if (skb_queue_empty(list)) {
1230                __skb_queue_tail(list, skb);
1231                return 1;
1232        }
1233
1234        /* Last ? */
1235        if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
1236                __skb_queue_tail(list, skb);
1237                return 1;
1238        }
1239
1240        /* Locate insertion point in queue, then insert; discard if duplicate */
1241        skb_queue_walk(list, skb1) {
1242                u16 curr_seqno = buf_seqno(skb1);
1243
1244                if (seq_no == curr_seqno) {
1245                        kfree_skb(skb);
1246                        return 0;
1247                }
1248
1249                if (less(seq_no, curr_seqno))
1250                        break;
1251        }
1252
1253        __skb_queue_before(list, skb1, skb);
1254        return 1;
1255}
1256
1257/*
1258 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1259 */
1260static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1261                                       struct sk_buff *buf)
1262{
1263        u32 seq_no = buf_seqno(buf);
1264
1265        if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1266                tipc_link_proto_rcv(l_ptr, buf);
1267                return;
1268        }
1269
1270        /* Record OOS packet arrival */
1271        l_ptr->silent_intv_cnt = 0;
1272
1273        /*
1274         * Discard packet if a duplicate; otherwise add it to deferred queue
1275         * and notify peer of gap as per protocol specification
1276         */
1277        if (less(seq_no, l_ptr->rcv_nxt)) {
1278                l_ptr->stats.duplicates++;
1279                kfree_skb(buf);
1280                return;
1281        }
1282
1283        if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
1284                l_ptr->stats.deferred_recv++;
1285                if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
1286                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
1287        } else {
1288                l_ptr->stats.duplicates++;
1289        }
1290}
1291
1292/*
1293 * Send protocol message to the other endpoint.
1294 */
1295void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
1296                          u32 gap, u32 tolerance, u32 priority)
1297{
1298        struct sk_buff *buf = NULL;
1299        struct tipc_msg *msg = l_ptr->pmsg;
1300        u32 msg_size = sizeof(l_ptr->proto_msg);
1301        int r_flag;
1302        u16 last_rcv;
1303
1304        /* Don't send protocol message during link failover */
1305        if (l_ptr->flags & LINK_FAILINGOVER)
1306                return;
1307
1308        /* Abort non-RESET send if communication with node is prohibited */
1309        if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
1310                return;
1311
1312        /* Create protocol message with "out-of-sequence" sequence number */
1313        msg_set_type(msg, msg_typ);
1314        msg_set_net_plane(msg, l_ptr->net_plane);
1315        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1316        msg_set_last_bcast(msg, tipc_bclink_get_last_sent(l_ptr->owner->net));
1317
1318        if (msg_typ == STATE_MSG) {
1319                u16 next_sent = l_ptr->snd_nxt;
1320
1321                if (!tipc_link_is_up(l_ptr))
1322                        return;
1323                msg_set_next_sent(msg, next_sent);
1324                if (!skb_queue_empty(&l_ptr->deferdq)) {
1325                        last_rcv = buf_seqno(skb_peek(&l_ptr->deferdq));
1326                        gap = mod(last_rcv - l_ptr->rcv_nxt);
1327                }
1328                msg_set_seq_gap(msg, gap);
1329                if (gap)
1330                        l_ptr->stats.sent_nacks++;
1331                msg_set_link_tolerance(msg, tolerance);
1332                msg_set_linkprio(msg, priority);
1333                msg_set_max_pkt(msg, l_ptr->mtu);
1334                msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));
1335                msg_set_probe(msg, probe_msg != 0);
1336                if (probe_msg)
1337                        l_ptr->stats.sent_probes++;
1338                l_ptr->stats.sent_states++;
1339        } else {                /* RESET_MSG or ACTIVATE_MSG */
1340                msg_set_ack(msg, mod(l_ptr->failover_checkpt - 1));
1341                msg_set_seq_gap(msg, 0);
1342                msg_set_next_sent(msg, 1);
1343                msg_set_probe(msg, 0);
1344                msg_set_link_tolerance(msg, l_ptr->tolerance);
1345                msg_set_linkprio(msg, l_ptr->priority);
1346                msg_set_max_pkt(msg, l_ptr->advertised_mtu);
1347        }
1348
1349        r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
1350        msg_set_redundant_link(msg, r_flag);
1351        msg_set_linkprio(msg, l_ptr->priority);
1352        msg_set_size(msg, msg_size);
1353
1354        msg_set_seqno(msg, mod(l_ptr->snd_nxt + (0xffff / 2)));
1355
1356        buf = tipc_buf_acquire(msg_size);
1357        if (!buf)
1358                return;
1359
1360        skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1361        buf->priority = TC_PRIO_CONTROL;
1362        tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
1363                         &l_ptr->media_addr);
1364        l_ptr->rcv_unacked = 0;
1365        kfree_skb(buf);
1366}
1367
1368/*
1369 * Receive protocol message :
1370 * Note that network plane id propagates through the network, and may
1371 * change at any time. The node with lowest address rules
1372 */
1373static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
1374                                struct sk_buff *buf)
1375{
1376        u32 rec_gap = 0;
1377        u32 msg_tol;
1378        struct tipc_msg *msg = buf_msg(buf);
1379
1380        if (l_ptr->flags & LINK_FAILINGOVER)
1381                goto exit;
1382
1383        if (l_ptr->net_plane != msg_net_plane(msg))
1384                if (link_own_addr(l_ptr) > msg_prevnode(msg))
1385                        l_ptr->net_plane = msg_net_plane(msg);
1386
1387        switch (msg_type(msg)) {
1388
1389        case RESET_MSG:
1390                if (!link_working_unknown(l_ptr) &&
1391                    (l_ptr->peer_session != INVALID_SESSION)) {
1392                        if (less_eq(msg_session(msg), l_ptr->peer_session))
1393                                break; /* duplicate or old reset: ignore */
1394                }
1395
1396                if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
1397                                link_working_unknown(l_ptr))) {
1398                        /*
1399                         * peer has lost contact -- don't allow peer's links
1400                         * to reactivate before we recognize loss & clean up
1401                         */
1402                        l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
1403                }
1404
1405                link_state_event(l_ptr, RESET_MSG);
1406
1407                /* fall thru' */
1408        case ACTIVATE_MSG:
1409                /* Update link settings according other endpoint's values */
1410                strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
1411
1412                msg_tol = msg_link_tolerance(msg);
1413                if (msg_tol > l_ptr->tolerance)
1414                        link_set_supervision_props(l_ptr, msg_tol);
1415
1416                if (msg_linkprio(msg) > l_ptr->priority)
1417                        l_ptr->priority = msg_linkprio(msg);
1418
1419                if (l_ptr->mtu > msg_max_pkt(msg))
1420                        l_ptr->mtu = msg_max_pkt(msg);
1421
1422                /* Synchronize broadcast link info, if not done previously */
1423                if (!tipc_node_is_up(l_ptr->owner)) {
1424                        l_ptr->owner->bclink.last_sent =
1425                                l_ptr->owner->bclink.last_in =
1426                                msg_last_bcast(msg);
1427                        l_ptr->owner->bclink.oos_state = 0;
1428                }
1429
1430                l_ptr->peer_session = msg_session(msg);
1431                l_ptr->peer_bearer_id = msg_bearer_id(msg);
1432
1433                if (msg_type(msg) == ACTIVATE_MSG)
1434                        link_state_event(l_ptr, ACTIVATE_MSG);
1435                break;
1436        case STATE_MSG:
1437
1438                msg_tol = msg_link_tolerance(msg);
1439                if (msg_tol)
1440                        link_set_supervision_props(l_ptr, msg_tol);
1441
1442                if (msg_linkprio(msg) &&
1443                    (msg_linkprio(msg) != l_ptr->priority)) {
1444                        pr_debug("%s<%s>, priority change %u->%u\n",
1445                                 link_rst_msg, l_ptr->name,
1446                                 l_ptr->priority, msg_linkprio(msg));
1447                        l_ptr->priority = msg_linkprio(msg);
1448                        tipc_link_reset(l_ptr); /* Enforce change to take effect */
1449                        break;
1450                }
1451
1452                /* Record reception; force mismatch at next timeout: */
1453                l_ptr->silent_intv_cnt = 0;
1454
1455                link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1456                l_ptr->stats.recv_states++;
1457                if (link_reset_unknown(l_ptr))
1458                        break;
1459
1460                if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg)))
1461                        rec_gap = mod(msg_next_sent(msg) - l_ptr->rcv_nxt);
1462
1463                if (msg_probe(msg))
1464                        l_ptr->stats.recv_probes++;
1465
1466                /* Protocol message before retransmits, reduce loss risk */
1467                if (l_ptr->owner->bclink.recv_permitted)
1468                        tipc_bclink_update_link_state(l_ptr->owner,
1469                                                      msg_last_bcast(msg));
1470
1471                if (rec_gap || (msg_probe(msg))) {
1472                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 0,
1473                                             rec_gap, 0, 0);
1474                }
1475                if (msg_seq_gap(msg)) {
1476                        l_ptr->stats.recv_nacks++;
1477                        tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
1478                                             msg_seq_gap(msg));
1479                }
1480                break;
1481        }
1482exit:
1483        kfree_skb(buf);
1484}
1485
1486
1487/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
1488 * a different bearer. Owner node is locked.
1489 */
1490static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1491                                  struct tipc_msg *tunnel_hdr,
1492                                  struct tipc_msg *msg,
1493                                  u32 selector)
1494{
1495        struct tipc_link *tunnel;
1496        struct sk_buff *skb;
1497        u32 length = msg_size(msg);
1498
1499        tunnel = l_ptr->owner->active_links[selector & 1];
1500        if (!tipc_link_is_up(tunnel)) {
1501                pr_warn("%stunnel link no longer available\n", link_co_err);
1502                return;
1503        }
1504        msg_set_size(tunnel_hdr, length + INT_H_SIZE);
1505        skb = tipc_buf_acquire(length + INT_H_SIZE);
1506        if (!skb) {
1507                pr_warn("%sunable to send tunnel msg\n", link_co_err);
1508                return;
1509        }
1510        skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
1511        skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
1512        __tipc_link_xmit_skb(tunnel, skb);
1513}
1514
1515
1516/* tipc_link_failover_send_queue(): A link has gone down, but a second
1517 * link is still active. We can do failover. Tunnel the failing link's
1518 * whole send queue via the remaining link. This way, we don't lose
1519 * any packets, and sequence order is preserved for subsequent traffic
1520 * sent over the remaining link. Owner node is locked.
1521 */
1522void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1523{
1524        int msgcount;
1525        struct tipc_link *tunnel = l_ptr->owner->active_links[0];
1526        struct tipc_msg tunnel_hdr;
1527        struct sk_buff *skb;
1528        int split_bundles;
1529
1530        if (!tunnel)
1531                return;
1532
1533        tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, TUNNEL_PROTOCOL,
1534                      FAILOVER_MSG, INT_H_SIZE, l_ptr->addr);
1535
1536        skb_queue_walk(&l_ptr->backlogq, skb) {
1537                msg_set_seqno(buf_msg(skb), l_ptr->snd_nxt);
1538                l_ptr->snd_nxt = mod(l_ptr->snd_nxt + 1);
1539        }
1540        skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
1541        tipc_link_purge_backlog(l_ptr);
1542        msgcount = skb_queue_len(&l_ptr->transmq);
1543        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1544        msg_set_msgcnt(&tunnel_hdr, msgcount);
1545
1546        if (skb_queue_empty(&l_ptr->transmq)) {
1547                skb = tipc_buf_acquire(INT_H_SIZE);
1548                if (skb) {
1549                        skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
1550                        msg_set_size(&tunnel_hdr, INT_H_SIZE);
1551                        __tipc_link_xmit_skb(tunnel, skb);
1552                } else {
1553                        pr_warn("%sunable to send changeover msg\n",
1554                                link_co_err);
1555                }
1556                return;
1557        }
1558
1559        split_bundles = (l_ptr->owner->active_links[0] !=
1560                         l_ptr->owner->active_links[1]);
1561
1562        skb_queue_walk(&l_ptr->transmq, skb) {
1563                struct tipc_msg *msg = buf_msg(skb);
1564
1565                if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
1566                        struct tipc_msg *m = msg_get_wrapped(msg);
1567                        unchar *pos = (unchar *)m;
1568
1569                        msgcount = msg_msgcnt(msg);
1570                        while (msgcount--) {
1571                                msg_set_seqno(m, msg_seqno(msg));
1572                                tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
1573                                                      msg_link_selector(m));
1574                                pos += align(msg_size(m));
1575                                m = (struct tipc_msg *)pos;
1576                        }
1577                } else {
1578                        tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
1579                                              msg_link_selector(msg));
1580                }
1581        }
1582}
1583
1584/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
1585 * duplicate of the first link's send queue via the new link. This way, we
1586 * are guaranteed that currently queued packets from a socket are delivered
1587 * before future traffic from the same socket, even if this is using the
1588 * new link. The last arriving copy of each duplicate packet is dropped at
1589 * the receiving end by the regular protocol check, so packet cardinality
1590 * and sequence order is preserved per sender/receiver socket pair.
1591 * Owner node is locked.
1592 */
1593void tipc_link_dup_queue_xmit(struct tipc_link *link,
1594                              struct tipc_link *tnl)
1595{
1596        struct sk_buff *skb;
1597        struct tipc_msg tnl_hdr;
1598        struct sk_buff_head *queue = &link->transmq;
1599        int mcnt;
1600        u16 seqno;
1601
1602        tipc_msg_init(link_own_addr(link), &tnl_hdr, TUNNEL_PROTOCOL,
1603                      SYNCH_MSG, INT_H_SIZE, link->addr);
1604        mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
1605        msg_set_msgcnt(&tnl_hdr, mcnt);
1606        msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);
1607
1608tunnel_queue:
1609        skb_queue_walk(queue, skb) {
1610                struct sk_buff *outskb;
1611                struct tipc_msg *msg = buf_msg(skb);
1612                u32 len = msg_size(msg);
1613
1614                msg_set_ack(msg, mod(link->rcv_nxt - 1));
1615                msg_set_bcast_ack(msg, link->owner->bclink.last_in);
1616                msg_set_size(&tnl_hdr, len + INT_H_SIZE);
1617                outskb = tipc_buf_acquire(len + INT_H_SIZE);
1618                if (outskb == NULL) {
1619                        pr_warn("%sunable to send duplicate msg\n",
1620                                link_co_err);
1621                        return;
1622                }
1623                skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
1624                skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
1625                                               skb->data, len);
1626                __tipc_link_xmit_skb(tnl, outskb);
1627                if (!tipc_link_is_up(link))
1628                        return;
1629        }
1630        if (queue == &link->backlogq)
1631                return;
1632        seqno = link->snd_nxt;
1633        skb_queue_walk(&link->backlogq, skb) {
1634                msg_set_seqno(buf_msg(skb), seqno);
1635                seqno = mod(seqno + 1);
1636        }
1637        queue = &link->backlogq;
1638        goto tunnel_queue;
1639}
1640
1641/*  tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet
1642 *  Owner node is locked.
1643 */
1644static bool tipc_link_failover_rcv(struct tipc_link *link,
1645                                   struct sk_buff **skb)
1646{
1647        struct tipc_msg *msg = buf_msg(*skb);
1648        struct sk_buff *iskb = NULL;
1649        struct tipc_link *pl = NULL;
1650        int bearer_id = msg_bearer_id(msg);
1651        int pos = 0;
1652
1653        if (msg_type(msg) != FAILOVER_MSG) {
1654                pr_warn("%sunknown tunnel pkt received\n", link_co_err);
1655                goto exit;
1656        }
1657        if (bearer_id >= MAX_BEARERS)
1658                goto exit;
1659
1660        if (bearer_id == link->bearer_id)
1661                goto exit;
1662
1663        pl = link->owner->links[bearer_id];
1664        if (pl && tipc_link_is_up(pl))
1665                tipc_link_reset(pl);
1666
1667        if (link->failover_pkts == FIRST_FAILOVER)
1668                link->failover_pkts = msg_msgcnt(msg);
1669
1670        /* Should we expect an inner packet? */
1671        if (!link->failover_pkts)
1672                goto exit;
1673
1674        if (!tipc_msg_extract(*skb, &iskb, &pos)) {
1675                pr_warn("%sno inner failover pkt\n", link_co_err);
1676                *skb = NULL;
1677                goto exit;
1678        }
1679        link->failover_pkts--;
1680        *skb = NULL;
1681
1682        /* Was this packet already delivered? */
1683        if (less(buf_seqno(iskb), link->failover_checkpt)) {
1684                kfree_skb(iskb);
1685                iskb = NULL;
1686                goto exit;
1687        }
1688        if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) {
1689                link->stats.recv_fragments++;
1690                tipc_buf_append(&link->failover_skb, &iskb);
1691        }
1692exit:
1693        if (!link->failover_pkts && pl)
1694                pl->flags &= ~LINK_FAILINGOVER;
1695        kfree_skb(*skb);
1696        *skb = iskb;
1697        return *skb;
1698}
1699
1700static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
1701{
1702        unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
1703
1704        if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
1705                return;
1706
1707        l_ptr->tolerance = tol;
1708        l_ptr->keepalive_intv = msecs_to_jiffies(intv);
1709        l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->keepalive_intv));
1710}
1711
1712void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1713{
1714        int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
1715
1716        l->window = win;
1717        l->backlog[TIPC_LOW_IMPORTANCE].limit      = win / 2;
1718        l->backlog[TIPC_MEDIUM_IMPORTANCE].limit   = win;
1719        l->backlog[TIPC_HIGH_IMPORTANCE].limit     = win / 2 * 3;
1720        l->backlog[TIPC_CRITICAL_IMPORTANCE].limit = win * 2;
1721        l->backlog[TIPC_SYSTEM_IMPORTANCE].limit   = max_bulk;
1722}
1723
1724/* tipc_link_find_owner - locate owner node of link by link's name
1725 * @net: the applicable net namespace
1726 * @name: pointer to link name string
1727 * @bearer_id: pointer to index in 'node->links' array where the link was found.
1728 *
1729 * Returns pointer to node owning the link, or 0 if no matching link is found.
1730 */
1731static struct tipc_node *tipc_link_find_owner(struct net *net,
1732                                              const char *link_name,
1733                                              unsigned int *bearer_id)
1734{
1735        struct tipc_net *tn = net_generic(net, tipc_net_id);
1736        struct tipc_link *l_ptr;
1737        struct tipc_node *n_ptr;
1738        struct tipc_node *found_node = NULL;
1739        int i;
1740
1741        *bearer_id = 0;
1742        rcu_read_lock();
1743        list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
1744                tipc_node_lock(n_ptr);
1745                for (i = 0; i < MAX_BEARERS; i++) {
1746                        l_ptr = n_ptr->links[i];
1747                        if (l_ptr && !strcmp(l_ptr->name, link_name)) {
1748                                *bearer_id = i;
1749                                found_node = n_ptr;
1750                                break;
1751                        }
1752                }
1753                tipc_node_unlock(n_ptr);
1754                if (found_node)
1755                        break;
1756        }
1757        rcu_read_unlock();
1758
1759        return found_node;
1760}
1761
1762/**
1763 * link_reset_statistics - reset link statistics
1764 * @l_ptr: pointer to link
1765 */
1766static void link_reset_statistics(struct tipc_link *l_ptr)
1767{
1768        memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
1769        l_ptr->stats.sent_info = l_ptr->snd_nxt;
1770        l_ptr->stats.recv_info = l_ptr->rcv_nxt;
1771}
1772
1773static void link_print(struct tipc_link *l_ptr, const char *str)
1774{
1775        struct tipc_net *tn = net_generic(l_ptr->owner->net, tipc_net_id);
1776        struct tipc_bearer *b_ptr;
1777
1778        rcu_read_lock();
1779        b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]);
1780        if (b_ptr)
1781                pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
1782        rcu_read_unlock();
1783
1784        if (link_working_unknown(l_ptr))
1785                pr_cont(":WU\n");
1786        else if (link_reset_reset(l_ptr))
1787                pr_cont(":RR\n");
1788        else if (link_reset_unknown(l_ptr))
1789                pr_cont(":RU\n");
1790        else if (link_working_working(l_ptr))
1791                pr_cont(":WW\n");
1792        else
1793                pr_cont("\n");
1794}
1795
1796/* Parse and validate nested (link) properties valid for media, bearer and link
1797 */
1798int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[])
1799{
1800        int err;
1801
1802        err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, prop,
1803                               tipc_nl_prop_policy);
1804        if (err)
1805                return err;
1806
1807        if (props[TIPC_NLA_PROP_PRIO]) {
1808                u32 prio;
1809
1810                prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
1811                if (prio > TIPC_MAX_LINK_PRI)
1812                        return -EINVAL;
1813        }
1814
1815        if (props[TIPC_NLA_PROP_TOL]) {
1816                u32 tol;
1817
1818                tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
1819                if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
1820                        return -EINVAL;
1821        }
1822
1823        if (props[TIPC_NLA_PROP_WIN]) {
1824                u32 win;
1825
1826                win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
1827                if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN))
1828                        return -EINVAL;
1829        }
1830
1831        return 0;
1832}
1833
1834int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
1835{
1836        int err;
1837        int res = 0;
1838        int bearer_id;
1839        char *name;
1840        struct tipc_link *link;
1841        struct tipc_node *node;
1842        struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
1843        struct net *net = sock_net(skb->sk);
1844
1845        if (!info->attrs[TIPC_NLA_LINK])
1846                return -EINVAL;
1847
1848        err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
1849                               info->attrs[TIPC_NLA_LINK],
1850                               tipc_nl_link_policy);
1851        if (err)
1852                return err;
1853
1854        if (!attrs[TIPC_NLA_LINK_NAME])
1855                return -EINVAL;
1856
1857        name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
1858
1859        if (strcmp(name, tipc_bclink_name) == 0)
1860                return tipc_nl_bc_link_set(net, attrs);
1861
1862        node = tipc_link_find_owner(net, name, &bearer_id);
1863        if (!node)
1864                return -EINVAL;
1865
1866        tipc_node_lock(node);
1867
1868        link = node->links[bearer_id];
1869        if (!link) {
1870                res = -EINVAL;
1871                goto out;
1872        }
1873
1874        if (attrs[TIPC_NLA_LINK_PROP]) {
1875                struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
1876
1877                err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
1878                                              props);
1879                if (err) {
1880                        res = err;
1881                        goto out;
1882                }
1883
1884                if (props[TIPC_NLA_PROP_TOL]) {
1885                        u32 tol;
1886
1887                        tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
1888                        link_set_supervision_props(link, tol);
1889                        tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0);
1890                }
1891                if (props[TIPC_NLA_PROP_PRIO]) {
1892                        u32 prio;
1893
1894                        prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
1895                        link->priority = prio;
1896                        tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio);
1897                }
1898                if (props[TIPC_NLA_PROP_WIN]) {
1899                        u32 win;
1900
1901                        win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
1902                        tipc_link_set_queue_limits(link, win);
1903                }
1904        }
1905
1906out:
1907        tipc_node_unlock(node);
1908
1909        return res;
1910}
1911
1912static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
1913{
1914        int i;
1915        struct nlattr *stats;
1916
1917        struct nla_map {
1918                u32 key;
1919                u32 val;
1920        };
1921
1922        struct nla_map map[] = {
1923                {TIPC_NLA_STATS_RX_INFO, s->recv_info},
1924                {TIPC_NLA_STATS_RX_FRAGMENTS, s->recv_fragments},
1925                {TIPC_NLA_STATS_RX_FRAGMENTED, s->recv_fragmented},
1926                {TIPC_NLA_STATS_RX_BUNDLES, s->recv_bundles},
1927                {TIPC_NLA_STATS_RX_BUNDLED, s->recv_bundled},
1928                {TIPC_NLA_STATS_TX_INFO, s->sent_info},
1929                {TIPC_NLA_STATS_TX_FRAGMENTS, s->sent_fragments},
1930                {TIPC_NLA_STATS_TX_FRAGMENTED, s->sent_fragmented},
1931                {TIPC_NLA_STATS_TX_BUNDLES, s->sent_bundles},
1932                {TIPC_NLA_STATS_TX_BUNDLED, s->sent_bundled},
1933                {TIPC_NLA_STATS_MSG_PROF_TOT, (s->msg_length_counts) ?
1934                        s->msg_length_counts : 1},
1935                {TIPC_NLA_STATS_MSG_LEN_CNT, s->msg_length_counts},
1936                {TIPC_NLA_STATS_MSG_LEN_TOT, s->msg_lengths_total},
1937                {TIPC_NLA_STATS_MSG_LEN_P0, s->msg_length_profile[0]},
1938                {TIPC_NLA_STATS_MSG_LEN_P1, s->msg_length_profile[1]},
1939                {TIPC_NLA_STATS_MSG_LEN_P2, s->msg_length_profile[2]},
1940                {TIPC_NLA_STATS_MSG_LEN_P3, s->msg_length_profile[3]},
1941                {TIPC_NLA_STATS_MSG_LEN_P4, s->msg_length_profile[4]},
1942                {TIPC_NLA_STATS_MSG_LEN_P5, s->msg_length_profile[5]},
1943                {TIPC_NLA_STATS_MSG_LEN_P6, s->msg_length_profile[6]},
1944                {TIPC_NLA_STATS_RX_STATES, s->recv_states},
1945                {TIPC_NLA_STATS_RX_PROBES, s->recv_probes},
1946                {TIPC_NLA_STATS_RX_NACKS, s->recv_nacks},
1947                {TIPC_NLA_STATS_RX_DEFERRED, s->deferred_recv},
1948                {TIPC_NLA_STATS_TX_STATES, s->sent_states},
1949                {TIPC_NLA_STATS_TX_PROBES, s->sent_probes},
1950                {TIPC_NLA_STATS_TX_NACKS, s->sent_nacks},
1951                {TIPC_NLA_STATS_TX_ACKS, s->sent_acks},
1952                {TIPC_NLA_STATS_RETRANSMITTED, s->retransmitted},
1953                {TIPC_NLA_STATS_DUPLICATES, s->duplicates},
1954                {TIPC_NLA_STATS_LINK_CONGS, s->link_congs},
1955                {TIPC_NLA_STATS_MAX_QUEUE, s->max_queue_sz},
1956                {TIPC_NLA_STATS_AVG_QUEUE, s->queue_sz_counts ?
1957                        (s->accu_queue_sz / s->queue_sz_counts) : 0}
1958        };
1959
1960        stats = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
1961        if (!stats)
1962                return -EMSGSIZE;
1963
1964        for (i = 0; i <  ARRAY_SIZE(map); i++)
1965                if (nla_put_u32(skb, map[i].key, map[i].val))
1966                        goto msg_full;
1967
1968        nla_nest_end(skb, stats);
1969
1970        return 0;
1971msg_full:
1972        nla_nest_cancel(skb, stats);
1973
1974        return -EMSGSIZE;
1975}
1976
1977/* Caller should hold appropriate locks to protect the link */
1978static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
1979                              struct tipc_link *link, int nlflags)
1980{
1981        int err;
1982        void *hdr;
1983        struct nlattr *attrs;
1984        struct nlattr *prop;
1985        struct tipc_net *tn = net_generic(net, tipc_net_id);
1986
1987        hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
1988                          nlflags, TIPC_NL_LINK_GET);
1989        if (!hdr)
1990                return -EMSGSIZE;
1991
1992        attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
1993        if (!attrs)
1994                goto msg_full;
1995
1996        if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, link->name))
1997                goto attr_msg_full;
1998        if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST,
1999                        tipc_cluster_mask(tn->own_addr)))
2000                goto attr_msg_full;
2001        if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->mtu))
2002                goto attr_msg_full;
2003        if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->rcv_nxt))
2004                goto attr_msg_full;
2005        if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, link->snd_nxt))
2006                goto attr_msg_full;
2007
2008        if (tipc_link_is_up(link))
2009                if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
2010                        goto attr_msg_full;
2011        if (tipc_link_is_active(link))
2012                if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
2013                        goto attr_msg_full;
2014
2015        prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
2016        if (!prop)
2017                goto attr_msg_full;
2018        if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2019                goto prop_msg_full;
2020        if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
2021                goto prop_msg_full;
2022        if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
2023                        link->window))
2024                goto prop_msg_full;
2025        if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2026                goto prop_msg_full;
2027        nla_nest_end(msg->skb, prop);
2028
2029        err = __tipc_nl_add_stats(msg->skb, &link->stats);
2030        if (err)
2031                goto attr_msg_full;
2032
2033        nla_nest_end(msg->skb, attrs);
2034        genlmsg_end(msg->skb, hdr);
2035
2036        return 0;
2037
2038prop_msg_full:
2039        nla_nest_cancel(msg->skb, prop);
2040attr_msg_full:
2041        nla_nest_cancel(msg->skb, attrs);
2042msg_full:
2043        genlmsg_cancel(msg->skb, hdr);
2044
2045        return -EMSGSIZE;
2046}
2047
2048/* Caller should hold node lock  */
2049static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
2050                                    struct tipc_node *node, u32 *prev_link)
2051{
2052        u32 i;
2053        int err;
2054
2055        for (i = *prev_link; i < MAX_BEARERS; i++) {
2056                *prev_link = i;
2057
2058                if (!node->links[i])
2059                        continue;
2060
2061                err = __tipc_nl_add_link(net, msg, node->links[i], NLM_F_MULTI);
2062                if (err)
2063                        return err;
2064        }
2065        *prev_link = 0;
2066
2067        return 0;
2068}
2069
2070int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
2071{
2072        struct net *net = sock_net(skb->sk);
2073        struct tipc_net *tn = net_generic(net, tipc_net_id);
2074        struct tipc_node *node;
2075        struct tipc_nl_msg msg;
2076        u32 prev_node = cb->args[0];
2077        u32 prev_link = cb->args[1];
2078        int done = cb->args[2];
2079        int err;
2080
2081        if (done)
2082                return 0;
2083
2084        msg.skb = skb;
2085        msg.portid = NETLINK_CB(cb->skb).portid;
2086        msg.seq = cb->nlh->nlmsg_seq;
2087
2088        rcu_read_lock();
2089        if (prev_node) {
2090                node = tipc_node_find(net, prev_node);
2091                if (!node) {
2092                        /* We never set seq or call nl_dump_check_consistent()
2093                         * this means that setting prev_seq here will cause the
2094                         * consistence check to fail in the netlink callback
2095                         * handler. Resulting in the last NLMSG_DONE message
2096                         * having the NLM_F_DUMP_INTR flag set.
2097                         */
2098                        cb->prev_seq = 1;
2099                        goto out;
2100                }
2101                tipc_node_put(node);
2102
2103                list_for_each_entry_continue_rcu(node, &tn->node_list,
2104                                                 list) {
2105                        tipc_node_lock(node);
2106                        err = __tipc_nl_add_node_links(net, &msg, node,
2107                                                       &prev_link);
2108                        tipc_node_unlock(node);
2109                        if (err)
2110                                goto out;
2111
2112                        prev_node = node->addr;
2113                }
2114        } else {
2115                err = tipc_nl_add_bc_link(net, &msg);
2116                if (err)
2117                        goto out;
2118
2119                list_for_each_entry_rcu(node, &tn->node_list, list) {
2120                        tipc_node_lock(node);
2121                        err = __tipc_nl_add_node_links(net, &msg, node,
2122                                                       &prev_link);
2123                        tipc_node_unlock(node);
2124                        if (err)
2125                                goto out;
2126
2127                        prev_node = node->addr;
2128                }
2129        }
2130        done = 1;
2131out:
2132        rcu_read_unlock();
2133
2134        cb->args[0] = prev_node;
2135        cb->args[1] = prev_link;
2136        cb->args[2] = done;
2137
2138        return skb->len;
2139}
2140
2141int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
2142{
2143        struct net *net = genl_info_net(info);
2144        struct tipc_nl_msg msg;
2145        char *name;
2146        int err;
2147
2148        msg.portid = info->snd_portid;
2149        msg.seq = info->snd_seq;
2150
2151        if (!info->attrs[TIPC_NLA_LINK_NAME])
2152                return -EINVAL;
2153        name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
2154
2155        msg.skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
2156        if (!msg.skb)
2157                return -ENOMEM;
2158
2159        if (strcmp(name, tipc_bclink_name) == 0) {
2160                err = tipc_nl_add_bc_link(net, &msg);
2161                if (err) {
2162                        nlmsg_free(msg.skb);
2163                        return err;
2164                }
2165        } else {
2166                int bearer_id;
2167                struct tipc_node *node;
2168                struct tipc_link *link;
2169
2170                node = tipc_link_find_owner(net, name, &bearer_id);
2171                if (!node)
2172                        return -EINVAL;
2173
2174                tipc_node_lock(node);
2175                link = node->links[bearer_id];
2176                if (!link) {
2177                        tipc_node_unlock(node);
2178                        nlmsg_free(msg.skb);
2179                        return -EINVAL;
2180                }
2181
2182                err = __tipc_nl_add_link(net, &msg, link, 0);
2183                tipc_node_unlock(node);
2184                if (err) {
2185                        nlmsg_free(msg.skb);
2186                        return err;
2187                }
2188        }
2189
2190        return genlmsg_reply(msg.skb, info);
2191}
2192
2193int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
2194{
2195        int err;
2196        char *link_name;
2197        unsigned int bearer_id;
2198        struct tipc_link *link;
2199        struct tipc_node *node;
2200        struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
2201        struct net *net = sock_net(skb->sk);
2202
2203        if (!info->attrs[TIPC_NLA_LINK])
2204                return -EINVAL;
2205
2206        err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
2207                               info->attrs[TIPC_NLA_LINK],
2208                               tipc_nl_link_policy);
2209        if (err)
2210                return err;
2211
2212        if (!attrs[TIPC_NLA_LINK_NAME])
2213                return -EINVAL;
2214
2215        link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
2216
2217        if (strcmp(link_name, tipc_bclink_name) == 0) {
2218                err = tipc_bclink_reset_stats(net);
2219                if (err)
2220                        return err;
2221                return 0;
2222        }
2223
2224        node = tipc_link_find_owner(net, link_name, &bearer_id);
2225        if (!node)
2226                return -EINVAL;
2227
2228        tipc_node_lock(node);
2229
2230        link = node->links[bearer_id];
2231        if (!link) {
2232                tipc_node_unlock(node);
2233                return -EINVAL;
2234        }
2235
2236        link_reset_statistics(link);
2237
2238        tipc_node_unlock(node);
2239
2240        return 0;
2241}
2242