linux/net/tipc/link.c
<<
>>
Prefs
   1/*
   2 * net/tipc/link.c: TIPC link code
   3 *
   4 * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
   5 * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "link.h"
  39#include "bcast.h"
  40#include "socket.h"
  41#include "name_distr.h"
  42#include "discover.h"
  43#include "netlink.h"
  44
  45#include <linux/pkt_sched.h>
  46
  47/*
  48 * Error message prefixes
  49 */
  50static const char *link_co_err = "Link changeover error, ";
  51static const char *link_rst_msg = "Resetting link ";
  52static const char *link_unk_evt = "Unknown link event ";
  53
  54static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
  55        [TIPC_NLA_LINK_UNSPEC]          = { .type = NLA_UNSPEC },
  56        [TIPC_NLA_LINK_NAME] = {
  57                .type = NLA_STRING,
  58                .len = TIPC_MAX_LINK_NAME
  59        },
  60        [TIPC_NLA_LINK_MTU]             = { .type = NLA_U32 },
  61        [TIPC_NLA_LINK_BROADCAST]       = { .type = NLA_FLAG },
  62        [TIPC_NLA_LINK_UP]              = { .type = NLA_FLAG },
  63        [TIPC_NLA_LINK_ACTIVE]          = { .type = NLA_FLAG },
  64        [TIPC_NLA_LINK_PROP]            = { .type = NLA_NESTED },
  65        [TIPC_NLA_LINK_STATS]           = { .type = NLA_NESTED },
  66        [TIPC_NLA_LINK_RX]              = { .type = NLA_U32 },
  67        [TIPC_NLA_LINK_TX]              = { .type = NLA_U32 }
  68};
  69
  70/* Properties valid for media, bearar and link */
  71static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
  72        [TIPC_NLA_PROP_UNSPEC]          = { .type = NLA_UNSPEC },
  73        [TIPC_NLA_PROP_PRIO]            = { .type = NLA_U32 },
  74        [TIPC_NLA_PROP_TOL]             = { .type = NLA_U32 },
  75        [TIPC_NLA_PROP_WIN]             = { .type = NLA_U32 }
  76};
  77
  78/*
  79 * Out-of-range value for link session numbers
  80 */
  81#define INVALID_SESSION 0x10000
  82
  83/*
  84 * Link state events:
  85 */
  86#define  STARTING_EVT    856384768      /* link processing trigger */
  87#define  TRAFFIC_MSG_EVT 560815u        /* rx'd ??? */
  88#define  TIMEOUT_EVT     560817u        /* link timer expired */
  89
  90/*
  91 * The following two 'message types' is really just implementation
  92 * data conveniently stored in the message header.
  93 * They must not be considered part of the protocol
  94 */
  95#define OPEN_MSG   0
  96#define CLOSED_MSG 1
  97
  98/*
  99 * State value stored in 'exp_msg_count'
 100 */
 101#define START_CHANGEOVER 100000u
 102
 103static void link_handle_out_of_seq_msg(struct tipc_link *link,
 104                                       struct sk_buff *skb);
 105static void tipc_link_proto_rcv(struct tipc_link *link,
 106                                struct sk_buff *skb);
 107static int  tipc_link_tunnel_rcv(struct tipc_node *node,
 108                                 struct sk_buff **skb);
 109static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol);
 110static void link_state_event(struct tipc_link *l_ptr, u32 event);
 111static void link_reset_statistics(struct tipc_link *l_ptr);
 112static void link_print(struct tipc_link *l_ptr, const char *str);
 113static void tipc_link_sync_xmit(struct tipc_link *l);
 114static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
 115static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
 116static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
 117
 118/*
 119 *  Simple link routines
 120 */
 121static unsigned int align(unsigned int i)
 122{
 123        return (i + 3) & ~3u;
 124}
 125
 126static void tipc_link_release(struct kref *kref)
 127{
 128        kfree(container_of(kref, struct tipc_link, ref));
 129}
 130
 131static void tipc_link_get(struct tipc_link *l_ptr)
 132{
 133        kref_get(&l_ptr->ref);
 134}
 135
 136static void tipc_link_put(struct tipc_link *l_ptr)
 137{
 138        kref_put(&l_ptr->ref, tipc_link_release);
 139}
 140
 141static void link_init_max_pkt(struct tipc_link *l_ptr)
 142{
 143        struct tipc_node *node = l_ptr->owner;
 144        struct tipc_net *tn = net_generic(node->net, tipc_net_id);
 145        struct tipc_bearer *b_ptr;
 146        u32 max_pkt;
 147
 148        rcu_read_lock();
 149        b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]);
 150        if (!b_ptr) {
 151                rcu_read_unlock();
 152                return;
 153        }
 154        max_pkt = (b_ptr->mtu & ~3);
 155        rcu_read_unlock();
 156
 157        if (max_pkt > MAX_MSG_SIZE)
 158                max_pkt = MAX_MSG_SIZE;
 159
 160        l_ptr->max_pkt_target = max_pkt;
 161        if (l_ptr->max_pkt_target < MAX_PKT_DEFAULT)
 162                l_ptr->max_pkt = l_ptr->max_pkt_target;
 163        else
 164                l_ptr->max_pkt = MAX_PKT_DEFAULT;
 165
 166        l_ptr->max_pkt_probes = 0;
 167}
 168
 169/*
 170 *  Simple non-static link routines (i.e. referenced outside this file)
 171 */
 172int tipc_link_is_up(struct tipc_link *l_ptr)
 173{
 174        if (!l_ptr)
 175                return 0;
 176        return link_working_working(l_ptr) || link_working_unknown(l_ptr);
 177}
 178
 179int tipc_link_is_active(struct tipc_link *l_ptr)
 180{
 181        return  (l_ptr->owner->active_links[0] == l_ptr) ||
 182                (l_ptr->owner->active_links[1] == l_ptr);
 183}
 184
 185/**
 186 * link_timeout - handle expiration of link timer
 187 * @l_ptr: pointer to link
 188 */
 189static void link_timeout(unsigned long data)
 190{
 191        struct tipc_link *l_ptr = (struct tipc_link *)data;
 192        struct sk_buff *skb;
 193
 194        tipc_node_lock(l_ptr->owner);
 195
 196        /* update counters used in statistical profiling of send traffic */
 197        l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
 198        l_ptr->stats.queue_sz_counts++;
 199
 200        skb = skb_peek(&l_ptr->outqueue);
 201        if (skb) {
 202                struct tipc_msg *msg = buf_msg(skb);
 203                u32 length = msg_size(msg);
 204
 205                if ((msg_user(msg) == MSG_FRAGMENTER) &&
 206                    (msg_type(msg) == FIRST_FRAGMENT)) {
 207                        length = msg_size(msg_get_wrapped(msg));
 208                }
 209                if (length) {
 210                        l_ptr->stats.msg_lengths_total += length;
 211                        l_ptr->stats.msg_length_counts++;
 212                        if (length <= 64)
 213                                l_ptr->stats.msg_length_profile[0]++;
 214                        else if (length <= 256)
 215                                l_ptr->stats.msg_length_profile[1]++;
 216                        else if (length <= 1024)
 217                                l_ptr->stats.msg_length_profile[2]++;
 218                        else if (length <= 4096)
 219                                l_ptr->stats.msg_length_profile[3]++;
 220                        else if (length <= 16384)
 221                                l_ptr->stats.msg_length_profile[4]++;
 222                        else if (length <= 32768)
 223                                l_ptr->stats.msg_length_profile[5]++;
 224                        else
 225                                l_ptr->stats.msg_length_profile[6]++;
 226                }
 227        }
 228
 229        /* do all other link processing performed on a periodic basis */
 230        link_state_event(l_ptr, TIMEOUT_EVT);
 231
 232        if (l_ptr->next_out)
 233                tipc_link_push_packets(l_ptr);
 234
 235        tipc_node_unlock(l_ptr->owner);
 236        tipc_link_put(l_ptr);
 237}
 238
 239static void link_set_timer(struct tipc_link *link, unsigned long time)
 240{
 241        if (!mod_timer(&link->timer, jiffies + time))
 242                tipc_link_get(link);
 243}
 244
 245/**
 246 * tipc_link_create - create a new link
 247 * @n_ptr: pointer to associated node
 248 * @b_ptr: pointer to associated bearer
 249 * @media_addr: media address to use when sending messages over link
 250 *
 251 * Returns pointer to link.
 252 */
 253struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 254                                   struct tipc_bearer *b_ptr,
 255                                   const struct tipc_media_addr *media_addr)
 256{
 257        struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
 258        struct tipc_link *l_ptr;
 259        struct tipc_msg *msg;
 260        char *if_name;
 261        char addr_string[16];
 262        u32 peer = n_ptr->addr;
 263
 264        if (n_ptr->link_cnt >= MAX_BEARERS) {
 265                tipc_addr_string_fill(addr_string, n_ptr->addr);
 266                pr_err("Attempt to establish %uth link to %s. Max %u allowed.\n",
 267                        n_ptr->link_cnt, addr_string, MAX_BEARERS);
 268                return NULL;
 269        }
 270
 271        if (n_ptr->links[b_ptr->identity]) {
 272                tipc_addr_string_fill(addr_string, n_ptr->addr);
 273                pr_err("Attempt to establish second link on <%s> to %s\n",
 274                       b_ptr->name, addr_string);
 275                return NULL;
 276        }
 277
 278        l_ptr = kzalloc(sizeof(*l_ptr), GFP_ATOMIC);
 279        if (!l_ptr) {
 280                pr_warn("Link creation failed, no memory\n");
 281                return NULL;
 282        }
 283        kref_init(&l_ptr->ref);
 284        l_ptr->addr = peer;
 285        if_name = strchr(b_ptr->name, ':') + 1;
 286        sprintf(l_ptr->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
 287                tipc_zone(tn->own_addr), tipc_cluster(tn->own_addr),
 288                tipc_node(tn->own_addr),
 289                if_name,
 290                tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
 291                /* note: peer i/f name is updated by reset/activate message */
 292        memcpy(&l_ptr->media_addr, media_addr, sizeof(*media_addr));
 293        l_ptr->owner = n_ptr;
 294        l_ptr->checkpoint = 1;
 295        l_ptr->peer_session = INVALID_SESSION;
 296        l_ptr->bearer_id = b_ptr->identity;
 297        link_set_supervision_props(l_ptr, b_ptr->tolerance);
 298        l_ptr->state = RESET_UNKNOWN;
 299
 300        l_ptr->pmsg = (struct tipc_msg *)&l_ptr->proto_msg;
 301        msg = l_ptr->pmsg;
 302        tipc_msg_init(tn->own_addr, msg, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE,
 303                      l_ptr->addr);
 304        msg_set_size(msg, sizeof(l_ptr->proto_msg));
 305        msg_set_session(msg, (tn->random & 0xffff));
 306        msg_set_bearer_id(msg, b_ptr->identity);
 307        strcpy((char *)msg_data(msg), if_name);
 308
 309        l_ptr->priority = b_ptr->priority;
 310        tipc_link_set_queue_limits(l_ptr, b_ptr->window);
 311
 312        l_ptr->net_plane = b_ptr->net_plane;
 313        link_init_max_pkt(l_ptr);
 314
 315        l_ptr->next_out_no = 1;
 316        __skb_queue_head_init(&l_ptr->outqueue);
 317        __skb_queue_head_init(&l_ptr->deferred_queue);
 318        skb_queue_head_init(&l_ptr->wakeupq);
 319        skb_queue_head_init(&l_ptr->inputq);
 320        skb_queue_head_init(&l_ptr->namedq);
 321        link_reset_statistics(l_ptr);
 322        tipc_node_attach_link(n_ptr, l_ptr);
 323        setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
 324        link_state_event(l_ptr, STARTING_EVT);
 325
 326        return l_ptr;
 327}
 328
 329/**
 330 * link_delete - Conditional deletion of link.
 331 *               If timer still running, real delete is done when it expires
 332 * @link: link to be deleted
 333 */
 334void tipc_link_delete(struct tipc_link *link)
 335{
 336        tipc_link_reset_fragments(link);
 337        tipc_node_detach_link(link->owner, link);
 338        tipc_link_put(link);
 339}
 340
 341void tipc_link_delete_list(struct net *net, unsigned int bearer_id,
 342                           bool shutting_down)
 343{
 344        struct tipc_net *tn = net_generic(net, tipc_net_id);
 345        struct tipc_link *link;
 346        struct tipc_node *node;
 347
 348        rcu_read_lock();
 349        list_for_each_entry_rcu(node, &tn->node_list, list) {
 350                tipc_node_lock(node);
 351                link = node->links[bearer_id];
 352                if (!link) {
 353                        tipc_node_unlock(node);
 354                        continue;
 355                }
 356                tipc_link_reset(link);
 357                if (del_timer(&link->timer))
 358                        tipc_link_put(link);
 359                link->flags |= LINK_STOPPED;
 360                /* Delete link now, or when failover is finished: */
 361                if (shutting_down || !tipc_node_is_up(node))
 362                        tipc_link_delete(link);
 363                tipc_node_unlock(node);
 364        }
 365        rcu_read_unlock();
 366}
 367
 368/**
 369 * link_schedule_user - schedule user for wakeup after congestion
 370 * @link: congested link
 371 * @oport: sending port
 372 * @chain_sz: size of buffer chain that was attempted sent
 373 * @imp: importance of message attempted sent
 374 * Create pseudo msg to send back to user when congestion abates
 375 */
 376static bool link_schedule_user(struct tipc_link *link, u32 oport,
 377                               uint chain_sz, uint imp)
 378{
 379        struct sk_buff *buf;
 380
 381        buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0,
 382                              link_own_addr(link), link_own_addr(link),
 383                              oport, 0, 0);
 384        if (!buf)
 385                return false;
 386        TIPC_SKB_CB(buf)->chain_sz = chain_sz;
 387        TIPC_SKB_CB(buf)->chain_imp = imp;
 388        skb_queue_tail(&link->wakeupq, buf);
 389        link->stats.link_congs++;
 390        return true;
 391}
 392
 393/**
 394 * link_prepare_wakeup - prepare users for wakeup after congestion
 395 * @link: congested link
 396 * Move a number of waiting users, as permitted by available space in
 397 * the send queue, from link wait queue to node wait queue for wakeup
 398 */
 399void link_prepare_wakeup(struct tipc_link *link)
 400{
 401        uint pend_qsz = skb_queue_len(&link->outqueue);
 402        struct sk_buff *skb, *tmp;
 403
 404        skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
 405                if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
 406                        break;
 407                pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
 408                skb_unlink(skb, &link->wakeupq);
 409                skb_queue_tail(&link->inputq, skb);
 410                link->owner->inputq = &link->inputq;
 411                link->owner->action_flags |= TIPC_MSG_EVT;
 412        }
 413}
 414
 415/**
 416 * tipc_link_reset_fragments - purge link's inbound message fragments queue
 417 * @l_ptr: pointer to link
 418 */
 419void tipc_link_reset_fragments(struct tipc_link *l_ptr)
 420{
 421        kfree_skb(l_ptr->reasm_buf);
 422        l_ptr->reasm_buf = NULL;
 423}
 424
 425/**
 426 * tipc_link_purge_queues - purge all pkt queues associated with link
 427 * @l_ptr: pointer to link
 428 */
 429void tipc_link_purge_queues(struct tipc_link *l_ptr)
 430{
 431        __skb_queue_purge(&l_ptr->deferred_queue);
 432        __skb_queue_purge(&l_ptr->outqueue);
 433        tipc_link_reset_fragments(l_ptr);
 434}
 435
 436void tipc_link_reset(struct tipc_link *l_ptr)
 437{
 438        u32 prev_state = l_ptr->state;
 439        u32 checkpoint = l_ptr->next_in_no;
 440        int was_active_link = tipc_link_is_active(l_ptr);
 441        struct tipc_node *owner = l_ptr->owner;
 442
 443        msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
 444
 445        /* Link is down, accept any session */
 446        l_ptr->peer_session = INVALID_SESSION;
 447
 448        /* Prepare for max packet size negotiation */
 449        link_init_max_pkt(l_ptr);
 450
 451        l_ptr->state = RESET_UNKNOWN;
 452
 453        if ((prev_state == RESET_UNKNOWN) || (prev_state == RESET_RESET))
 454                return;
 455
 456        tipc_node_link_down(l_ptr->owner, l_ptr);
 457        tipc_bearer_remove_dest(owner->net, l_ptr->bearer_id, l_ptr->addr);
 458
 459        if (was_active_link && tipc_node_active_links(l_ptr->owner)) {
 460                l_ptr->reset_checkpoint = checkpoint;
 461                l_ptr->exp_msg_count = START_CHANGEOVER;
 462        }
 463
 464        /* Clean up all queues, except inputq: */
 465        __skb_queue_purge(&l_ptr->outqueue);
 466        __skb_queue_purge(&l_ptr->deferred_queue);
 467        if (!owner->inputq)
 468                owner->inputq = &l_ptr->inputq;
 469        skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
 470        if (!skb_queue_empty(owner->inputq))
 471                owner->action_flags |= TIPC_MSG_EVT;
 472        l_ptr->next_out = NULL;
 473        l_ptr->unacked_window = 0;
 474        l_ptr->checkpoint = 1;
 475        l_ptr->next_out_no = 1;
 476        l_ptr->fsm_msg_cnt = 0;
 477        l_ptr->stale_count = 0;
 478        link_reset_statistics(l_ptr);
 479}
 480
 481void tipc_link_reset_list(struct net *net, unsigned int bearer_id)
 482{
 483        struct tipc_net *tn = net_generic(net, tipc_net_id);
 484        struct tipc_link *l_ptr;
 485        struct tipc_node *n_ptr;
 486
 487        rcu_read_lock();
 488        list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
 489                tipc_node_lock(n_ptr);
 490                l_ptr = n_ptr->links[bearer_id];
 491                if (l_ptr)
 492                        tipc_link_reset(l_ptr);
 493                tipc_node_unlock(n_ptr);
 494        }
 495        rcu_read_unlock();
 496}
 497
 498static void link_activate(struct tipc_link *link)
 499{
 500        struct tipc_node *node = link->owner;
 501
 502        link->next_in_no = 1;
 503        link->stats.recv_info = 1;
 504        tipc_node_link_up(node, link);
 505        tipc_bearer_add_dest(node->net, link->bearer_id, link->addr);
 506}
 507
 508/**
 509 * link_state_event - link finite state machine
 510 * @l_ptr: pointer to link
 511 * @event: state machine event to process
 512 */
 513static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 514{
 515        struct tipc_link *other;
 516        unsigned long cont_intv = l_ptr->cont_intv;
 517
 518        if (l_ptr->flags & LINK_STOPPED)
 519                return;
 520
 521        if (!(l_ptr->flags & LINK_STARTED) && (event != STARTING_EVT))
 522                return;         /* Not yet. */
 523
 524        /* Check whether changeover is going on */
 525        if (l_ptr->exp_msg_count) {
 526                if (event == TIMEOUT_EVT)
 527                        link_set_timer(l_ptr, cont_intv);
 528                return;
 529        }
 530
 531        switch (l_ptr->state) {
 532        case WORKING_WORKING:
 533                switch (event) {
 534                case TRAFFIC_MSG_EVT:
 535                case ACTIVATE_MSG:
 536                        break;
 537                case TIMEOUT_EVT:
 538                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
 539                                l_ptr->checkpoint = l_ptr->next_in_no;
 540                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
 541                                        tipc_link_proto_xmit(l_ptr, STATE_MSG,
 542                                                             0, 0, 0, 0, 0);
 543                                        l_ptr->fsm_msg_cnt++;
 544                                } else if (l_ptr->max_pkt < l_ptr->max_pkt_target) {
 545                                        tipc_link_proto_xmit(l_ptr, STATE_MSG,
 546                                                             1, 0, 0, 0, 0);
 547                                        l_ptr->fsm_msg_cnt++;
 548                                }
 549                                link_set_timer(l_ptr, cont_intv);
 550                                break;
 551                        }
 552                        l_ptr->state = WORKING_UNKNOWN;
 553                        l_ptr->fsm_msg_cnt = 0;
 554                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 555                        l_ptr->fsm_msg_cnt++;
 556                        link_set_timer(l_ptr, cont_intv / 4);
 557                        break;
 558                case RESET_MSG:
 559                        pr_debug("%s<%s>, requested by peer\n",
 560                                 link_rst_msg, l_ptr->name);
 561                        tipc_link_reset(l_ptr);
 562                        l_ptr->state = RESET_RESET;
 563                        l_ptr->fsm_msg_cnt = 0;
 564                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
 565                                             0, 0, 0, 0, 0);
 566                        l_ptr->fsm_msg_cnt++;
 567                        link_set_timer(l_ptr, cont_intv);
 568                        break;
 569                default:
 570                        pr_debug("%s%u in WW state\n", link_unk_evt, event);
 571                }
 572                break;
 573        case WORKING_UNKNOWN:
 574                switch (event) {
 575                case TRAFFIC_MSG_EVT:
 576                case ACTIVATE_MSG:
 577                        l_ptr->state = WORKING_WORKING;
 578                        l_ptr->fsm_msg_cnt = 0;
 579                        link_set_timer(l_ptr, cont_intv);
 580                        break;
 581                case RESET_MSG:
 582                        pr_debug("%s<%s>, requested by peer while probing\n",
 583                                 link_rst_msg, l_ptr->name);
 584                        tipc_link_reset(l_ptr);
 585                        l_ptr->state = RESET_RESET;
 586                        l_ptr->fsm_msg_cnt = 0;
 587                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
 588                                             0, 0, 0, 0, 0);
 589                        l_ptr->fsm_msg_cnt++;
 590                        link_set_timer(l_ptr, cont_intv);
 591                        break;
 592                case TIMEOUT_EVT:
 593                        if (l_ptr->next_in_no != l_ptr->checkpoint) {
 594                                l_ptr->state = WORKING_WORKING;
 595                                l_ptr->fsm_msg_cnt = 0;
 596                                l_ptr->checkpoint = l_ptr->next_in_no;
 597                                if (tipc_bclink_acks_missing(l_ptr->owner)) {
 598                                        tipc_link_proto_xmit(l_ptr, STATE_MSG,
 599                                                             0, 0, 0, 0, 0);
 600                                        l_ptr->fsm_msg_cnt++;
 601                                }
 602                                link_set_timer(l_ptr, cont_intv);
 603                        } else if (l_ptr->fsm_msg_cnt < l_ptr->abort_limit) {
 604                                tipc_link_proto_xmit(l_ptr, STATE_MSG,
 605                                                     1, 0, 0, 0, 0);
 606                                l_ptr->fsm_msg_cnt++;
 607                                link_set_timer(l_ptr, cont_intv / 4);
 608                        } else {        /* Link has failed */
 609                                pr_debug("%s<%s>, peer not responding\n",
 610                                         link_rst_msg, l_ptr->name);
 611                                tipc_link_reset(l_ptr);
 612                                l_ptr->state = RESET_UNKNOWN;
 613                                l_ptr->fsm_msg_cnt = 0;
 614                                tipc_link_proto_xmit(l_ptr, RESET_MSG,
 615                                                     0, 0, 0, 0, 0);
 616                                l_ptr->fsm_msg_cnt++;
 617                                link_set_timer(l_ptr, cont_intv);
 618                        }
 619                        break;
 620                default:
 621                        pr_err("%s%u in WU state\n", link_unk_evt, event);
 622                }
 623                break;
 624        case RESET_UNKNOWN:
 625                switch (event) {
 626                case TRAFFIC_MSG_EVT:
 627                        break;
 628                case ACTIVATE_MSG:
 629                        other = l_ptr->owner->active_links[0];
 630                        if (other && link_working_unknown(other))
 631                                break;
 632                        l_ptr->state = WORKING_WORKING;
 633                        l_ptr->fsm_msg_cnt = 0;
 634                        link_activate(l_ptr);
 635                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 636                        l_ptr->fsm_msg_cnt++;
 637                        if (l_ptr->owner->working_links == 1)
 638                                tipc_link_sync_xmit(l_ptr);
 639                        link_set_timer(l_ptr, cont_intv);
 640                        break;
 641                case RESET_MSG:
 642                        l_ptr->state = RESET_RESET;
 643                        l_ptr->fsm_msg_cnt = 0;
 644                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
 645                                             1, 0, 0, 0, 0);
 646                        l_ptr->fsm_msg_cnt++;
 647                        link_set_timer(l_ptr, cont_intv);
 648                        break;
 649                case STARTING_EVT:
 650                        l_ptr->flags |= LINK_STARTED;
 651                        l_ptr->fsm_msg_cnt++;
 652                        link_set_timer(l_ptr, cont_intv);
 653                        break;
 654                case TIMEOUT_EVT:
 655                        tipc_link_proto_xmit(l_ptr, RESET_MSG, 0, 0, 0, 0, 0);
 656                        l_ptr->fsm_msg_cnt++;
 657                        link_set_timer(l_ptr, cont_intv);
 658                        break;
 659                default:
 660                        pr_err("%s%u in RU state\n", link_unk_evt, event);
 661                }
 662                break;
 663        case RESET_RESET:
 664                switch (event) {
 665                case TRAFFIC_MSG_EVT:
 666                case ACTIVATE_MSG:
 667                        other = l_ptr->owner->active_links[0];
 668                        if (other && link_working_unknown(other))
 669                                break;
 670                        l_ptr->state = WORKING_WORKING;
 671                        l_ptr->fsm_msg_cnt = 0;
 672                        link_activate(l_ptr);
 673                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 674                        l_ptr->fsm_msg_cnt++;
 675                        if (l_ptr->owner->working_links == 1)
 676                                tipc_link_sync_xmit(l_ptr);
 677                        link_set_timer(l_ptr, cont_intv);
 678                        break;
 679                case RESET_MSG:
 680                        break;
 681                case TIMEOUT_EVT:
 682                        tipc_link_proto_xmit(l_ptr, ACTIVATE_MSG,
 683                                             0, 0, 0, 0, 0);
 684                        l_ptr->fsm_msg_cnt++;
 685                        link_set_timer(l_ptr, cont_intv);
 686                        break;
 687                default:
 688                        pr_err("%s%u in RR state\n", link_unk_evt, event);
 689                }
 690                break;
 691        default:
 692                pr_err("Unknown link state %u/%u\n", l_ptr->state, event);
 693        }
 694}
 695
 696/* tipc_link_cong: determine return value and how to treat the
 697 * sent buffer during link congestion.
 698 * - For plain, errorless user data messages we keep the buffer and
 699 *   return -ELINKONG.
 700 * - For all other messages we discard the buffer and return -EHOSTUNREACH
 701 * - For TIPC internal messages we also reset the link
 702 */
 703static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
 704{
 705        struct sk_buff *skb = skb_peek(list);
 706        struct tipc_msg *msg = buf_msg(skb);
 707        uint imp = tipc_msg_tot_importance(msg);
 708        u32 oport = msg_tot_origport(msg);
 709
 710        if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
 711                pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
 712                tipc_link_reset(link);
 713                goto drop;
 714        }
 715        if (unlikely(msg_errcode(msg)))
 716                goto drop;
 717        if (unlikely(msg_reroute_cnt(msg)))
 718                goto drop;
 719        if (TIPC_SKB_CB(skb)->wakeup_pending)
 720                return -ELINKCONG;
 721        if (link_schedule_user(link, oport, skb_queue_len(list), imp))
 722                return -ELINKCONG;
 723drop:
 724        __skb_queue_purge(list);
 725        return -EHOSTUNREACH;
 726}
 727
 728/**
 729 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
 730 * @link: link to use
 731 * @list: chain of buffers containing message
 732 *
 733 * Consumes the buffer chain, except when returning -ELINKCONG
 734 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
 735 * user data messages) or -EHOSTUNREACH (all other messages/senders)
 736 * Only the socket functions tipc_send_stream() and tipc_send_packet() need
 737 * to act on the return value, since they may need to do more send attempts.
 738 */
 739int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 740                     struct sk_buff_head *list)
 741{
 742        struct tipc_msg *msg = buf_msg(skb_peek(list));
 743        uint psz = msg_size(msg);
 744        uint sndlim = link->queue_limit[0];
 745        uint imp = tipc_msg_tot_importance(msg);
 746        uint mtu = link->max_pkt;
 747        uint ack = mod(link->next_in_no - 1);
 748        uint seqno = link->next_out_no;
 749        uint bc_last_in = link->owner->bclink.last_in;
 750        struct tipc_media_addr *addr = &link->media_addr;
 751        struct sk_buff_head *outqueue = &link->outqueue;
 752        struct sk_buff *skb, *tmp;
 753
 754        /* Match queue limits against msg importance: */
 755        if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
 756                return tipc_link_cong(link, list);
 757
 758        /* Has valid packet limit been used ? */
 759        if (unlikely(psz > mtu)) {
 760                __skb_queue_purge(list);
 761                return -EMSGSIZE;
 762        }
 763
 764        /* Prepare each packet for sending, and add to outqueue: */
 765        skb_queue_walk_safe(list, skb, tmp) {
 766                __skb_unlink(skb, list);
 767                msg = buf_msg(skb);
 768                msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
 769                msg_set_bcast_ack(msg, bc_last_in);
 770
 771                if (skb_queue_len(outqueue) < sndlim) {
 772                        __skb_queue_tail(outqueue, skb);
 773                        tipc_bearer_send(net, link->bearer_id,
 774                                         skb, addr);
 775                        link->next_out = NULL;
 776                        link->unacked_window = 0;
 777                } else if (tipc_msg_bundle(outqueue, skb, mtu)) {
 778                        link->stats.sent_bundled++;
 779                        continue;
 780                } else if (tipc_msg_make_bundle(outqueue, skb, mtu,
 781                                                link->addr)) {
 782                        link->stats.sent_bundled++;
 783                        link->stats.sent_bundles++;
 784                        if (!link->next_out)
 785                                link->next_out = skb_peek_tail(outqueue);
 786                } else {
 787                        __skb_queue_tail(outqueue, skb);
 788                        if (!link->next_out)
 789                                link->next_out = skb;
 790                }
 791                seqno++;
 792        }
 793        link->next_out_no = seqno;
 794        return 0;
 795}
 796
 797static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
 798{
 799        skb_queue_head_init(list);
 800        __skb_queue_tail(list, skb);
 801}
 802
 803static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
 804{
 805        struct sk_buff_head head;
 806
 807        skb2list(skb, &head);
 808        return __tipc_link_xmit(link->owner->net, link, &head);
 809}
 810
 811int tipc_link_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 812                       u32 selector)
 813{
 814        struct sk_buff_head head;
 815
 816        skb2list(skb, &head);
 817        return tipc_link_xmit(net, &head, dnode, selector);
 818}
 819
 820/**
 821 * tipc_link_xmit() is the general link level function for message sending
 822 * @net: the applicable net namespace
 823 * @list: chain of buffers containing message
 824 * @dsz: amount of user data to be sent
 825 * @dnode: address of destination node
 826 * @selector: a number used for deterministic link selection
 827 * Consumes the buffer chain, except when returning -ELINKCONG
 828 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 829 */
 830int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
 831                   u32 selector)
 832{
 833        struct tipc_link *link = NULL;
 834        struct tipc_node *node;
 835        int rc = -EHOSTUNREACH;
 836
 837        node = tipc_node_find(net, dnode);
 838        if (node) {
 839                tipc_node_lock(node);
 840                link = node->active_links[selector & 1];
 841                if (link)
 842                        rc = __tipc_link_xmit(net, link, list);
 843                tipc_node_unlock(node);
 844        }
 845        if (link)
 846                return rc;
 847
 848        if (likely(in_own_node(net, dnode)))
 849                return tipc_sk_rcv(net, list);
 850
 851        __skb_queue_purge(list);
 852        return rc;
 853}
 854
 855/*
 856 * tipc_link_sync_xmit - synchronize broadcast link endpoints.
 857 *
 858 * Give a newly added peer node the sequence number where it should
 859 * start receiving and acking broadcast packets.
 860 *
 861 * Called with node locked
 862 */
 863static void tipc_link_sync_xmit(struct tipc_link *link)
 864{
 865        struct sk_buff *skb;
 866        struct tipc_msg *msg;
 867
 868        skb = tipc_buf_acquire(INT_H_SIZE);
 869        if (!skb)
 870                return;
 871
 872        msg = buf_msg(skb);
 873        tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG,
 874                      INT_H_SIZE, link->addr);
 875        msg_set_last_bcast(msg, link->owner->bclink.acked);
 876        __tipc_link_xmit_skb(link, skb);
 877}
 878
 879/*
 880 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
 881 * Receive the sequence number where we should start receiving and
 882 * acking broadcast packets from a newly added peer node, and open
 883 * up for reception of such packets.
 884 *
 885 * Called with node locked
 886 */
 887static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 888{
 889        struct tipc_msg *msg = buf_msg(buf);
 890
 891        n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
 892        n->bclink.recv_permitted = true;
 893        kfree_skb(buf);
 894}
 895
 896struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
 897                                    const struct sk_buff *skb)
 898{
 899        if (skb_queue_is_last(list, skb))
 900                return NULL;
 901        return skb->next;
 902}
 903
 904/*
 905 * tipc_link_push_packets - push unsent packets to bearer
 906 *
 907 * Push out the unsent messages of a link where congestion
 908 * has abated. Node is locked.
 909 *
 910 * Called with node locked
 911 */
 912void tipc_link_push_packets(struct tipc_link *l_ptr)
 913{
 914        struct sk_buff_head *outqueue = &l_ptr->outqueue;
 915        struct sk_buff *skb = l_ptr->next_out;
 916        struct tipc_msg *msg;
 917        u32 next, first;
 918
 919        skb_queue_walk_from(outqueue, skb) {
 920                msg = buf_msg(skb);
 921                next = msg_seqno(msg);
 922                first = buf_seqno(skb_peek(outqueue));
 923
 924                if (mod(next - first) < l_ptr->queue_limit[0]) {
 925                        msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
 926                        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
 927                        if (msg_user(msg) == MSG_BUNDLER)
 928                                TIPC_SKB_CB(skb)->bundling = false;
 929                        tipc_bearer_send(l_ptr->owner->net,
 930                                         l_ptr->bearer_id, skb,
 931                                         &l_ptr->media_addr);
 932                        l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
 933                } else {
 934                        break;
 935                }
 936        }
 937}
 938
 939void tipc_link_reset_all(struct tipc_node *node)
 940{
 941        char addr_string[16];
 942        u32 i;
 943
 944        tipc_node_lock(node);
 945
 946        pr_warn("Resetting all links to %s\n",
 947                tipc_addr_string_fill(addr_string, node->addr));
 948
 949        for (i = 0; i < MAX_BEARERS; i++) {
 950                if (node->links[i]) {
 951                        link_print(node->links[i], "Resetting link\n");
 952                        tipc_link_reset(node->links[i]);
 953                }
 954        }
 955
 956        tipc_node_unlock(node);
 957}
 958
 959static void link_retransmit_failure(struct tipc_link *l_ptr,
 960                                    struct sk_buff *buf)
 961{
 962        struct tipc_msg *msg = buf_msg(buf);
 963        struct net *net = l_ptr->owner->net;
 964
 965        pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
 966
 967        if (l_ptr->addr) {
 968                /* Handle failure on standard link */
 969                link_print(l_ptr, "Resetting link\n");
 970                tipc_link_reset(l_ptr);
 971
 972        } else {
 973                /* Handle failure on broadcast link */
 974                struct tipc_node *n_ptr;
 975                char addr_string[16];
 976
 977                pr_info("Msg seq number: %u,  ", msg_seqno(msg));
 978                pr_cont("Outstanding acks: %lu\n",
 979                        (unsigned long) TIPC_SKB_CB(buf)->handle);
 980
 981                n_ptr = tipc_bclink_retransmit_to(net);
 982                tipc_node_lock(n_ptr);
 983
 984                tipc_addr_string_fill(addr_string, n_ptr->addr);
 985                pr_info("Broadcast link info for %s\n", addr_string);
 986                pr_info("Reception permitted: %d,  Acked: %u\n",
 987                        n_ptr->bclink.recv_permitted,
 988                        n_ptr->bclink.acked);
 989                pr_info("Last in: %u,  Oos state: %u,  Last sent: %u\n",
 990                        n_ptr->bclink.last_in,
 991                        n_ptr->bclink.oos_state,
 992                        n_ptr->bclink.last_sent);
 993
 994                tipc_node_unlock(n_ptr);
 995
 996                tipc_bclink_set_flags(net, TIPC_BCLINK_RESET);
 997                l_ptr->stale_count = 0;
 998        }
 999}
1000
1001void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
1002                          u32 retransmits)
1003{
1004        struct tipc_msg *msg;
1005
1006        if (!skb)
1007                return;
1008
1009        msg = buf_msg(skb);
1010
1011        /* Detect repeated retransmit failures */
1012        if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1013                if (++l_ptr->stale_count > 100) {
1014                        link_retransmit_failure(l_ptr, skb);
1015                        return;
1016                }
1017        } else {
1018                l_ptr->last_retransmitted = msg_seqno(msg);
1019                l_ptr->stale_count = 1;
1020        }
1021
1022        skb_queue_walk_from(&l_ptr->outqueue, skb) {
1023                if (!retransmits || skb == l_ptr->next_out)
1024                        break;
1025                msg = buf_msg(skb);
1026                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1027                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1028                tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb,
1029                                 &l_ptr->media_addr);
1030                retransmits--;
1031                l_ptr->stats.retransmitted++;
1032        }
1033}
1034
1035static void link_retrieve_defq(struct tipc_link *link,
1036                               struct sk_buff_head *list)
1037{
1038        u32 seq_no;
1039
1040        if (skb_queue_empty(&link->deferred_queue))
1041                return;
1042
1043        seq_no = buf_seqno(skb_peek(&link->deferred_queue));
1044        if (seq_no == mod(link->next_in_no))
1045                skb_queue_splice_tail_init(&link->deferred_queue, list);
1046}
1047
1048/**
1049 * link_recv_buf_validate - validate basic format of received message
1050 *
1051 * This routine ensures a TIPC message has an acceptable header, and at least
1052 * as much data as the header indicates it should.  The routine also ensures
1053 * that the entire message header is stored in the main fragment of the message
1054 * buffer, to simplify future access to message header fields.
1055 *
1056 * Note: Having extra info present in the message header or data areas is OK.
1057 * TIPC will ignore the excess, under the assumption that it is optional info
1058 * introduced by a later release of the protocol.
1059 */
1060static int link_recv_buf_validate(struct sk_buff *buf)
1061{
1062        static u32 min_data_hdr_size[8] = {
1063                SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
1064                MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
1065                };
1066
1067        struct tipc_msg *msg;
1068        u32 tipc_hdr[2];
1069        u32 size;
1070        u32 hdr_size;
1071        u32 min_hdr_size;
1072
1073        /* If this packet comes from the defer queue, the skb has already
1074         * been validated
1075         */
1076        if (unlikely(TIPC_SKB_CB(buf)->deferred))
1077                return 1;
1078
1079        if (unlikely(buf->len < MIN_H_SIZE))
1080                return 0;
1081
1082        msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
1083        if (msg == NULL)
1084                return 0;
1085
1086        if (unlikely(msg_version(msg) != TIPC_VERSION))
1087                return 0;
1088
1089        size = msg_size(msg);
1090        hdr_size = msg_hdr_sz(msg);
1091        min_hdr_size = msg_isdata(msg) ?
1092                min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
1093
1094        if (unlikely((hdr_size < min_hdr_size) ||
1095                     (size < hdr_size) ||
1096                     (buf->len < size) ||
1097                     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
1098                return 0;
1099
1100        return pskb_may_pull(buf, hdr_size);
1101}
1102
1103/**
1104 * tipc_rcv - process TIPC packets/messages arriving from off-node
1105 * @net: the applicable net namespace
1106 * @skb: TIPC packet
1107 * @b_ptr: pointer to bearer message arrived on
1108 *
1109 * Invoked with no locks held.  Bearer pointer must point to a valid bearer
1110 * structure (i.e. cannot be NULL), but bearer can be inactive.
1111 */
1112void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1113{
1114        struct tipc_net *tn = net_generic(net, tipc_net_id);
1115        struct sk_buff_head head;
1116        struct tipc_node *n_ptr;
1117        struct tipc_link *l_ptr;
1118        struct sk_buff *skb1, *tmp;
1119        struct tipc_msg *msg;
1120        u32 seq_no;
1121        u32 ackd;
1122        u32 released;
1123
1124        skb2list(skb, &head);
1125
1126        while ((skb = __skb_dequeue(&head))) {
1127                /* Ensure message is well-formed */
1128                if (unlikely(!link_recv_buf_validate(skb)))
1129                        goto discard;
1130
1131                /* Ensure message data is a single contiguous unit */
1132                if (unlikely(skb_linearize(skb)))
1133                        goto discard;
1134
1135                /* Handle arrival of a non-unicast link message */
1136                msg = buf_msg(skb);
1137
1138                if (unlikely(msg_non_seq(msg))) {
1139                        if (msg_user(msg) ==  LINK_CONFIG)
1140                                tipc_disc_rcv(net, skb, b_ptr);
1141                        else
1142                                tipc_bclink_rcv(net, skb);
1143                        continue;
1144                }
1145
1146                /* Discard unicast link messages destined for another node */
1147                if (unlikely(!msg_short(msg) &&
1148                             (msg_destnode(msg) != tn->own_addr)))
1149                        goto discard;
1150
1151                /* Locate neighboring node that sent message */
1152                n_ptr = tipc_node_find(net, msg_prevnode(msg));
1153                if (unlikely(!n_ptr))
1154                        goto discard;
1155                tipc_node_lock(n_ptr);
1156
1157                /* Locate unicast link endpoint that should handle message */
1158                l_ptr = n_ptr->links[b_ptr->identity];
1159                if (unlikely(!l_ptr))
1160                        goto unlock;
1161
1162                /* Verify that communication with node is currently allowed */
1163                if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
1164                    msg_user(msg) == LINK_PROTOCOL &&
1165                    (msg_type(msg) == RESET_MSG ||
1166                    msg_type(msg) == ACTIVATE_MSG) &&
1167                    !msg_redundant_link(msg))
1168                        n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
1169
1170                if (tipc_node_blocked(n_ptr))
1171                        goto unlock;
1172
1173                /* Validate message sequence number info */
1174                seq_no = msg_seqno(msg);
1175                ackd = msg_ack(msg);
1176
1177                /* Release acked messages */
1178                if (n_ptr->bclink.recv_permitted)
1179                        tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1180
1181                released = 0;
1182                skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
1183                        if (skb1 == l_ptr->next_out ||
1184                            more(buf_seqno(skb1), ackd))
1185                                break;
1186                         __skb_unlink(skb1, &l_ptr->outqueue);
1187                         kfree_skb(skb1);
1188                         released = 1;
1189                }
1190
1191                /* Try sending any messages link endpoint has pending */
1192                if (unlikely(l_ptr->next_out))
1193                        tipc_link_push_packets(l_ptr);
1194
1195                if (released && !skb_queue_empty(&l_ptr->wakeupq))
1196                        link_prepare_wakeup(l_ptr);
1197
1198                /* Process the incoming packet */
1199                if (unlikely(!link_working_working(l_ptr))) {
1200                        if (msg_user(msg) == LINK_PROTOCOL) {
1201                                tipc_link_proto_rcv(l_ptr, skb);
1202                                link_retrieve_defq(l_ptr, &head);
1203                                skb = NULL;
1204                                goto unlock;
1205                        }
1206
1207                        /* Traffic message. Conditionally activate link */
1208                        link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1209
1210                        if (link_working_working(l_ptr)) {
1211                                /* Re-insert buffer in front of queue */
1212                                __skb_queue_head(&head, skb);
1213                                skb = NULL;
1214                                goto unlock;
1215                        }
1216                        goto unlock;
1217                }
1218
1219                /* Link is now in state WORKING_WORKING */
1220                if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
1221                        link_handle_out_of_seq_msg(l_ptr, skb);
1222                        link_retrieve_defq(l_ptr, &head);
1223                        skb = NULL;
1224                        goto unlock;
1225                }
1226                l_ptr->next_in_no++;
1227                if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
1228                        link_retrieve_defq(l_ptr, &head);
1229
1230                if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
1231                        l_ptr->stats.sent_acks++;
1232                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1233                }
1234                tipc_link_input(l_ptr, skb);
1235                skb = NULL;
1236unlock:
1237                tipc_node_unlock(n_ptr);
1238discard:
1239                if (unlikely(skb))
1240                        kfree_skb(skb);
1241        }
1242}
1243
1244/* tipc_data_input - deliver data and name distr msgs to upper layer
1245 *
1246 * Consumes buffer if message is of right type
1247 * Node lock must be held
1248 */
1249static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
1250{
1251        struct tipc_node *node = link->owner;
1252        struct tipc_msg *msg = buf_msg(skb);
1253        u32 dport = msg_destport(msg);
1254
1255        switch (msg_user(msg)) {
1256        case TIPC_LOW_IMPORTANCE:
1257        case TIPC_MEDIUM_IMPORTANCE:
1258        case TIPC_HIGH_IMPORTANCE:
1259        case TIPC_CRITICAL_IMPORTANCE:
1260        case CONN_MANAGER:
1261                if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
1262                        node->inputq = &link->inputq;
1263                        node->action_flags |= TIPC_MSG_EVT;
1264                }
1265                return true;
1266        case NAME_DISTRIBUTOR:
1267                node->bclink.recv_permitted = true;
1268                node->namedq = &link->namedq;
1269                skb_queue_tail(&link->namedq, skb);
1270                if (skb_queue_len(&link->namedq) == 1)
1271                        node->action_flags |= TIPC_NAMED_MSG_EVT;
1272                return true;
1273        case MSG_BUNDLER:
1274        case CHANGEOVER_PROTOCOL:
1275        case MSG_FRAGMENTER:
1276        case BCAST_PROTOCOL:
1277                return false;
1278        default:
1279                pr_warn("Dropping received illegal msg type\n");
1280                kfree_skb(skb);
1281                return false;
1282        };
1283}
1284
1285/* tipc_link_input - process packet that has passed link protocol check
1286 *
1287 * Consumes buffer
1288 * Node lock must be held
1289 */
1290static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1291{
1292        struct tipc_node *node = link->owner;
1293        struct tipc_msg *msg = buf_msg(skb);
1294        struct sk_buff *iskb;
1295        int pos = 0;
1296
1297        if (likely(tipc_data_input(link, skb)))
1298                return;
1299
1300        switch (msg_user(msg)) {
1301        case CHANGEOVER_PROTOCOL:
1302                if (!tipc_link_tunnel_rcv(node, &skb))
1303                        break;
1304                if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
1305                        tipc_data_input(link, skb);
1306                        break;
1307                }
1308        case MSG_BUNDLER:
1309                link->stats.recv_bundles++;
1310                link->stats.recv_bundled += msg_msgcnt(msg);
1311
1312                while (tipc_msg_extract(skb, &iskb, &pos))
1313                        tipc_data_input(link, iskb);
1314                break;
1315        case MSG_FRAGMENTER:
1316                link->stats.recv_fragments++;
1317                if (tipc_buf_append(&link->reasm_buf, &skb)) {
1318                        link->stats.recv_fragmented++;
1319                        tipc_data_input(link, skb);
1320                } else if (!link->reasm_buf) {
1321                        tipc_link_reset(link);
1322                }
1323                break;
1324        case BCAST_PROTOCOL:
1325                tipc_link_sync_rcv(node, skb);
1326                break;
1327        default:
1328                break;
1329        };
1330}
1331
1332/**
1333 * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1334 *
1335 * Returns increase in queue length (i.e. 0 or 1)
1336 */
1337u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
1338{
1339        struct sk_buff *skb1;
1340        u32 seq_no = buf_seqno(skb);
1341
1342        /* Empty queue ? */
1343        if (skb_queue_empty(list)) {
1344                __skb_queue_tail(list, skb);
1345                return 1;
1346        }
1347
1348        /* Last ? */
1349        if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
1350                __skb_queue_tail(list, skb);
1351                return 1;
1352        }
1353
1354        /* Locate insertion point in queue, then insert; discard if duplicate */
1355        skb_queue_walk(list, skb1) {
1356                u32 curr_seqno = buf_seqno(skb1);
1357
1358                if (seq_no == curr_seqno) {
1359                        kfree_skb(skb);
1360                        return 0;
1361                }
1362
1363                if (less(seq_no, curr_seqno))
1364                        break;
1365        }
1366
1367        __skb_queue_before(list, skb1, skb);
1368        return 1;
1369}
1370
1371/*
1372 * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
1373 */
1374static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
1375                                       struct sk_buff *buf)
1376{
1377        u32 seq_no = buf_seqno(buf);
1378
1379        if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
1380                tipc_link_proto_rcv(l_ptr, buf);
1381                return;
1382        }
1383
1384        /* Record OOS packet arrival (force mismatch on next timeout) */
1385        l_ptr->checkpoint--;
1386
1387        /*
1388         * Discard packet if a duplicate; otherwise add it to deferred queue
1389         * and notify peer of gap as per protocol specification
1390         */
1391        if (less(seq_no, mod(l_ptr->next_in_no))) {
1392                l_ptr->stats.duplicates++;
1393                kfree_skb(buf);
1394                return;
1395        }
1396
1397        if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
1398                l_ptr->stats.deferred_recv++;
1399                TIPC_SKB_CB(buf)->deferred = true;
1400                if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
1401                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1402        } else {
1403                l_ptr->stats.duplicates++;
1404        }
1405}
1406
1407/*
1408 * Send protocol message to the other endpoint.
1409 */
1410void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
1411                          u32 gap, u32 tolerance, u32 priority, u32 ack_mtu)
1412{
1413        struct sk_buff *buf = NULL;
1414        struct tipc_msg *msg = l_ptr->pmsg;
1415        u32 msg_size = sizeof(l_ptr->proto_msg);
1416        int r_flag;
1417
1418        /* Don't send protocol message during link changeover */
1419        if (l_ptr->exp_msg_count)
1420                return;
1421
1422        /* Abort non-RESET send if communication with node is prohibited */
1423        if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
1424                return;
1425
1426        /* Create protocol message with "out-of-sequence" sequence number */
1427        msg_set_type(msg, msg_typ);
1428        msg_set_net_plane(msg, l_ptr->net_plane);
1429        msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1430        msg_set_last_bcast(msg, tipc_bclink_get_last_sent(l_ptr->owner->net));
1431
1432        if (msg_typ == STATE_MSG) {
1433                u32 next_sent = mod(l_ptr->next_out_no);
1434
1435                if (!tipc_link_is_up(l_ptr))
1436                        return;
1437                if (l_ptr->next_out)
1438                        next_sent = buf_seqno(l_ptr->next_out);
1439                msg_set_next_sent(msg, next_sent);
1440                if (!skb_queue_empty(&l_ptr->deferred_queue)) {
1441                        u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
1442                        gap = mod(rec - mod(l_ptr->next_in_no));
1443                }
1444                msg_set_seq_gap(msg, gap);
1445                if (gap)
1446                        l_ptr->stats.sent_nacks++;
1447                msg_set_link_tolerance(msg, tolerance);
1448                msg_set_linkprio(msg, priority);
1449                msg_set_max_pkt(msg, ack_mtu);
1450                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1451                msg_set_probe(msg, probe_msg != 0);
1452                if (probe_msg) {
1453                        u32 mtu = l_ptr->max_pkt;
1454
1455                        if ((mtu < l_ptr->max_pkt_target) &&
1456                            link_working_working(l_ptr) &&
1457                            l_ptr->fsm_msg_cnt) {
1458                                msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1459                                if (l_ptr->max_pkt_probes == 10) {
1460                                        l_ptr->max_pkt_target = (msg_size - 4);
1461                                        l_ptr->max_pkt_probes = 0;
1462                                        msg_size = (mtu + (l_ptr->max_pkt_target - mtu)/2 + 2) & ~3;
1463                                }
1464                                l_ptr->max_pkt_probes++;
1465                        }
1466
1467                        l_ptr->stats.sent_probes++;
1468                }
1469                l_ptr->stats.sent_states++;
1470        } else {                /* RESET_MSG or ACTIVATE_MSG */
1471                msg_set_ack(msg, mod(l_ptr->reset_checkpoint - 1));
1472                msg_set_seq_gap(msg, 0);
1473                msg_set_next_sent(msg, 1);
1474                msg_set_probe(msg, 0);
1475                msg_set_link_tolerance(msg, l_ptr->tolerance);
1476                msg_set_linkprio(msg, l_ptr->priority);
1477                msg_set_max_pkt(msg, l_ptr->max_pkt_target);
1478        }
1479
1480        r_flag = (l_ptr->owner->working_links > tipc_link_is_up(l_ptr));
1481        msg_set_redundant_link(msg, r_flag);
1482        msg_set_linkprio(msg, l_ptr->priority);
1483        msg_set_size(msg, msg_size);
1484
1485        msg_set_seqno(msg, mod(l_ptr->next_out_no + (0xffff/2)));
1486
1487        buf = tipc_buf_acquire(msg_size);
1488        if (!buf)
1489                return;
1490
1491        skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1492        buf->priority = TC_PRIO_CONTROL;
1493
1494        tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
1495                         &l_ptr->media_addr);
1496        l_ptr->unacked_window = 0;
1497        kfree_skb(buf);
1498}
1499
1500/*
1501 * Receive protocol message :
1502 * Note that network plane id propagates through the network, and may
1503 * change at any time. The node with lowest address rules
1504 */
1505static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
1506                                struct sk_buff *buf)
1507{
1508        u32 rec_gap = 0;
1509        u32 max_pkt_info;
1510        u32 max_pkt_ack;
1511        u32 msg_tol;
1512        struct tipc_msg *msg = buf_msg(buf);
1513
1514        /* Discard protocol message during link changeover */
1515        if (l_ptr->exp_msg_count)
1516                goto exit;
1517
1518        if (l_ptr->net_plane != msg_net_plane(msg))
1519                if (link_own_addr(l_ptr) > msg_prevnode(msg))
1520                        l_ptr->net_plane = msg_net_plane(msg);
1521
1522        switch (msg_type(msg)) {
1523
1524        case RESET_MSG:
1525                if (!link_working_unknown(l_ptr) &&
1526                    (l_ptr->peer_session != INVALID_SESSION)) {
1527                        if (less_eq(msg_session(msg), l_ptr->peer_session))
1528                                break; /* duplicate or old reset: ignore */
1529                }
1530
1531                if (!msg_redundant_link(msg) && (link_working_working(l_ptr) ||
1532                                link_working_unknown(l_ptr))) {
1533                        /*
1534                         * peer has lost contact -- don't allow peer's links
1535                         * to reactivate before we recognize loss & clean up
1536                         */
1537                        l_ptr->owner->action_flags |= TIPC_WAIT_OWN_LINKS_DOWN;
1538                }
1539
1540                link_state_event(l_ptr, RESET_MSG);
1541
1542                /* fall thru' */
1543        case ACTIVATE_MSG:
1544                /* Update link settings according other endpoint's values */
1545                strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
1546
1547                msg_tol = msg_link_tolerance(msg);
1548                if (msg_tol > l_ptr->tolerance)
1549                        link_set_supervision_props(l_ptr, msg_tol);
1550
1551                if (msg_linkprio(msg) > l_ptr->priority)
1552                        l_ptr->priority = msg_linkprio(msg);
1553
1554                max_pkt_info = msg_max_pkt(msg);
1555                if (max_pkt_info) {
1556                        if (max_pkt_info < l_ptr->max_pkt_target)
1557                                l_ptr->max_pkt_target = max_pkt_info;
1558                        if (l_ptr->max_pkt > l_ptr->max_pkt_target)
1559                                l_ptr->max_pkt = l_ptr->max_pkt_target;
1560                } else {
1561                        l_ptr->max_pkt = l_ptr->max_pkt_target;
1562                }
1563
1564                /* Synchronize broadcast link info, if not done previously */
1565                if (!tipc_node_is_up(l_ptr->owner)) {
1566                        l_ptr->owner->bclink.last_sent =
1567                                l_ptr->owner->bclink.last_in =
1568                                msg_last_bcast(msg);
1569                        l_ptr->owner->bclink.oos_state = 0;
1570                }
1571
1572                l_ptr->peer_session = msg_session(msg);
1573                l_ptr->peer_bearer_id = msg_bearer_id(msg);
1574
1575                if (msg_type(msg) == ACTIVATE_MSG)
1576                        link_state_event(l_ptr, ACTIVATE_MSG);
1577                break;
1578        case STATE_MSG:
1579
1580                msg_tol = msg_link_tolerance(msg);
1581                if (msg_tol)
1582                        link_set_supervision_props(l_ptr, msg_tol);
1583
1584                if (msg_linkprio(msg) &&
1585                    (msg_linkprio(msg) != l_ptr->priority)) {
1586                        pr_debug("%s<%s>, priority change %u->%u\n",
1587                                 link_rst_msg, l_ptr->name,
1588                                 l_ptr->priority, msg_linkprio(msg));
1589                        l_ptr->priority = msg_linkprio(msg);
1590                        tipc_link_reset(l_ptr); /* Enforce change to take effect */
1591                        break;
1592                }
1593
1594                /* Record reception; force mismatch at next timeout: */
1595                l_ptr->checkpoint--;
1596
1597                link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1598                l_ptr->stats.recv_states++;
1599                if (link_reset_unknown(l_ptr))
1600                        break;
1601
1602                if (less_eq(mod(l_ptr->next_in_no), msg_next_sent(msg))) {
1603                        rec_gap = mod(msg_next_sent(msg) -
1604                                      mod(l_ptr->next_in_no));
1605                }
1606
1607                max_pkt_ack = msg_max_pkt(msg);
1608                if (max_pkt_ack > l_ptr->max_pkt) {
1609                        l_ptr->max_pkt = max_pkt_ack;
1610                        l_ptr->max_pkt_probes = 0;
1611                }
1612
1613                max_pkt_ack = 0;
1614                if (msg_probe(msg)) {
1615                        l_ptr->stats.recv_probes++;
1616                        if (msg_size(msg) > sizeof(l_ptr->proto_msg))
1617                                max_pkt_ack = msg_size(msg);
1618                }
1619
1620                /* Protocol message before retransmits, reduce loss risk */
1621                if (l_ptr->owner->bclink.recv_permitted)
1622                        tipc_bclink_update_link_state(l_ptr->owner,
1623                                                      msg_last_bcast(msg));
1624
1625                if (rec_gap || (msg_probe(msg))) {
1626                        tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, rec_gap, 0,
1627                                             0, max_pkt_ack);
1628                }
1629                if (msg_seq_gap(msg)) {
1630                        l_ptr->stats.recv_nacks++;
1631                        tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
1632                                             msg_seq_gap(msg));
1633                }
1634                break;
1635        }
1636exit:
1637        kfree_skb(buf);
1638}
1639
1640
1641/* tipc_link_tunnel_xmit(): Tunnel one packet via a link belonging to
1642 * a different bearer. Owner node is locked.
1643 */
1644static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
1645                                  struct tipc_msg *tunnel_hdr,
1646                                  struct tipc_msg *msg,
1647                                  u32 selector)
1648{
1649        struct tipc_link *tunnel;
1650        struct sk_buff *skb;
1651        u32 length = msg_size(msg);
1652
1653        tunnel = l_ptr->owner->active_links[selector & 1];
1654        if (!tipc_link_is_up(tunnel)) {
1655                pr_warn("%stunnel link no longer available\n", link_co_err);
1656                return;
1657        }
1658        msg_set_size(tunnel_hdr, length + INT_H_SIZE);
1659        skb = tipc_buf_acquire(length + INT_H_SIZE);
1660        if (!skb) {
1661                pr_warn("%sunable to send tunnel msg\n", link_co_err);
1662                return;
1663        }
1664        skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
1665        skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
1666        __tipc_link_xmit_skb(tunnel, skb);
1667}
1668
1669
1670/* tipc_link_failover_send_queue(): A link has gone down, but a second
1671 * link is still active. We can do failover. Tunnel the failing link's
1672 * whole send queue via the remaining link. This way, we don't lose
1673 * any packets, and sequence order is preserved for subsequent traffic
1674 * sent over the remaining link. Owner node is locked.
1675 */
1676void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
1677{
1678        u32 msgcount = skb_queue_len(&l_ptr->outqueue);
1679        struct tipc_link *tunnel = l_ptr->owner->active_links[0];
1680        struct tipc_msg tunnel_hdr;
1681        struct sk_buff *skb;
1682        int split_bundles;
1683
1684        if (!tunnel)
1685                return;
1686
1687        tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
1688                      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
1689        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1690        msg_set_msgcnt(&tunnel_hdr, msgcount);
1691
1692        if (skb_queue_empty(&l_ptr->outqueue)) {
1693                skb = tipc_buf_acquire(INT_H_SIZE);
1694                if (skb) {
1695                        skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
1696                        msg_set_size(&tunnel_hdr, INT_H_SIZE);
1697                        __tipc_link_xmit_skb(tunnel, skb);
1698                } else {
1699                        pr_warn("%sunable to send changeover msg\n",
1700                                link_co_err);
1701                }
1702                return;
1703        }
1704
1705        split_bundles = (l_ptr->owner->active_links[0] !=
1706                         l_ptr->owner->active_links[1]);
1707
1708        skb_queue_walk(&l_ptr->outqueue, skb) {
1709                struct tipc_msg *msg = buf_msg(skb);
1710
1711                if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
1712                        struct tipc_msg *m = msg_get_wrapped(msg);
1713                        unchar *pos = (unchar *)m;
1714
1715                        msgcount = msg_msgcnt(msg);
1716                        while (msgcount--) {
1717                                msg_set_seqno(m, msg_seqno(msg));
1718                                tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, m,
1719                                                      msg_link_selector(m));
1720                                pos += align(msg_size(m));
1721                                m = (struct tipc_msg *)pos;
1722                        }
1723                } else {
1724                        tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
1725                                              msg_link_selector(msg));
1726                }
1727        }
1728}
1729
1730/* tipc_link_dup_queue_xmit(): A second link has become active. Tunnel a
1731 * duplicate of the first link's send queue via the new link. This way, we
1732 * are guaranteed that currently queued packets from a socket are delivered
1733 * before future traffic from the same socket, even if this is using the
1734 * new link. The last arriving copy of each duplicate packet is dropped at
1735 * the receiving end by the regular protocol check, so packet cardinality
1736 * and sequence order is preserved per sender/receiver socket pair.
1737 * Owner node is locked.
1738 */
1739void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1740                              struct tipc_link *tunnel)
1741{
1742        struct sk_buff *skb;
1743        struct tipc_msg tunnel_hdr;
1744
1745        tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
1746                      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
1747        msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
1748        msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
1749        skb_queue_walk(&l_ptr->outqueue, skb) {
1750                struct sk_buff *outskb;
1751                struct tipc_msg *msg = buf_msg(skb);
1752                u32 length = msg_size(msg);
1753
1754                if (msg_user(msg) == MSG_BUNDLER)
1755                        msg_set_type(msg, CLOSED_MSG);
1756                msg_set_ack(msg, mod(l_ptr->next_in_no - 1));   /* Update */
1757                msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1758                msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
1759                outskb = tipc_buf_acquire(length + INT_H_SIZE);
1760                if (outskb == NULL) {
1761                        pr_warn("%sunable to send duplicate msg\n",
1762                                link_co_err);
1763                        return;
1764                }
1765                skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
1766                skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
1767                                               length);
1768                __tipc_link_xmit_skb(tunnel, outskb);
1769                if (!tipc_link_is_up(l_ptr))
1770                        return;
1771        }
1772}
1773
1774/**
1775 * buf_extract - extracts embedded TIPC message from another message
1776 * @skb: encapsulating message buffer
1777 * @from_pos: offset to extract from
1778 *
1779 * Returns a new message buffer containing an embedded message.  The
1780 * encapsulating buffer is left unchanged.
1781 */
1782static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
1783{
1784        struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
1785        u32 size = msg_size(msg);
1786        struct sk_buff *eb;
1787
1788        eb = tipc_buf_acquire(size);
1789        if (eb)
1790                skb_copy_to_linear_data(eb, msg, size);
1791        return eb;
1792}
1793
1794/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
1795 * Owner node is locked.
1796 */
1797static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
1798                              struct sk_buff *t_buf)
1799{
1800        struct sk_buff *buf;
1801
1802        if (!tipc_link_is_up(l_ptr))
1803                return;
1804
1805        buf = buf_extract(t_buf, INT_H_SIZE);
1806        if (buf == NULL) {
1807                pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
1808                return;
1809        }
1810
1811        /* Add buffer to deferred queue, if applicable: */
1812        link_handle_out_of_seq_msg(l_ptr, buf);
1813}
1814
1815/*  tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
1816 *  Owner node is locked.
1817 */
1818static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
1819                                              struct sk_buff *t_buf)
1820{
1821        struct tipc_msg *t_msg = buf_msg(t_buf);
1822        struct sk_buff *buf = NULL;
1823        struct tipc_msg *msg;
1824
1825        if (tipc_link_is_up(l_ptr))
1826                tipc_link_reset(l_ptr);
1827
1828        /* First failover packet? */
1829        if (l_ptr->exp_msg_count == START_CHANGEOVER)
1830                l_ptr->exp_msg_count = msg_msgcnt(t_msg);
1831
1832        /* Should there be an inner packet? */
1833        if (l_ptr->exp_msg_count) {
1834                l_ptr->exp_msg_count--;
1835                buf = buf_extract(t_buf, INT_H_SIZE);
1836                if (buf == NULL) {
1837                        pr_warn("%sno inner failover pkt\n", link_co_err);
1838                        goto exit;
1839                }
1840                msg = buf_msg(buf);
1841
1842                if (less(msg_seqno(msg), l_ptr->reset_checkpoint)) {
1843                        kfree_skb(buf);
1844                        buf = NULL;
1845                        goto exit;
1846                }
1847                if (msg_user(msg) == MSG_FRAGMENTER) {
1848                        l_ptr->stats.recv_fragments++;
1849                        tipc_buf_append(&l_ptr->reasm_buf, &buf);
1850                }
1851        }
1852exit:
1853        if ((!l_ptr->exp_msg_count) && (l_ptr->flags & LINK_STOPPED))
1854                tipc_link_delete(l_ptr);
1855        return buf;
1856}
1857
1858/*  tipc_link_tunnel_rcv(): Receive a tunnelled packet, sent
1859 *  via other link as result of a failover (ORIGINAL_MSG) or
1860 *  a new active link (DUPLICATE_MSG). Failover packets are
1861 *  returned to the active link for delivery upwards.
1862 *  Owner node is locked.
1863 */
1864static int tipc_link_tunnel_rcv(struct tipc_node *n_ptr,
1865                                struct sk_buff **buf)
1866{
1867        struct sk_buff *t_buf = *buf;
1868        struct tipc_link *l_ptr;
1869        struct tipc_msg *t_msg = buf_msg(t_buf);
1870        u32 bearer_id = msg_bearer_id(t_msg);
1871
1872        *buf = NULL;
1873
1874        if (bearer_id >= MAX_BEARERS)
1875                goto exit;
1876
1877        l_ptr = n_ptr->links[bearer_id];
1878        if (!l_ptr)
1879                goto exit;
1880
1881        if (msg_type(t_msg) == DUPLICATE_MSG)
1882                tipc_link_dup_rcv(l_ptr, t_buf);
1883        else if (msg_type(t_msg) == ORIGINAL_MSG)
1884                *buf = tipc_link_failover_rcv(l_ptr, t_buf);
1885        else
1886                pr_warn("%sunknown tunnel pkt received\n", link_co_err);
1887exit:
1888        kfree_skb(t_buf);
1889        return *buf != NULL;
1890}
1891
1892static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
1893{
1894        unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
1895
1896        if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
1897                return;
1898
1899        l_ptr->tolerance = tol;
1900        l_ptr->cont_intv = msecs_to_jiffies(intv);
1901        l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->cont_intv) / 4);
1902}
1903
1904void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
1905{
1906        /* Data messages from this node, inclusive FIRST_FRAGM */
1907        l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
1908        l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
1909        l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
1910        l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
1911        /* Transiting data messages,inclusive FIRST_FRAGM */
1912        l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
1913        l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
1914        l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
1915        l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
1916        l_ptr->queue_limit[CONN_MANAGER] = 1200;
1917        l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
1918        l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
1919        /* FRAGMENT and LAST_FRAGMENT packets */
1920        l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
1921}
1922
1923/* tipc_link_find_owner - locate owner node of link by link's name
1924 * @net: the applicable net namespace
1925 * @name: pointer to link name string
1926 * @bearer_id: pointer to index in 'node->links' array where the link was found.
1927 *
1928 * Returns pointer to node owning the link, or 0 if no matching link is found.
1929 */
1930static struct tipc_node *tipc_link_find_owner(struct net *net,
1931                                              const char *link_name,
1932                                              unsigned int *bearer_id)
1933{
1934        struct tipc_net *tn = net_generic(net, tipc_net_id);
1935        struct tipc_link *l_ptr;
1936        struct tipc_node *n_ptr;
1937        struct tipc_node *found_node = NULL;
1938        int i;
1939
1940        *bearer_id = 0;
1941        rcu_read_lock();
1942        list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
1943                tipc_node_lock(n_ptr);
1944                for (i = 0; i < MAX_BEARERS; i++) {
1945                        l_ptr = n_ptr->links[i];
1946                        if (l_ptr && !strcmp(l_ptr->name, link_name)) {
1947                                *bearer_id = i;
1948                                found_node = n_ptr;
1949                                break;
1950                        }
1951                }
1952                tipc_node_unlock(n_ptr);
1953                if (found_node)
1954                        break;
1955        }
1956        rcu_read_unlock();
1957
1958        return found_node;
1959}
1960
1961/**
1962 * link_reset_statistics - reset link statistics
1963 * @l_ptr: pointer to link
1964 */
1965static void link_reset_statistics(struct tipc_link *l_ptr)
1966{
1967        memset(&l_ptr->stats, 0, sizeof(l_ptr->stats));
1968        l_ptr->stats.sent_info = l_ptr->next_out_no;
1969        l_ptr->stats.recv_info = l_ptr->next_in_no;
1970}
1971
1972static void link_print(struct tipc_link *l_ptr, const char *str)
1973{
1974        struct tipc_net *tn = net_generic(l_ptr->owner->net, tipc_net_id);
1975        struct tipc_bearer *b_ptr;
1976
1977        rcu_read_lock();
1978        b_ptr = rcu_dereference_rtnl(tn->bearer_list[l_ptr->bearer_id]);
1979        if (b_ptr)
1980                pr_info("%s Link %x<%s>:", str, l_ptr->addr, b_ptr->name);
1981        rcu_read_unlock();
1982
1983        if (link_working_unknown(l_ptr))
1984                pr_cont(":WU\n");
1985        else if (link_reset_reset(l_ptr))
1986                pr_cont(":RR\n");
1987        else if (link_reset_unknown(l_ptr))
1988                pr_cont(":RU\n");
1989        else if (link_working_working(l_ptr))
1990                pr_cont(":WW\n");
1991        else
1992                pr_cont("\n");
1993}
1994
1995/* Parse and validate nested (link) properties valid for media, bearer and link
1996 */
1997int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[])
1998{
1999        int err;
2000
2001        err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, prop,
2002                               tipc_nl_prop_policy);
2003        if (err)
2004                return err;
2005
2006        if (props[TIPC_NLA_PROP_PRIO]) {
2007                u32 prio;
2008
2009                prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
2010                if (prio > TIPC_MAX_LINK_PRI)
2011                        return -EINVAL;
2012        }
2013
2014        if (props[TIPC_NLA_PROP_TOL]) {
2015                u32 tol;
2016
2017                tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
2018                if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
2019                        return -EINVAL;
2020        }
2021
2022        if (props[TIPC_NLA_PROP_WIN]) {
2023                u32 win;
2024
2025                win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
2026                if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN))
2027                        return -EINVAL;
2028        }
2029
2030        return 0;
2031}
2032
2033int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
2034{
2035        int err;
2036        int res = 0;
2037        int bearer_id;
2038        char *name;
2039        struct tipc_link *link;
2040        struct tipc_node *node;
2041        struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
2042        struct net *net = sock_net(skb->sk);
2043
2044        if (!info->attrs[TIPC_NLA_LINK])
2045                return -EINVAL;
2046
2047        err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
2048                               info->attrs[TIPC_NLA_LINK],
2049                               tipc_nl_link_policy);
2050        if (err)
2051                return err;
2052
2053        if (!attrs[TIPC_NLA_LINK_NAME])
2054                return -EINVAL;
2055
2056        name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
2057
2058        node = tipc_link_find_owner(net, name, &bearer_id);
2059        if (!node)
2060                return -EINVAL;
2061
2062        tipc_node_lock(node);
2063
2064        link = node->links[bearer_id];
2065        if (!link) {
2066                res = -EINVAL;
2067                goto out;
2068        }
2069
2070        if (attrs[TIPC_NLA_LINK_PROP]) {
2071                struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
2072
2073                err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
2074                                              props);
2075                if (err) {
2076                        res = err;
2077                        goto out;
2078                }
2079
2080                if (props[TIPC_NLA_PROP_TOL]) {
2081                        u32 tol;
2082
2083                        tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
2084                        link_set_supervision_props(link, tol);
2085                        tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0, 0);
2086                }
2087                if (props[TIPC_NLA_PROP_PRIO]) {
2088                        u32 prio;
2089
2090                        prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
2091                        link->priority = prio;
2092                        tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio, 0);
2093                }
2094                if (props[TIPC_NLA_PROP_WIN]) {
2095                        u32 win;
2096
2097                        win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
2098                        tipc_link_set_queue_limits(link, win);
2099                }
2100        }
2101
2102out:
2103        tipc_node_unlock(node);
2104
2105        return res;
2106}
2107
2108static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
2109{
2110        int i;
2111        struct nlattr *stats;
2112
2113        struct nla_map {
2114                u32 key;
2115                u32 val;
2116        };
2117
2118        struct nla_map map[] = {
2119                {TIPC_NLA_STATS_RX_INFO, s->recv_info},
2120                {TIPC_NLA_STATS_RX_FRAGMENTS, s->recv_fragments},
2121                {TIPC_NLA_STATS_RX_FRAGMENTED, s->recv_fragmented},
2122                {TIPC_NLA_STATS_RX_BUNDLES, s->recv_bundles},
2123                {TIPC_NLA_STATS_RX_BUNDLED, s->recv_bundled},
2124                {TIPC_NLA_STATS_TX_INFO, s->sent_info},
2125                {TIPC_NLA_STATS_TX_FRAGMENTS, s->sent_fragments},
2126                {TIPC_NLA_STATS_TX_FRAGMENTED, s->sent_fragmented},
2127                {TIPC_NLA_STATS_TX_BUNDLES, s->sent_bundles},
2128                {TIPC_NLA_STATS_TX_BUNDLED, s->sent_bundled},
2129                {TIPC_NLA_STATS_MSG_PROF_TOT, (s->msg_length_counts) ?
2130                        s->msg_length_counts : 1},
2131                {TIPC_NLA_STATS_MSG_LEN_CNT, s->msg_length_counts},
2132                {TIPC_NLA_STATS_MSG_LEN_TOT, s->msg_lengths_total},
2133                {TIPC_NLA_STATS_MSG_LEN_P0, s->msg_length_profile[0]},
2134                {TIPC_NLA_STATS_MSG_LEN_P1, s->msg_length_profile[1]},
2135                {TIPC_NLA_STATS_MSG_LEN_P2, s->msg_length_profile[2]},
2136                {TIPC_NLA_STATS_MSG_LEN_P3, s->msg_length_profile[3]},
2137                {TIPC_NLA_STATS_MSG_LEN_P4, s->msg_length_profile[4]},
2138                {TIPC_NLA_STATS_MSG_LEN_P5, s->msg_length_profile[5]},
2139                {TIPC_NLA_STATS_MSG_LEN_P6, s->msg_length_profile[6]},
2140                {TIPC_NLA_STATS_RX_STATES, s->recv_states},
2141                {TIPC_NLA_STATS_RX_PROBES, s->recv_probes},
2142                {TIPC_NLA_STATS_RX_NACKS, s->recv_nacks},
2143                {TIPC_NLA_STATS_RX_DEFERRED, s->deferred_recv},
2144                {TIPC_NLA_STATS_TX_STATES, s->sent_states},
2145                {TIPC_NLA_STATS_TX_PROBES, s->sent_probes},
2146                {TIPC_NLA_STATS_TX_NACKS, s->sent_nacks},
2147                {TIPC_NLA_STATS_TX_ACKS, s->sent_acks},
2148                {TIPC_NLA_STATS_RETRANSMITTED, s->retransmitted},
2149                {TIPC_NLA_STATS_DUPLICATES, s->duplicates},
2150                {TIPC_NLA_STATS_LINK_CONGS, s->link_congs},
2151                {TIPC_NLA_STATS_MAX_QUEUE, s->max_queue_sz},
2152                {TIPC_NLA_STATS_AVG_QUEUE, s->queue_sz_counts ?
2153                        (s->accu_queue_sz / s->queue_sz_counts) : 0}
2154        };
2155
2156        stats = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
2157        if (!stats)
2158                return -EMSGSIZE;
2159
2160        for (i = 0; i <  ARRAY_SIZE(map); i++)
2161                if (nla_put_u32(skb, map[i].key, map[i].val))
2162                        goto msg_full;
2163
2164        nla_nest_end(skb, stats);
2165
2166        return 0;
2167msg_full:
2168        nla_nest_cancel(skb, stats);
2169
2170        return -EMSGSIZE;
2171}
2172
2173/* Caller should hold appropriate locks to protect the link */
2174static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
2175                              struct tipc_link *link)
2176{
2177        int err;
2178        void *hdr;
2179        struct nlattr *attrs;
2180        struct nlattr *prop;
2181        struct tipc_net *tn = net_generic(net, tipc_net_id);
2182
2183        hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
2184                          NLM_F_MULTI, TIPC_NL_LINK_GET);
2185        if (!hdr)
2186                return -EMSGSIZE;
2187
2188        attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
2189        if (!attrs)
2190                goto msg_full;
2191
2192        if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, link->name))
2193                goto attr_msg_full;
2194        if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST,
2195                        tipc_cluster_mask(tn->own_addr)))
2196                goto attr_msg_full;
2197        if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->max_pkt))
2198                goto attr_msg_full;
2199        if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->next_in_no))
2200                goto attr_msg_full;
2201        if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, link->next_out_no))
2202                goto attr_msg_full;
2203
2204        if (tipc_link_is_up(link))
2205                if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
2206                        goto attr_msg_full;
2207        if (tipc_link_is_active(link))
2208                if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
2209                        goto attr_msg_full;
2210
2211        prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
2212        if (!prop)
2213                goto attr_msg_full;
2214        if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2215                goto prop_msg_full;
2216        if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
2217                goto prop_msg_full;
2218        if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
2219                        link->queue_limit[TIPC_LOW_IMPORTANCE]))
2220                goto prop_msg_full;
2221        if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
2222                goto prop_msg_full;
2223        nla_nest_end(msg->skb, prop);
2224
2225        err = __tipc_nl_add_stats(msg->skb, &link->stats);
2226        if (err)
2227                goto attr_msg_full;
2228
2229        nla_nest_end(msg->skb, attrs);
2230        genlmsg_end(msg->skb, hdr);
2231
2232        return 0;
2233
2234prop_msg_full:
2235        nla_nest_cancel(msg->skb, prop);
2236attr_msg_full:
2237        nla_nest_cancel(msg->skb, attrs);
2238msg_full:
2239        genlmsg_cancel(msg->skb, hdr);
2240
2241        return -EMSGSIZE;
2242}
2243
2244/* Caller should hold node lock  */
2245static int __tipc_nl_add_node_links(struct net *net, struct tipc_nl_msg *msg,
2246                                    struct tipc_node *node, u32 *prev_link)
2247{
2248        u32 i;
2249        int err;
2250
2251        for (i = *prev_link; i < MAX_BEARERS; i++) {
2252                *prev_link = i;
2253
2254                if (!node->links[i])
2255                        continue;
2256
2257                err = __tipc_nl_add_link(net, msg, node->links[i]);
2258                if (err)
2259                        return err;
2260        }
2261        *prev_link = 0;
2262
2263        return 0;
2264}
2265
2266int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
2267{
2268        struct net *net = sock_net(skb->sk);
2269        struct tipc_net *tn = net_generic(net, tipc_net_id);
2270        struct tipc_node *node;
2271        struct tipc_nl_msg msg;
2272        u32 prev_node = cb->args[0];
2273        u32 prev_link = cb->args[1];
2274        int done = cb->args[2];
2275        int err;
2276
2277        if (done)
2278                return 0;
2279
2280        msg.skb = skb;
2281        msg.portid = NETLINK_CB(cb->skb).portid;
2282        msg.seq = cb->nlh->nlmsg_seq;
2283
2284        rcu_read_lock();
2285
2286        if (prev_node) {
2287                node = tipc_node_find(net, prev_node);
2288                if (!node) {
2289                        /* We never set seq or call nl_dump_check_consistent()
2290                         * this means that setting prev_seq here will cause the
2291                         * consistence check to fail in the netlink callback
2292                         * handler. Resulting in the last NLMSG_DONE message
2293                         * having the NLM_F_DUMP_INTR flag set.
2294                         */
2295                        cb->prev_seq = 1;
2296                        goto out;
2297                }
2298
2299                list_for_each_entry_continue_rcu(node, &tn->node_list,
2300                                                 list) {
2301                        tipc_node_lock(node);
2302                        err = __tipc_nl_add_node_links(net, &msg, node,
2303                                                       &prev_link);
2304                        tipc_node_unlock(node);
2305                        if (err)
2306                                goto out;
2307
2308                        prev_node = node->addr;
2309                }
2310        } else {
2311                err = tipc_nl_add_bc_link(net, &msg);
2312                if (err)
2313                        goto out;
2314
2315                list_for_each_entry_rcu(node, &tn->node_list, list) {
2316                        tipc_node_lock(node);
2317                        err = __tipc_nl_add_node_links(net, &msg, node,
2318                                                       &prev_link);
2319                        tipc_node_unlock(node);
2320                        if (err)
2321                                goto out;
2322
2323                        prev_node = node->addr;
2324                }
2325        }
2326        done = 1;
2327out:
2328        rcu_read_unlock();
2329
2330        cb->args[0] = prev_node;
2331        cb->args[1] = prev_link;
2332        cb->args[2] = done;
2333
2334        return skb->len;
2335}
2336
2337int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
2338{
2339        struct net *net = genl_info_net(info);
2340        struct sk_buff *ans_skb;
2341        struct tipc_nl_msg msg;
2342        struct tipc_link *link;
2343        struct tipc_node *node;
2344        char *name;
2345        int bearer_id;
2346        int err;
2347
2348        if (!info->attrs[TIPC_NLA_LINK_NAME])
2349                return -EINVAL;
2350
2351        name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
2352        node = tipc_link_find_owner(net, name, &bearer_id);
2353        if (!node)
2354                return -EINVAL;
2355
2356        ans_skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
2357        if (!ans_skb)
2358                return -ENOMEM;
2359
2360        msg.skb = ans_skb;
2361        msg.portid = info->snd_portid;
2362        msg.seq = info->snd_seq;
2363
2364        tipc_node_lock(node);
2365        link = node->links[bearer_id];
2366        if (!link) {
2367                err = -EINVAL;
2368                goto err_out;
2369        }
2370
2371        err = __tipc_nl_add_link(net, &msg, link);
2372        if (err)
2373                goto err_out;
2374
2375        tipc_node_unlock(node);
2376
2377        return genlmsg_reply(ans_skb, info);
2378
2379err_out:
2380        tipc_node_unlock(node);
2381        nlmsg_free(ans_skb);
2382
2383        return err;
2384}
2385
2386int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
2387{
2388        int err;
2389        char *link_name;
2390        unsigned int bearer_id;
2391        struct tipc_link *link;
2392        struct tipc_node *node;
2393        struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
2394        struct net *net = sock_net(skb->sk);
2395
2396        if (!info->attrs[TIPC_NLA_LINK])
2397                return -EINVAL;
2398
2399        err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
2400                               info->attrs[TIPC_NLA_LINK],
2401                               tipc_nl_link_policy);
2402        if (err)
2403                return err;
2404
2405        if (!attrs[TIPC_NLA_LINK_NAME])
2406                return -EINVAL;
2407
2408        link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
2409
2410        if (strcmp(link_name, tipc_bclink_name) == 0) {
2411                err = tipc_bclink_reset_stats(net);
2412                if (err)
2413                        return err;
2414                return 0;
2415        }
2416
2417        node = tipc_link_find_owner(net, link_name, &bearer_id);
2418        if (!node)
2419                return -EINVAL;
2420
2421        tipc_node_lock(node);
2422
2423        link = node->links[bearer_id];
2424        if (!link) {
2425                tipc_node_unlock(node);
2426                return -EINVAL;
2427        }
2428
2429        link_reset_statistics(link);
2430
2431        tipc_node_unlock(node);
2432
2433        return 0;
2434}
2435