/*
 * net/tipc/socket.c: TIPC socket API
 *
 * Copyright (c) 2001-2007, 2012-2019, Ericsson AB
 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
 * Copyright (c) 2020, Red Hat Inc
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Neither the names of the copyright holders nor the names of its
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * Alternatively, this software may be distributed under the terms of the
 * GNU General Public License ("GPL") version 2 as published by the Free
 * Software Foundation.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <linux/rhashtable.h>
#include <linux/sched/signal.h>

#include "core.h"
#include "name_table.h"
#include "node.h"
#include "link.h"
#include "name_distr.h"
#include "socket.h"
#include "bcast.h"
#include "netlink.h"
#include "group.h"
#include "trace.h"

#define NAGLE_START_INIT        4
#define NAGLE_START_MAX         1024
#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
#define CONN_PROBING_INTV       msecs_to_jiffies(3600000)  /* [ms] => 1 h */
#define TIPC_MAX_PORT           0xffffffff
#define TIPC_MIN_PORT           1
#define TIPC_ACK_RATE           4       /* ACK at 1/4 of rcv window size */

enum {
	TIPC_LISTEN = TCP_LISTEN,
	TIPC_ESTABLISHED = TCP_ESTABLISHED,
	TIPC_OPEN = TCP_CLOSE,
	TIPC_DISCONNECTING = TCP_CLOSE_WAIT,
	TIPC_CONNECTING = TCP_SYN_SENT,
};
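
/* Note: the TIPC connection states reuse the generic TCP state values for
 * sk->sk_state, so that core socket code and diagnostic tools that inspect
 * sk_state see familiar, consistent values.
 */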

struct sockaddr_pair {
	struct sockaddr_tipc sock;
	struct sockaddr_tipc member;
};

/**
 * struct tipc_sock - TIPC socket structure
 * @sk: socket - interacts with 'port' and with user via the socket API
 * @conn_type: TIPC type used when connection was established
 * @conn_instance: TIPC instance used when connection was established
 * @published: non-zero if port has one or more associated names
 * @max_pkt: maximum packet size "hint" used when building messages sent by port
 * @maxnagle: maximum size of msg which can be subject to nagle
 * @portid: unique port identity in TIPC socket hash table
 * @phdr: preformatted message header used when sending messages
 * @cong_links: list of congested links
 * @publications: list of publications for port
 * @pub_count: total # of publications port has made during its lifetime
 * @conn_timeout: the time we can wait for an unresponded setup request
 * @probe_unacked: probe has not received ack yet
 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
 * @cong_link_cnt: number of congested links
 * @snt_unacked: # messages sent by socket, and not yet acked by peer
 * @snd_win: send window size
 * @peer_caps: peer capabilities mask
 * @rcv_unacked: # messages read by user, but not yet acked back to peer
 * @rcv_win: receive window size
 * @peer: 'connected' peer for dgram/rdm
 * @node: hash table node
 * @mc_method: cookie for use between socket and broadcast layer
 * @rcu: rcu struct for tipc_sock
 * @group: TIPC communications group
 * @oneway: message count in one direction (FIXME)
 * @nagle_start: current nagle value
 * @snd_backlog: send backlog count
 * @msg_acc: messages accepted; used in managing backlog and nagle
 * @pkt_cnt: TIPC socket packet count
 * @expect_ack: whether this TIPC socket is expecting an ack
 * @nodelay: setsockopt() TIPC_NODELAY setting
 * @group_is_open: TIPC socket group is fully open (FIXME)
 */
struct tipc_sock {
	struct sock sk;
	u32 conn_type;
	u32 conn_instance;
	int published;
	u32 max_pkt;
	u32 maxnagle;
	u32 portid;
	struct tipc_msg phdr;
	struct list_head cong_links;
	struct list_head publications;
	u32 pub_count;
	atomic_t dupl_rcvcnt;
	u16 conn_timeout;
	bool probe_unacked;
	u16 cong_link_cnt;
	u16 snt_unacked;
	u16 snd_win;
	u16 peer_caps;
	u16 rcv_unacked;
	u16 rcv_win;
	struct sockaddr_tipc peer;
	struct rhash_head node;
	struct tipc_mc_method mc_method;
	struct rcu_head rcu;
	struct tipc_group *group;
	u32 oneway;
	u32 nagle_start;
	u16 snd_backlog;
	u16 msg_acc;
	u16 pkt_cnt;
	bool expect_ack;
	bool nodelay;
	bool group_is_open;
};
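
/* Note: as for all protocol sockets, struct sock must remain the first
 * member of struct tipc_sock: the socket core allocates tipc_proto.obj_size
 * bytes via sk_alloc() and uses the start of that object as the struct sock
 * proper.
 */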

static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static void tipc_sock_destruct(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
		       bool kern);
static void tipc_sk_timeout(struct timer_list *t);
static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
			   struct tipc_service_range const *seq);
static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
			    struct tipc_service_range const *seq);
static int tipc_sk_leave(struct tipc_sock *tsk);
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack);

static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
static struct proto tipc_proto;
static const struct rhashtable_params tsk_rht_params;

static u32 tsk_own_node(struct tipc_sock *tsk)
{
	return msg_prevnode(&tsk->phdr);
}

static u32 tsk_peer_node(struct tipc_sock *tsk)
{
	return msg_destnode(&tsk->phdr);
}

static u32 tsk_peer_port(struct tipc_sock *tsk)
{
	return msg_destport(&tsk->phdr);
}

static bool tsk_unreliable(struct tipc_sock *tsk)
{
	return msg_src_droppable(&tsk->phdr) != 0;
}

static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
{
	msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
}

static bool tsk_unreturnable(struct tipc_sock *tsk)
{
	return msg_dest_droppable(&tsk->phdr) != 0;
}

static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
{
	msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
}

static int tsk_importance(struct tipc_sock *tsk)
{
	return msg_importance(&tsk->phdr);
}

static struct tipc_sock *tipc_sk(const struct sock *sk)
{
	return container_of(sk, struct tipc_sock, sk);
}

int tsk_set_importance(struct sock *sk, int imp)
{
	if (imp > TIPC_CRITICAL_IMPORTANCE)
		return -EINVAL;
	msg_set_importance(&tipc_sk(sk)->phdr, (u32)imp);
	return 0;
}
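
/* Usage sketch (illustrative, not part of this file): userspace selects the
 * message importance via the TIPC_IMPORTANCE socket option, which reaches
 * tsk_set_importance() through tipc_setsockopt():
 *
 *	int imp = TIPC_HIGH_IMPORTANCE;
 *
 *	if (setsockopt(fd, SOL_TIPC, TIPC_IMPORTANCE, &imp, sizeof(imp)))
 *		perror("setsockopt");
 *
 * Anything above TIPC_CRITICAL_IMPORTANCE is rejected with -EINVAL.
 */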

static bool tsk_conn_cong(struct tipc_sock *tsk)
{
	return tsk->snt_unacked > tsk->snd_win;
}

static u16 tsk_blocks(int len)
{
	return ((len / FLOWCTL_BLK_SZ) + 1);
}

/* tsk_adv_blocks(): translate a buffer size in bytes to number of
 * advertisable blocks, taking into account the ratio truesize(len)/len
 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
 */
static u16 tsk_adv_blocks(int len)
{
	return len / FLOWCTL_BLK_SZ / 4;
}

/* tsk_inc(): increment counter for sent or received data
 * - If block-based flow control is not supported by the peer we
 *   fall back to message-based flow control, incrementing the
 *   counter by one per message
 */
static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
{
	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
		return ((msglen / FLOWCTL_BLK_SZ) + 1);
	return 1;
}
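
/* Worked example (assuming FLOWCTL_BLK_SZ is 1024 bytes): a 1500 byte
 * message is charged 1500/1024 + 1 = 2 blocks against the window, while a
 * peer without TIPC_BLOCK_FLOWCTL support is charged a flat 1 unit per
 * message regardless of its size.
 */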

/* tsk_set_nagle - enable/disable nagle property by manipulating maxnagle
 */
static void tsk_set_nagle(struct tipc_sock *tsk)
{
	struct sock *sk = &tsk->sk;

	tsk->maxnagle = 0;
	if (sk->sk_type != SOCK_STREAM)
		return;
	if (tsk->nodelay)
		return;
	if (!(tsk->peer_caps & TIPC_NAGLE))
		return;
	/* Limit node local buffer size to avoid receive queue overflow */
	if (tsk->max_pkt == MAX_MSG_SIZE)
		tsk->maxnagle = 1500;
	else
		tsk->maxnagle = tsk->max_pkt;
}

/**
 * tsk_advance_rx_queue - discard first buffer in socket receive queue
 * @sk: network socket
 *
 * Caller must hold socket lock
 */
static void tsk_advance_rx_queue(struct sock *sk)
{
	trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
}

/* tipc_sk_respond(): send response message back to sender
 */
static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
{
	u32 selector;
	u32 dnode;
	u32 onode = tipc_own_addr(sock_net(sk));

	if (!tipc_msg_reverse(onode, &skb, err))
		return;

	trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
	dnode = msg_destnode(buf_msg(skb));
	selector = msg_origport(buf_msg(skb));
	tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
}

/**
 * tsk_rej_rx_queue - reject all buffers in socket receive queue
 * @sk: network socket
 * @error: response error code
 *
 * Caller must hold socket lock
 */
static void tsk_rej_rx_queue(struct sock *sk, int error)
{
	struct sk_buff *skb;

	while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
		tipc_sk_respond(sk, skb, error);
}

static bool tipc_sk_connected(struct sock *sk)
{
	return sk->sk_state == TIPC_ESTABLISHED;
}

/* tipc_sk_type_connectionless - check if the socket is a datagram socket
 * @sk: socket
 *
 * Returns true if connectionless, false otherwise
 */
static bool tipc_sk_type_connectionless(struct sock *sk)
{
	return sk->sk_type == SOCK_RDM || sk->sk_type == SOCK_DGRAM;
}

/* tsk_peer_msg - verify if message was sent by connected port's peer
 *
 * Handles cases where the node's network address has changed from
 * the default of <0.0.0> to its configured setting.
 */
static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
{
	struct sock *sk = &tsk->sk;
	u32 self = tipc_own_addr(sock_net(sk));
	u32 peer_port = tsk_peer_port(tsk);
	u32 orig_node, peer_node;

	if (unlikely(!tipc_sk_connected(sk)))
		return false;

	if (unlikely(msg_origport(msg) != peer_port))
		return false;

	orig_node = msg_orignode(msg);
	peer_node = tsk_peer_node(tsk);

	if (likely(orig_node == peer_node))
		return true;

	if (!orig_node && peer_node == self)
		return true;

	if (!peer_node && orig_node == self)
		return true;

	return false;
}

/* tipc_set_sk_state - set the sk_state of the socket
 * @sk: socket
 *
 * Caller must hold socket lock
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_set_sk_state(struct sock *sk, int state)
{
	int oldsk_state = sk->sk_state;
	int res = -EINVAL;

	switch (state) {
	case TIPC_OPEN:
		res = 0;
		break;
	case TIPC_LISTEN:
	case TIPC_CONNECTING:
		if (oldsk_state == TIPC_OPEN)
			res = 0;
		break;
	case TIPC_ESTABLISHED:
		if (oldsk_state == TIPC_CONNECTING ||
		    oldsk_state == TIPC_OPEN)
			res = 0;
		break;
	case TIPC_DISCONNECTING:
		if (oldsk_state == TIPC_CONNECTING ||
		    oldsk_state == TIPC_ESTABLISHED)
			res = 0;
		break;
	}

	if (!res)
		sk->sk_state = state;

	return res;
}
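
/* For reference, the transitions accepted above are:
 *
 *	any state        -> TIPC_OPEN
 *	TIPC_OPEN        -> TIPC_LISTEN | TIPC_CONNECTING | TIPC_ESTABLISHED
 *	TIPC_CONNECTING  -> TIPC_ESTABLISHED | TIPC_DISCONNECTING
 *	TIPC_ESTABLISHED -> TIPC_DISCONNECTING
 *
 * Anything else is rejected with -EINVAL and leaves sk_state unchanged.
 */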

static int tipc_sk_sock_err(struct socket *sock, long *timeout)
{
	struct sock *sk = sock->sk;
	int err = sock_error(sk);
	int typ = sock->type;

	if (err)
		return err;
	if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
		if (sk->sk_state == TIPC_DISCONNECTING)
			return -EPIPE;
		else if (!tipc_sk_connected(sk))
			return -ENOTCONN;
	}
	if (!*timeout)
		return -EAGAIN;
	if (signal_pending(current))
		return sock_intr_errno(*timeout);

	return 0;
}

#define tipc_wait_for_cond(sock_, timeo_, condition_)			       \
({									       \
	DEFINE_WAIT_FUNC(wait_, woken_wake_function);			       \
	struct sock *sk_;						       \
	int rc_;							       \
									       \
	while ((rc_ = !(condition_))) {					       \
		/* coupled with smp_wmb() in tipc_sk_proto_rcv() */	       \
		smp_rmb();						       \
		sk_ = (sock_)->sk;					       \
		rc_ = tipc_sk_sock_err((sock_), timeo_);		       \
		if (rc_)						       \
			break;						       \
		add_wait_queue(sk_sleep(sk_), &wait_);			       \
		release_sock(sk_);					       \
		*(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \
		sched_annotate_sleep();					       \
		lock_sock(sk_);						       \
		remove_wait_queue(sk_sleep(sk_), &wait_);		       \
	}								       \
	rc_;								       \
})
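
/* Note on tipc_wait_for_cond(): the macro re-evaluates @condition_ with the
 * socket lock held, but drops the lock (release_sock/lock_sock) around the
 * actual sleep in wait_woken(). Callers must therefore not rely on state
 * that another thread may have changed while they slept, and should
 * revalidate such state after the macro returns. The result is 0 once the
 * condition became true, or a negative errno from tipc_sk_sock_err().
 */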

/**
 * tipc_sk_create - create a TIPC socket
 * @net: network namespace (must be default network)
 * @sock: pre-allocated socket structure
 * @protocol: protocol indicator (must be 0)
 * @kern: caused by kernel or by userspace?
 *
 * This routine creates additional data structures used by the TIPC socket,
 * initializes them, and links them together.
 *
 * Return: 0 on success, errno otherwise
 */
static int tipc_sk_create(struct net *net, struct socket *sock,
			  int protocol, int kern)
{
	const struct proto_ops *ops;
	struct sock *sk;
	struct tipc_sock *tsk;
	struct tipc_msg *msg;

	/* Validate arguments */
	if (unlikely(protocol != 0))
		return -EPROTONOSUPPORT;

	switch (sock->type) {
	case SOCK_STREAM:
		ops = &stream_ops;
		break;
	case SOCK_SEQPACKET:
		ops = &packet_ops;
		break;
	case SOCK_DGRAM:
	case SOCK_RDM:
		ops = &msg_ops;
		break;
	default:
		return -EPROTOTYPE;
	}

	/* Allocate socket's protocol area */
	sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
	if (sk == NULL)
		return -ENOMEM;

	tsk = tipc_sk(sk);
	tsk->max_pkt = MAX_PKT_DEFAULT;
	tsk->maxnagle = 0;
	tsk->nagle_start = NAGLE_START_INIT;
	INIT_LIST_HEAD(&tsk->publications);
	INIT_LIST_HEAD(&tsk->cong_links);
	msg = &tsk->phdr;

	/* Finish initializing socket data structures */
	sock->ops = ops;
	sock_init_data(sock, sk);
	tipc_set_sk_state(sk, TIPC_OPEN);
	if (tipc_sk_insert(tsk)) {
		pr_warn("Socket create failed; port number exhausted\n");
		return -EINVAL;
	}

	/* Ensure tsk is visible before we read own_addr. */
	smp_mb();

	tipc_msg_init(tipc_own_addr(net), msg, TIPC_LOW_IMPORTANCE,
		      TIPC_NAMED_MSG, NAMED_H_SIZE, 0);

	msg_set_origport(msg, tsk->portid);
	timer_setup(&sk->sk_timer, tipc_sk_timeout, 0);
	sk->sk_shutdown = 0;
	sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
	sk->sk_rcvbuf = sysctl_tipc_rmem[1];
	sk->sk_data_ready = tipc_data_ready;
	sk->sk_write_space = tipc_write_space;
	sk->sk_destruct = tipc_sock_destruct;
	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
	tsk->group_is_open = true;
	atomic_set(&tsk->dupl_rcvcnt, 0);

	/* Start out with safe limits until we receive an advertised window */
	tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
	tsk->rcv_win = tsk->snd_win;

	if (tipc_sk_type_connectionless(sk)) {
		tsk_set_unreturnable(tsk, true);
		if (sock->type == SOCK_DGRAM)
			tsk_set_unreliable(tsk, true);
	}
	__skb_queue_head_init(&tsk->mc_method.deferredq);
	trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
	return 0;
}
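
/* Usage sketch (illustrative, not part of this file): the socket types
 * handled above map onto the usual socket(2) call from userspace, e.g.:
 *
 *	int fd = socket(AF_TIPC, SOCK_RDM, 0);	// reliable datagrams
 *
 * SOCK_STREAM selects stream_ops, SOCK_SEQPACKET selects packet_ops, and
 * SOCK_DGRAM/SOCK_RDM share msg_ops; any non-zero protocol is rejected
 * with -EPROTONOSUPPORT.
 */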

static void tipc_sk_callback(struct rcu_head *head)
{
	struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);

	sock_put(&tsk->sk);
}

/* Caller should hold socket lock for the socket. */
static void __tipc_shutdown(struct socket *sock, int error)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct net *net = sock_net(sk);
	long timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
	u32 dnode = tsk_peer_node(tsk);
	struct sk_buff *skb;

	/* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
	tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
					    !tsk_conn_cong(tsk)));

	/* Push out delayed messages if in Nagle mode */
	tipc_sk_push_backlog(tsk, false);
	/* Remove pending SYN */
	__skb_queue_purge(&sk->sk_write_queue);

	/* Remove partially received buffer if any */
	skb = skb_peek(&sk->sk_receive_queue);
	if (skb && TIPC_SKB_CB(skb)->bytes_read) {
		__skb_unlink(skb, &sk->sk_receive_queue);
		kfree_skb(skb);
	}

	/* Reject all unreceived messages if connectionless */
	if (tipc_sk_type_connectionless(sk)) {
		tsk_rej_rx_queue(sk, error);
		return;
	}

	switch (sk->sk_state) {
	case TIPC_CONNECTING:
	case TIPC_ESTABLISHED:
		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
		tipc_node_remove_conn(net, dnode, tsk->portid);
		/* Send a FIN+/- to its peer */
		skb = __skb_dequeue(&sk->sk_receive_queue);
		if (skb) {
			__skb_queue_purge(&sk->sk_receive_queue);
			tipc_sk_respond(sk, skb, error);
			break;
		}
		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
				      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
				      tsk_own_node(tsk), tsk_peer_port(tsk),
				      tsk->portid, error);
		if (skb)
			tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
		break;
	case TIPC_LISTEN:
		/* Reject all SYN messages */
		tsk_rej_rx_queue(sk, error);
		break;
	default:
		__skb_queue_purge(&sk->sk_receive_queue);
		break;
	}
}

/**
 * tipc_release - destroy a TIPC socket
 * @sock: socket to destroy
 *
 * This routine cleans up any messages that are still queued on the socket.
 * For DGRAM and RDM socket types, all queued messages are rejected.
 * For SEQPACKET and STREAM socket types, the first message is rejected
 * and any others are discarded.  (If the first message on a STREAM socket
 * is partially-read, it is discarded and the next one is rejected instead.)
 *
 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 * are returned or discarded according to the "destination droppable" setting
 * specified for the message by the sender.
 *
 * Return: 0 on success, errno otherwise
 */
static int tipc_release(struct socket *sock)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk;

	/*
	 * Exit if socket isn't fully initialized (occurs when a failed accept()
	 * releases a pre-allocated child socket that was never used)
	 */
	if (sk == NULL)
		return 0;

	tsk = tipc_sk(sk);
	lock_sock(sk);

	trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
	__tipc_shutdown(sock, TIPC_ERR_NO_PORT);
	sk->sk_shutdown = SHUTDOWN_MASK;
	tipc_sk_leave(tsk);
	tipc_sk_withdraw(tsk, 0, NULL);
	__skb_queue_purge(&tsk->mc_method.deferredq);
	sk_stop_timer(sk, &sk->sk_timer);
	tipc_sk_remove(tsk);

	sock_orphan(sk);
	/* Reject any messages that accumulated in backlog queue */
	release_sock(sk);
	tipc_dest_list_purge(&tsk->cong_links);
	tsk->cong_link_cnt = 0;
	call_rcu(&tsk->rcu, tipc_sk_callback);
	sock->sk = NULL;

	return 0;
}

/**
 * __tipc_bind - associate or disassociate TIPC name(s) with a socket
 * @sock: socket structure
 * @skaddr: socket address describing name(s) and desired operation
 * @alen: size of socket address data structure
 *
 * Name and name sequence binding is indicated using a positive scope value;
 * a negative scope value unbinds the specified name.  Specifying no name
 * (i.e. a socket address length of 0) unbinds all names from the socket.
 *
 * Return: 0 on success, errno otherwise
 *
 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 *       access any non-constant socket information.
 */
static int __tipc_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)skaddr;
	struct tipc_sock *tsk = tipc_sk(sock->sk);

	if (unlikely(!alen))
		return tipc_sk_withdraw(tsk, 0, NULL);

	if (addr->addrtype == TIPC_SERVICE_ADDR)
		addr->addr.nameseq.upper = addr->addr.nameseq.lower;

	if (tsk->group)
		return -EACCES;

	if (addr->scope >= 0)
		return tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq);
	else
		return tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
}

int tipc_sk_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
	int res;

	lock_sock(sock->sk);
	res = __tipc_bind(sock, skaddr, alen);
	release_sock(sock->sk);
	return res;
}

static int tipc_bind(struct socket *sock, struct sockaddr *skaddr, int alen)
{
	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)skaddr;

	if (alen) {
		if (alen < sizeof(struct sockaddr_tipc))
			return -EINVAL;
		if (addr->family != AF_TIPC)
			return -EAFNOSUPPORT;
		if (addr->addrtype > TIPC_SERVICE_ADDR)
			return -EAFNOSUPPORT;
		if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES) {
			pr_warn_once("Can't bind to reserved service type %u\n",
				     addr->addr.nameseq.type);
			return -EACCES;
		}
	}
	return tipc_sk_bind(sock, skaddr, alen);
}
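
/* Usage sketch (illustrative, not part of this file): a userspace server
 * publishing service type 4711, instances 0-99, cluster-wide:
 *
 *	struct sockaddr_tipc a = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_RANGE,
 *		.scope = TIPC_CLUSTER_SCOPE,
 *		.addr.nameseq = { .type = 4711, .lower = 0, .upper = 99 },
 *	};
 *
 *	if (bind(fd, (struct sockaddr *)&a, sizeof(a)))
 *		perror("bind");
 *
 * Types below TIPC_RESERVED_TYPES are rejected above, and a negative scope
 * withdraws the binding instead of publishing it.
 */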

/**
 * tipc_getname - get port ID of socket or peer socket
 * @sock: socket structure
 * @uaddr: area for returned socket address
 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
 *
 * Return: the size of the written address on success, errno otherwise
 *
 * NOTE: This routine doesn't need to take the socket lock since it only
 *       accesses socket information that is unchanging (or which changes in
 *       a completely predictable manner).
 */
static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
			int peer)
{
	struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);

	memset(addr, 0, sizeof(*addr));
	if (peer) {
		if ((!tipc_sk_connected(sk)) &&
		    ((peer != 2) || (sk->sk_state != TIPC_DISCONNECTING)))
			return -ENOTCONN;
		addr->addr.id.ref = tsk_peer_port(tsk);
		addr->addr.id.node = tsk_peer_node(tsk);
	} else {
		addr->addr.id.ref = tsk->portid;
		addr->addr.id.node = tipc_own_addr(sock_net(sk));
	}

	addr->addrtype = TIPC_SOCKET_ADDR;
	addr->family = AF_TIPC;
	addr->scope = 0;
	addr->addr.name.domain = 0;

	return sizeof(*addr);
}

/**
 * tipc_poll - read and possibly block on pollmask
 * @file: file structure associated with the socket
 * @sock: socket for which to calculate the poll bits
 * @wait: poll table entry used to register for wakeups
 *
 * Return: pollmask value
 *
 * COMMENTARY:
 * It appears that the usual socket locking mechanisms are not useful here
 * since the pollmask info is potentially out-of-date the moment this routine
 * exits.  TCP and other protocols seem to rely on higher level poll routines
 * to handle any preventable race conditions, so TIPC will do the same ...
 *
 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 * imply that the operation will succeed, merely that it should be performed
 * and will not block.
 */
static __poll_t tipc_poll(struct file *file, struct socket *sock,
			  poll_table *wait)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	__poll_t revents = 0;

	sock_poll_wait(file, sock, wait);
	trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");

	if (sk->sk_shutdown & RCV_SHUTDOWN)
		revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
	if (sk->sk_shutdown == SHUTDOWN_MASK)
		revents |= EPOLLHUP;

	switch (sk->sk_state) {
	case TIPC_ESTABLISHED:
		if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
			revents |= EPOLLOUT;
		fallthrough;
	case TIPC_LISTEN:
	case TIPC_CONNECTING:
		if (!skb_queue_empty_lockless(&sk->sk_receive_queue))
			revents |= EPOLLIN | EPOLLRDNORM;
		break;
	case TIPC_OPEN:
		if (tsk->group_is_open && !tsk->cong_link_cnt)
			revents |= EPOLLOUT;
		if (!tipc_sk_type_connectionless(sk))
			break;
		if (skb_queue_empty_lockless(&sk->sk_receive_queue))
			break;
		revents |= EPOLLIN | EPOLLRDNORM;
		break;
	case TIPC_DISCONNECTING:
		revents = EPOLLIN | EPOLLRDNORM | EPOLLHUP;
		break;
	}
	return revents;
}

/**
 * tipc_sendmcast - send multicast message
 * @sock: socket structure
 * @seq: destination address
 * @msg: message to send
 * @dlen: length of data to send
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_sendmcast(struct socket *sock, struct tipc_service_range *seq,
			  struct msghdr *msg, size_t dlen, long timeout)
{
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_msg *hdr = &tsk->phdr;
	struct net *net = sock_net(sk);
	int mtu = tipc_bcast_get_mtu(net);
	struct tipc_mc_method *method = &tsk->mc_method;
	struct sk_buff_head pkts;
	struct tipc_nlist dsts;
	int rc;

	if (tsk->group)
		return -EACCES;

	/* Block or return if any destination link is congested */
	rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
	if (unlikely(rc))
		return rc;

	/* Lookup destination nodes */
	tipc_nlist_init(&dsts, tipc_own_addr(net));
	tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
				      seq->upper, &dsts);
	if (!dsts.local && !dsts.remote)
		return -EHOSTUNREACH;

	/* Build message header */
	msg_set_type(hdr, TIPC_MCAST_MSG);
	msg_set_hdr_sz(hdr, MCAST_H_SIZE);
	msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
	msg_set_destport(hdr, 0);
	msg_set_destnode(hdr, 0);
	msg_set_nametype(hdr, seq->type);
	msg_set_namelower(hdr, seq->lower);
	msg_set_nameupper(hdr, seq->upper);

	/* Build message as chain of buffers */
	__skb_queue_head_init(&pkts);
	rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);

	/* Send message if build was successful */
	if (unlikely(rc == dlen)) {
		trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
					TIPC_DUMP_SK_SNDQ, " ");
		rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
				     &tsk->cong_link_cnt);
	}

	tipc_nlist_purge(&dsts);

	return rc ? rc : dlen;
}

/**
 * tipc_send_group_msg - send a message to a member in the group
 * @net: network namespace
 * @tsk: tipc socket
 * @m: message to send
 * @mb: group member
 * @dnode: destination node
 * @dport: destination port
 * @dlen: total length of message data
 */
static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
			       struct msghdr *m, struct tipc_member *mb,
			       u32 dnode, u32 dport, int dlen)
{
	u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
	struct tipc_mc_method *method = &tsk->mc_method;
	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	struct sk_buff_head pkts;
	int mtu, rc;

	/* Complete message header */
	msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
	msg_set_destport(hdr, dport);
	msg_set_destnode(hdr, dnode);
	msg_set_grp_bc_seqno(hdr, bc_snd_nxt);

	/* Build message as chain of buffers */
	__skb_queue_head_init(&pkts);
	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, false);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
		return rc;

	/* Send message */
	rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
	if (unlikely(rc == -ELINKCONG)) {
		tipc_dest_push(&tsk->cong_links, dnode, 0);
		tsk->cong_link_cnt++;
	}

	/* Update send window */
	tipc_group_update_member(mb, blks);

	/* A broadcast sent within next EXPIRE period must follow same path */
	method->rcast = true;
	method->mandatory = true;
	return dlen;
}

/**
 * tipc_send_group_unicast - send message to a member in the group
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
				   int dlen, long timeout)
{
	struct sock *sk = sock->sk;
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct net *net = sock_net(sk);
	struct tipc_member *mb = NULL;
	u32 node, port;
	int rc;

	node = dest->addr.id.node;
	port = dest->addr.id.ref;
	if (!port && !node)
		return -EHOSTUNREACH;

	/* Block or return if destination link or member is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tipc_dest_find(&tsk->cong_links, node, 0) &&
				tsk->group &&
				!tipc_group_cong(tsk->group, node, port, blks,
						 &mb));
	if (unlikely(rc))
		return rc;

	if (unlikely(!mb))
		return -EHOSTUNREACH;

	rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);

	return rc ? rc : dlen;
}

/**
 * tipc_send_group_anycast - send message to any member with given identity
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
				   int dlen, long timeout)
{
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
	struct sock *sk = sock->sk;
	struct tipc_sock *tsk = tipc_sk(sk);
	struct list_head *cong_links = &tsk->cong_links;
	int blks = tsk_blocks(GROUP_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	struct tipc_member *first = NULL;
	struct tipc_member *mbr = NULL;
	struct net *net = sock_net(sk);
	u32 node, port, exclude;
	struct list_head dsts;
	u32 type, inst, scope;
	int lookups = 0;
	int dstcnt, rc;
	bool cong;

	INIT_LIST_HEAD(&dsts);

	type = msg_nametype(hdr);
	inst = dest->addr.name.name.instance;
	scope = msg_lookup_scope(hdr);

	while (++lookups < 4) {
		exclude = tipc_group_exclude(tsk->group);

		first = NULL;

		/* Look for a non-congested destination member, if any */
		while (1) {
			if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
						 &dstcnt, exclude, false))
				return -EHOSTUNREACH;
			tipc_dest_pop(&dsts, &node, &port);
			cong = tipc_group_cong(tsk->group, node, port, blks,
					       &mbr);
			if (!cong)
				break;
			if (mbr == first)
				break;
			if (!first)
				first = mbr;
		}

		/* Start over if destination was not in member list */
		if (unlikely(!mbr))
			continue;

		if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
			break;

		/* Block or return if destination link or member is congested */
		rc = tipc_wait_for_cond(sock, &timeout,
					!tipc_dest_find(cong_links, node, 0) &&
					tsk->group &&
					!tipc_group_cong(tsk->group, node, port,
							 blks, &mbr));
		if (unlikely(rc))
			return rc;

		/* Send, unless destination disappeared while waiting */
		if (likely(mbr))
			break;
	}

	if (unlikely(lookups >= 4))
		return -EHOSTUNREACH;

	rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);

	return rc ? rc : dlen;
}
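
/* Note: the anycast loop above retries the name table lookup at most three
 * times (lookups < 4). A retry happens when the looked-up destination has
 * left the member list, or disappeared while the sender slept on congestion;
 * once the retries are exhausted the send fails with -EHOSTUNREACH.
 */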

/**
 * tipc_send_group_bcast - send message to all members in communication group
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
				 int dlen, long timeout)
{
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_nlist *dsts;
	struct tipc_mc_method *method = &tsk->mc_method;
	bool ack = method->mandatory && method->rcast;
	int blks = tsk_blocks(MCAST_H_SIZE + dlen);
	struct tipc_msg *hdr = &tsk->phdr;
	int mtu = tipc_bcast_get_mtu(net);
	struct sk_buff_head pkts;
	int rc = -EHOSTUNREACH;

	/* Block or return if any destination link or member is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tsk->cong_link_cnt && tsk->group &&
				!tipc_group_bc_cong(tsk->group, blks));
	if (unlikely(rc))
		return rc;

	dsts = tipc_group_dests(tsk->group);
	if (!dsts->local && !dsts->remote)
		return -EHOSTUNREACH;

	/* Complete message header */
	if (dest) {
		msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
		msg_set_nameinst(hdr, dest->addr.name.name.instance);
	} else {
		msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
		msg_set_nameinst(hdr, 0);
	}
	msg_set_hdr_sz(hdr, GROUP_H_SIZE);
	msg_set_destport(hdr, 0);
	msg_set_destnode(hdr, 0);
	msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(tsk->group));

	/* Avoid getting stuck with repeated forced replicasts */
	msg_set_grp_bc_ack_req(hdr, ack);

	/* Build message as chain of buffers */
	__skb_queue_head_init(&pkts);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
		return rc;

	/* Send message */
	rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
	if (unlikely(rc))
		return rc;

	/* Update broadcast sequence number and send windows */
	tipc_group_update_bc_members(tsk->group, blks, ack);

	/* Broadcast link is now free to choose method for next broadcast */
	method->mandatory = false;
	method->expires = jiffies;

	return dlen;
}

/**
 * tipc_send_group_mcast - send message to all members with given identity
 * @sock: socket structure
 * @m: message to send
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Return: the number of bytes sent on success, or errno
 */
static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
				 int dlen, long timeout)
{
	struct sock *sk = sock->sk;
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
	struct tipc_sock *tsk = tipc_sk(sk);
	struct tipc_group *grp = tsk->group;
	struct tipc_msg *hdr = &tsk->phdr;
	struct net *net = sock_net(sk);
	u32 type, inst, scope, exclude;
	struct list_head dsts;
	u32 dstcnt;

	INIT_LIST_HEAD(&dsts);

	type = msg_nametype(hdr);
	inst = dest->addr.name.name.instance;
	scope = msg_lookup_scope(hdr);
	exclude = tipc_group_exclude(grp);

	if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
				 &dstcnt, exclude, true))
		return -EHOSTUNREACH;

	if (dstcnt == 1) {
		tipc_dest_pop(&dsts, &dest->addr.id.node, &dest->addr.id.ref);
		return tipc_send_group_unicast(sock, m, dlen, timeout);
	}

	tipc_dest_list_purge(&dsts);
	return tipc_send_group_bcast(sock, m, dlen, timeout);
}

/**
 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
 * @net: the associated network namespace
 * @arrvq: queue with arriving messages, to be cloned after destination lookup
 * @inputq: queue with cloned messages, delivered to socket after dest lookup
 *
 * Multi-threaded: parallel calls with reference to same queues may occur
 */
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
		       struct sk_buff_head *inputq)
{
	u32 self = tipc_own_addr(net);
	u32 type, lower, upper, scope;
	struct sk_buff *skb, *_skb;
	u32 portid, onode;
	struct sk_buff_head tmpq;
	struct list_head dports;
	struct tipc_msg *hdr;
	int user, mtyp, hlen;
	bool exact;

	__skb_queue_head_init(&tmpq);
	INIT_LIST_HEAD(&dports);

	skb = tipc_skb_peek(arrvq, &inputq->lock);
	for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
		hdr = buf_msg(skb);
		user = msg_user(hdr);
		mtyp = msg_type(hdr);
		hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
		onode = msg_orignode(hdr);
		type = msg_nametype(hdr);

		if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
			spin_lock_bh(&inputq->lock);
			if (skb_peek(arrvq) == skb) {
				__skb_dequeue(arrvq);
				__skb_queue_tail(inputq, skb);
			}
			kfree_skb(skb);
			spin_unlock_bh(&inputq->lock);
			continue;
		}

		/* Group messages require exact scope match */
		if (msg_in_group(hdr)) {
			lower = 0;
			upper = ~0;
			scope = msg_lookup_scope(hdr);
			exact = true;
		} else {
			/* TIPC_NODE_SCOPE means "any scope" in this context */
			if (onode == self)
				scope = TIPC_NODE_SCOPE;
			else
				scope = TIPC_CLUSTER_SCOPE;
			exact = false;
			lower = msg_namelower(hdr);
			upper = msg_nameupper(hdr);
		}

		/* Create destination port list: */
		tipc_nametbl_mc_lookup(net, type, lower, upper,
				       scope, exact, &dports);

		/* Clone message per destination */
		while (tipc_dest_pop(&dports, NULL, &portid)) {
			_skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
			if (_skb) {
				msg_set_destport(buf_msg(_skb), portid);
				__skb_queue_tail(&tmpq, _skb);
				continue;
			}
			pr_warn("Failed to clone mcast rcv buffer\n");
		}
		/* Append to inputq if not already done by other thread */
		spin_lock_bh(&inputq->lock);
		if (skb_peek(arrvq) == skb) {
			skb_queue_splice_tail_init(&tmpq, inputq);
			kfree_skb(__skb_dequeue(arrvq));
		}
		spin_unlock_bh(&inputq->lock);
		__skb_queue_purge(&tmpq);
		kfree_skb(skb);
	}
	tipc_sk_rcv(net, inputq);
}
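
/* Note on concurrency: tipc_skb_peek() takes an extra reference on the head
 * of @arrvq without dequeuing it, so several CPUs may process the same
 * message in parallel. Only the thread that still finds the message at the
 * head of @arrvq under inputq->lock gets to dequeue it and splice its clones
 * into @inputq; the others drop their reference and purge their clones.
 */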

/* tipc_sk_push_backlog(): send accumulated buffers in socket write queue
 *                         when socket is in Nagle mode
 */
static void tipc_sk_push_backlog(struct tipc_sock *tsk, bool nagle_ack)
{
	struct sk_buff_head *txq = &tsk->sk.sk_write_queue;
	struct sk_buff *skb = skb_peek_tail(txq);
	struct net *net = sock_net(&tsk->sk);
	u32 dnode = tsk_peer_node(tsk);
	int rc;

	if (nagle_ack) {
		tsk->pkt_cnt += skb_queue_len(txq);
		if (!tsk->pkt_cnt || tsk->msg_acc / tsk->pkt_cnt < 2) {
			tsk->oneway = 0;
			if (tsk->nagle_start < NAGLE_START_MAX)
				tsk->nagle_start *= 2;
			tsk->expect_ack = false;
			pr_debug("tsk %10u: bad nagle %u -> %u, next start %u!\n",
				 tsk->portid, tsk->msg_acc, tsk->pkt_cnt,
				 tsk->nagle_start);
		} else {
			tsk->nagle_start = NAGLE_START_INIT;
			if (skb) {
				msg_set_ack_required(buf_msg(skb));
				tsk->expect_ack = true;
			} else {
				tsk->expect_ack = false;
			}
		}
		tsk->msg_acc = 0;
		tsk->pkt_cnt = 0;
	}

	if (!skb || tsk->cong_link_cnt)
		return;

	/* Do not send SYN again after congestion */
	if (msg_is_syn(buf_msg(skb)))
		return;

	if (tsk->msg_acc)
		tsk->pkt_cnt += skb_queue_len(txq);
	tsk->snt_unacked += tsk->snd_backlog;
	tsk->snd_backlog = 0;
	rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
	if (rc == -ELINKCONG)
		tsk->cong_link_cnt = 1;
}
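
/* Note on the adaptive Nagle scheme above: when a nagle ack arrives, the
 * ratio msg_acc/pkt_cnt indicates how many user messages were bundled per
 * packet actually sent. If bundling gained less than a factor of two, Nagle
 * is judged unprofitable for this flow: the oneway message count restarts
 * and the threshold for re-entering Nagle mode (nagle_start) doubles, up to
 * NAGLE_START_MAX. Otherwise the threshold resets to NAGLE_START_INIT and
 * an ack is requested on the tail buffer so the evaluation can repeat.
 */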

/**
 * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
 * @tsk: receiving socket
 * @skb: pointer to message buffer.
 * @inputq: buffer list containing the buffers
 * @xmitq: output message area
 */
static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
				   struct sk_buff_head *inputq,
				   struct sk_buff_head *xmitq)
{
	struct tipc_msg *hdr = buf_msg(skb);
	u32 onode = tsk_own_node(tsk);
	struct sock *sk = &tsk->sk;
	int mtyp = msg_type(hdr);
	bool was_cong;

	/* Ignore if connection cannot be validated: */
	if (!tsk_peer_msg(tsk, hdr)) {
		trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
		goto exit;
	}

	if (unlikely(msg_errcode(hdr))) {
		tipc_set_sk_state(sk, TIPC_DISCONNECTING);
		tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
				      tsk_peer_port(tsk));
		sk->sk_state_change(sk);

		/* State change is ignored if socket already awake,
		 * - convert msg to abort msg and add to inqueue
		 */
		msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
		msg_set_type(hdr, TIPC_CONN_MSG);
		msg_set_size(hdr, BASIC_H_SIZE);
		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
		__skb_queue_tail(inputq, skb);
		return;
	}

	tsk->probe_unacked = false;

	if (mtyp == CONN_PROBE) {
		msg_set_type(hdr, CONN_PROBE_REPLY);
		if (tipc_msg_reverse(onode, &skb, TIPC_OK))
			__skb_queue_tail(xmitq, skb);
		return;
	} else if (mtyp == CONN_ACK) {
		was_cong = tsk_conn_cong(tsk);
		tipc_sk_push_backlog(tsk, msg_nagle_ack(hdr));
		tsk->snt_unacked -= msg_conn_ack(hdr);
		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
			tsk->snd_win = msg_adv_win(hdr);
		if (was_cong && !tsk_conn_cong(tsk))
			sk->sk_write_space(sk);
	} else if (mtyp != CONN_PROBE_REPLY) {
		pr_warn("Received unknown CONN_PROTO msg\n");
	}
exit:
	kfree_skb(skb);
}

/**
 * tipc_sendmsg - send message in connectionless manner
 * @sock: socket structure
 * @m: message to send
 * @dsz: amount of user data to be sent
 *
 * Message must have a destination specified explicitly.
 * Used for SOCK_RDM and SOCK_DGRAM messages,
 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 *
 * Return: the number of bytes sent on success, or errno otherwise
 */
static int tipc_sendmsg(struct socket *sock,
			struct msghdr *m, size_t dsz)
{
	struct sock *sk = sock->sk;
	int ret;

	lock_sock(sk);
	ret = __tipc_sendmsg(sock, m, dsz);
	release_sock(sk);

	return ret;
}

static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
{
	struct sock *sk = sock->sk;
	struct net *net = sock_net(sk);
	struct tipc_sock *tsk = tipc_sk(sk);
	DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
	long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
	struct list_head *clinks = &tsk->cong_links;
	bool syn = !tipc_sk_type_connectionless(sk);
	struct tipc_group *grp = tsk->group;
	struct tipc_msg *hdr = &tsk->phdr;
	struct tipc_service_range *seq;
	struct sk_buff_head pkts;
	u32 dport = 0, dnode = 0;
	u32 type = 0, inst = 0;
	int mtu, rc;

	if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
		return -EMSGSIZE;

	if (likely(dest)) {
		if (unlikely(m->msg_namelen < sizeof(*dest)))
			return -EINVAL;
		if (unlikely(dest->family != AF_TIPC))
			return -EINVAL;
	}

	if (grp) {
		if (!dest)
			return tipc_send_group_bcast(sock, m, dlen, timeout);
		if (dest->addrtype == TIPC_SERVICE_ADDR)
			return tipc_send_group_anycast(sock, m, dlen, timeout);
		if (dest->addrtype == TIPC_SOCKET_ADDR)
			return tipc_send_group_unicast(sock, m, dlen, timeout);
		if (dest->addrtype == TIPC_ADDR_MCAST)
			return tipc_send_group_mcast(sock, m, dlen, timeout);
		return -EINVAL;
	}

	if (unlikely(!dest)) {
		dest = &tsk->peer;
		if (!syn && dest->family != AF_TIPC)
			return -EDESTADDRREQ;
	}

	if (unlikely(syn)) {
		if (sk->sk_state == TIPC_LISTEN)
			return -EPIPE;
		if (sk->sk_state != TIPC_OPEN)
			return -EISCONN;
		if (tsk->published)
			return -EOPNOTSUPP;
		if (dest->addrtype == TIPC_SERVICE_ADDR) {
			tsk->conn_type = dest->addr.name.name.type;
			tsk->conn_instance = dest->addr.name.name.instance;
		}
		msg_set_syn(hdr, 1);
	}

	seq = &dest->addr.nameseq;
	if (dest->addrtype == TIPC_ADDR_MCAST)
		return tipc_sendmcast(sock, seq, m, dlen, timeout);

	if (dest->addrtype == TIPC_SERVICE_ADDR) {
		type = dest->addr.name.name.type;
		inst = dest->addr.name.name.instance;
		dnode = dest->addr.name.domain;
		dport = tipc_nametbl_translate(net, type, inst, &dnode);
		if (unlikely(!dport && !dnode))
			return -EHOSTUNREACH;
	} else if (dest->addrtype == TIPC_SOCKET_ADDR) {
		dnode = dest->addr.id.node;
	} else {
		return -EINVAL;
	}

	/* Block or return if destination link is congested */
	rc = tipc_wait_for_cond(sock, &timeout,
				!tipc_dest_find(clinks, dnode, 0));
	if (unlikely(rc))
		return rc;

	if (dest->addrtype == TIPC_SERVICE_ADDR) {
		msg_set_type(hdr, TIPC_NAMED_MSG);
		msg_set_hdr_sz(hdr, NAMED_H_SIZE);
		msg_set_nametype(hdr, type);
		msg_set_nameinst(hdr, inst);
		msg_set_lookup_scope(hdr, tipc_node2scope(dnode));
		msg_set_destnode(hdr, dnode);
		msg_set_destport(hdr, dport);
	} else { /* TIPC_SOCKET_ADDR */
		msg_set_type(hdr, TIPC_DIRECT_MSG);
		msg_set_lookup_scope(hdr, 0);
		msg_set_destnode(hdr, dnode);
		msg_set_destport(hdr, dest->addr.id.ref);
		msg_set_hdr_sz(hdr, BASIC_H_SIZE);
	}

	__skb_queue_head_init(&pkts);
	mtu = tipc_node_get_mtu(net, dnode, tsk->portid, true);
	rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
	if (unlikely(rc != dlen))
1517                return rc;
1518        if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue))) {
1519                __skb_queue_purge(&pkts);
1520                return -ENOMEM;
1521        }
1522
1523        trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
1524        rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1525        if (unlikely(rc == -ELINKCONG)) {
1526                tipc_dest_push(clinks, dnode, 0);
1527                tsk->cong_link_cnt++;
1528                rc = 0;
1529        }
1530
1531        if (unlikely(syn && !rc))
1532                tipc_set_sk_state(sk, TIPC_CONNECTING);
1533
1534        return rc ? rc : dlen;
1535}
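
/*
 * Example (userspace sketch, not part of this file): a SOCK_RDM datagram
 * sent to a service address ends up in __tipc_sendmsg() above. The
 * service type/instance values below are arbitrary illustrations.
 *
 *	#include <string.h>
 *	#include <sys/socket.h>
 *	#include <linux/tipc.h>
 *
 *	ssize_t send_rdm(const char *buf, size_t len)
 *	{
 *		struct sockaddr_tipc dst;
 *		int sd = socket(AF_TIPC, SOCK_RDM, 0);
 *
 *		if (sd < 0)
 *			return -1;
 *		memset(&dst, 0, sizeof(dst));
 *		dst.family = AF_TIPC;
 *		dst.addrtype = TIPC_SERVICE_ADDR;
 *		dst.addr.name.name.type = 18888;	// arbitrary service type
 *		dst.addr.name.name.instance = 17;	// arbitrary instance
 *		dst.addr.name.domain = 0;		// cluster-wide lookup
 *		return sendto(sd, buf, len, 0,
 *			      (struct sockaddr *)&dst, sizeof(dst));
 *	}
 */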
1536
1537/**
1538 * tipc_sendstream - send stream-oriented data
1539 * @sock: socket structure
1540 * @m: data to send
1541 * @dsz: total length of data to be transmitted
1542 *
1543 * Used for SOCK_STREAM data.
1544 *
1545 * Return: the number of bytes sent on success (or partial success),
1546 * or errno if no data sent
1547 */
1548static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
1549{
1550        struct sock *sk = sock->sk;
1551        int ret;
1552
1553        lock_sock(sk);
1554        ret = __tipc_sendstream(sock, m, dsz);
1555        release_sock(sk);
1556
1557        return ret;
1558}
1559
1560static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
1561{
1562        struct sock *sk = sock->sk;
1563        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1564        long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1565        struct sk_buff_head *txq = &sk->sk_write_queue;
1566        struct tipc_sock *tsk = tipc_sk(sk);
1567        struct tipc_msg *hdr = &tsk->phdr;
1568        struct net *net = sock_net(sk);
1569        struct sk_buff *skb;
1570        u32 dnode = tsk_peer_node(tsk);
1571        int maxnagle = tsk->maxnagle;
1572        int maxpkt = tsk->max_pkt;
1573        int send, sent = 0;
1574        int blocks, rc = 0;
1575
1576        if (unlikely(dlen > INT_MAX))
1577                return -EMSGSIZE;
1578
1579        /* Handle implicit connection setup */
1580        if (unlikely(dest)) {
1581                rc = __tipc_sendmsg(sock, m, dlen);
1582                if (dlen && dlen == rc) {
1583                        tsk->peer_caps = tipc_node_get_capabilities(net, dnode);
1584                        tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
1585                }
1586                return rc;
1587        }
1588
1589        do {
1590                rc = tipc_wait_for_cond(sock, &timeout,
1591                                        (!tsk->cong_link_cnt &&
1592                                         !tsk_conn_cong(tsk) &&
1593                                         tipc_sk_connected(sk)));
1594                if (unlikely(rc))
1595                        break;
1596                send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
1597                blocks = tsk->snd_backlog;
1598                if (tsk->oneway++ >= tsk->nagle_start && maxnagle &&
1599                    send <= maxnagle) {
1600                        rc = tipc_msg_append(hdr, m, send, maxnagle, txq);
1601                        if (unlikely(rc < 0))
1602                                break;
1603                        blocks += rc;
1604                        tsk->msg_acc++;
1605                        if (blocks <= 64 && tsk->expect_ack) {
1606                                tsk->snd_backlog = blocks;
1607                                sent += send;
1608                                break;
1609                        } else if (blocks > 64) {
1610                                tsk->pkt_cnt += skb_queue_len(txq);
1611                        } else {
1612                                skb = skb_peek_tail(txq);
1613                                if (skb) {
1614                                        msg_set_ack_required(buf_msg(skb));
1615                                        tsk->expect_ack = true;
1616                                } else {
1617                                        tsk->expect_ack = false;
1618                                }
1619                                tsk->msg_acc = 0;
1620                                tsk->pkt_cnt = 0;
1621                        }
1622                } else {
1623                        rc = tipc_msg_build(hdr, m, sent, send, maxpkt, txq);
1624                        if (unlikely(rc != send))
1625                                break;
1626                        blocks += tsk_inc(tsk, send + MIN_H_SIZE);
1627                }
1628                trace_tipc_sk_sendstream(sk, skb_peek(txq),
1629                                         TIPC_DUMP_SK_SNDQ, " ");
1630                rc = tipc_node_xmit(net, txq, dnode, tsk->portid);
1631                if (unlikely(rc == -ELINKCONG)) {
1632                        tsk->cong_link_cnt = 1;
1633                        rc = 0;
1634                }
1635                if (likely(!rc)) {
1636                        tsk->snt_unacked += blocks;
1637                        tsk->snd_backlog = 0;
1638                        sent += send;
1639                }
1640        } while (sent < dlen && !rc);
1641
1642        return sent ? sent : rc;
1643}
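
/*
 * Example (userspace sketch): __tipc_sendstream() above may accept only
 * part of the requested data, so a stream writer should loop until all
 * bytes are queued, exactly as with TCP. Hypothetical helper:
 *
 *	#include <errno.h>
 *	#include <sys/socket.h>
 *
 *	ssize_t write_all(int sd, const char *buf, size_t len)
 *	{
 *		size_t done = 0;
 *
 *		while (done < len) {
 *			ssize_t n = send(sd, buf + done, len - done, 0);
 *
 *			if (n < 0) {
 *				if (errno == EINTR)
 *					continue;
 *				return done ? (ssize_t)done : -1;
 *			}
 *			done += n;
 *		}
 *		return (ssize_t)done;
 *	}
 */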
1644
1645/**
1646 * tipc_send_packet - send a connection-oriented message
1647 * @sock: socket structure
1648 * @m: message to send
1649 * @dsz: length of data to be transmitted
1650 *
1651 * Used for SOCK_SEQPACKET messages.
1652 *
1653 * Return: the number of bytes sent on success, or errno otherwise
1654 */
1655static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1656{
1657        if (dsz > TIPC_MAX_USER_MSG_SIZE)
1658                return -EMSGSIZE;
1659
1660        return tipc_sendstream(sock, m, dsz);
1661}
1662
1663/* tipc_sk_finish_conn - complete the setup of a connection
1664 */
1665static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1666                                u32 peer_node)
1667{
1668        struct sock *sk = &tsk->sk;
1669        struct net *net = sock_net(sk);
1670        struct tipc_msg *msg = &tsk->phdr;
1671
1672        msg_set_syn(msg, 0);
1673        msg_set_destnode(msg, peer_node);
1674        msg_set_destport(msg, peer_port);
1675        msg_set_type(msg, TIPC_CONN_MSG);
1676        msg_set_lookup_scope(msg, 0);
1677        msg_set_hdr_sz(msg, SHORT_H_SIZE);
1678
1679        sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
1680        tipc_set_sk_state(sk, TIPC_ESTABLISHED);
1681        tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1682        tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid, true);
1683        tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1684        tsk_set_nagle(tsk);
1685        __skb_queue_purge(&sk->sk_write_queue);
1686        if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1687                return;
1688
1689        /* Fall back to message-based flow control */
1690        tsk->rcv_win = FLOWCTL_MSG_WIN;
1691        tsk->snd_win = FLOWCTL_MSG_WIN;
1692}
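
/*
 * A minimal sketch (hypothetical helper, not part of this file) of the
 * accounting negotiated above, assuming the 1 KiB block size
 * (FLOWCTL_BLK_SZ) that tsk_inc() uses elsewhere in this file:
 *
 *	static u16 example_flowctl_cost(bool blk_flowctl_peer, int msglen)
 *	{
 *		if (blk_flowctl_peer)
 *			return msglen / FLOWCTL_BLK_SZ + 1;	// charge per KiB
 *		return 1;	// legacy peer: every message costs one unit
 *	}
 */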
1693
1694/**
1695 * tipc_sk_set_orig_addr - capture sender's address for received message
1696 * @m: descriptor for message info
1697 * @skb: received message
1698 *
1699 * Note: Address is not captured if not requested by receiver.
1700 */
1701static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
1702{
1703        DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
1704        struct tipc_msg *hdr = buf_msg(skb);
1705
1706        if (!srcaddr)
1707                return;
1708
1709        srcaddr->sock.family = AF_TIPC;
1710        srcaddr->sock.addrtype = TIPC_SOCKET_ADDR;
1711        srcaddr->sock.scope = 0;
1712        srcaddr->sock.addr.id.ref = msg_origport(hdr);
1713        srcaddr->sock.addr.id.node = msg_orignode(hdr);
1714        srcaddr->sock.addr.name.domain = 0;
1715        m->msg_namelen = sizeof(struct sockaddr_tipc);
1716
1717        if (!msg_in_group(hdr))
1718                return;
1719
1720        /* Group message users may also want to know the sending member's id */
1721        srcaddr->member.family = AF_TIPC;
1722        srcaddr->member.addrtype = TIPC_SERVICE_ADDR;
1723        srcaddr->member.scope = 0;
1724        srcaddr->member.addr.name.name.type = msg_nametype(hdr);
1725        srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
1726        srcaddr->member.addr.name.domain = 0;
1727        m->msg_namelen = sizeof(*srcaddr);
1728}
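
/*
 * Example (userspace sketch): a group member can supply room for both
 * addresses filled in above; the returned msg_namelen tells it whether
 * the second (member) address is present:
 *
 *	char buf[1024];
 *	struct sockaddr_tipc pair[2];	// matches struct sockaddr_pair
 *	struct iovec iov = { buf, sizeof(buf) };
 *	struct msghdr m = { .msg_name = pair, .msg_namelen = sizeof(pair),
 *			    .msg_iov = &iov, .msg_iovlen = 1 };
 *
 *	if (recvmsg(sd, &m, 0) >= 0 && m.msg_namelen == sizeof(pair))
 *		;	// pair[1] holds the sending member's service address
 */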
1729
1730/**
1731 * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1732 * @m: descriptor for message info
1733 * @skb: received message buffer
1734 * @tsk: TIPC port associated with message
1735 *
1736 * Note: Ancillary data is not captured if not requested by receiver.
1737 *
1738 * Return: 0 if successful, otherwise errno
1739 */
1740static int tipc_sk_anc_data_recv(struct msghdr *m, struct sk_buff *skb,
1741                                 struct tipc_sock *tsk)
1742{
1743        struct tipc_msg *msg;
1744        u32 anc_data[3];
1745        u32 err;
1746        u32 dest_type;
1747        int has_name;
1748        int res;
1749
1750        if (likely(m->msg_controllen == 0))
1751                return 0;
1752        msg = buf_msg(skb);
1753
1754        /* Optionally capture errored message object(s) */
1755        err = msg ? msg_errcode(msg) : 0;
1756        if (unlikely(err)) {
1757                anc_data[0] = err;
1758                anc_data[1] = msg_data_sz(msg);
1759                res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1760                if (res)
1761                        return res;
1762                if (anc_data[1]) {
1763                        if (skb_linearize(skb))
1764                                return -ENOMEM;
1765                        msg = buf_msg(skb);
1766                        res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1767                                       msg_data(msg));
1768                        if (res)
1769                                return res;
1770                }
1771        }
1772
1773        /* Optionally capture message destination object */
1774        dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1775        switch (dest_type) {
1776        case TIPC_NAMED_MSG:
1777                has_name = 1;
1778                anc_data[0] = msg_nametype(msg);
1779                anc_data[1] = msg_namelower(msg);
1780                anc_data[2] = msg_namelower(msg);
1781                break;
1782        case TIPC_MCAST_MSG:
1783                has_name = 1;
1784                anc_data[0] = msg_nametype(msg);
1785                anc_data[1] = msg_namelower(msg);
1786                anc_data[2] = msg_nameupper(msg);
1787                break;
1788        case TIPC_CONN_MSG:
1789                has_name = (tsk->conn_type != 0);
1790                anc_data[0] = tsk->conn_type;
1791                anc_data[1] = tsk->conn_instance;
1792                anc_data[2] = tsk->conn_instance;
1793                break;
1794        default:
1795                has_name = 0;
1796        }
1797        if (has_name) {
1798                res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1799                if (res)
1800                        return res;
1801        }
1802
1803        return 0;
1804}
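
/*
 * Example (userspace sketch): reading the ancillary objects produced
 * above. TIPC_ERRINFO carries two u32s (error code, size of returned
 * data), TIPC_RETDATA the rejected payload itself, and TIPC_DESTNAME
 * three u32s (type, lower, upper):
 *
 *	char cbuf[1024];
 *	struct msghdr m = { .msg_control = cbuf,
 *			    .msg_controllen = sizeof(cbuf) };
 *	struct cmsghdr *cm;
 *
 *	recvmsg(sd, &m, 0);
 *	for (cm = CMSG_FIRSTHDR(&m); cm; cm = CMSG_NXTHDR(&m, cm)) {
 *		if (cm->cmsg_level != SOL_TIPC)
 *			continue;
 *		if (cm->cmsg_type == TIPC_ERRINFO)
 *			;	// CMSG_DATA(cm): u32 err, u32 retdata_len
 *		else if (cm->cmsg_type == TIPC_DESTNAME)
 *			;	// CMSG_DATA(cm): u32 type, lower, upper
 *	}
 */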
1805
1806static struct sk_buff *tipc_sk_build_ack(struct tipc_sock *tsk)
1807{
1808        struct sock *sk = &tsk->sk;
1809        struct sk_buff *skb = NULL;
1810        struct tipc_msg *msg;
1811        u32 peer_port = tsk_peer_port(tsk);
1812        u32 dnode = tsk_peer_node(tsk);
1813
1814        if (!tipc_sk_connected(sk))
1815                return NULL;
1816        skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1817                              dnode, tsk_own_node(tsk), peer_port,
1818                              tsk->portid, TIPC_OK);
1819        if (!skb)
1820                return NULL;
1821        msg = buf_msg(skb);
1822        msg_set_conn_ack(msg, tsk->rcv_unacked);
1823        tsk->rcv_unacked = 0;
1824
1825        /* Adjust to and advertise the correct window limit */
1826        if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1827                tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1828                msg_set_adv_win(msg, tsk->rcv_win);
1829        }
1830        return skb;
1831}
1832
1833static void tipc_sk_send_ack(struct tipc_sock *tsk)
1834{
1835        struct sk_buff *skb;
1836
1837        skb = tipc_sk_build_ack(tsk);
1838        if (!skb)
1839                return;
1840
1841        tipc_node_xmit_skb(sock_net(&tsk->sk), skb, tsk_peer_node(tsk),
1842                           msg_link_selector(buf_msg(skb)));
1843}
1844
1845static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1846{
1847        struct sock *sk = sock->sk;
1848        DEFINE_WAIT_FUNC(wait, woken_wake_function);
1849        long timeo = *timeop;
1850        int err = sock_error(sk);
1851
1852        if (err)
1853                return err;
1854
1855        for (;;) {
1856                if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1857                        if (sk->sk_shutdown & RCV_SHUTDOWN) {
1858                                err = -ENOTCONN;
1859                                break;
1860                        }
1861                        add_wait_queue(sk_sleep(sk), &wait);
1862                        release_sock(sk);
1863                        timeo = wait_woken(&wait, TASK_INTERRUPTIBLE, timeo);
1864                        sched_annotate_sleep();
1865                        lock_sock(sk);
1866                        remove_wait_queue(sk_sleep(sk), &wait);
1867                }
1868                err = 0;
1869                if (!skb_queue_empty(&sk->sk_receive_queue))
1870                        break;
1871                err = -EAGAIN;
1872                if (!timeo)
1873                        break;
1874                err = sock_intr_errno(timeo);
1875                if (signal_pending(current))
1876                        break;
1877
1878                err = sock_error(sk);
1879                if (err)
1880                        break;
1881        }
1882        *timeop = timeo;
1883        return err;
1884}
1885
1886/**
1887 * tipc_recvmsg - receive packet-oriented message
1888 * @sock: network socket
1889 * @m: descriptor for message info
1890 * @buflen: length of user buffer area
1891 * @flags: receive flags
1892 *
1893 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1894 * If the complete message doesn't fit in user area, truncate it.
1895 *
1896 * Return: size of returned message data, errno otherwise
1897 */
1898static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1899                        size_t buflen,  int flags)
1900{
1901        struct sock *sk = sock->sk;
1902        bool connected = !tipc_sk_type_connectionless(sk);
1903        struct tipc_sock *tsk = tipc_sk(sk);
1904        int rc, err, hlen, dlen, copy;
1905        struct sk_buff_head xmitq;
1906        struct tipc_msg *hdr;
1907        struct sk_buff *skb;
1908        bool grp_evt;
1909        long timeout;
1910
1911        /* Catch invalid receive requests */
1912        if (unlikely(!buflen))
1913                return -EINVAL;
1914
1915        lock_sock(sk);
1916        if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
1917                rc = -ENOTCONN;
1918                goto exit;
1919        }
1920        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1921
1922        /* Advance rcv queue to first msg with data or error; wait if necessary */
1923        do {
1924                rc = tipc_wait_for_rcvmsg(sock, &timeout);
1925                if (unlikely(rc))
1926                        goto exit;
1927                skb = skb_peek(&sk->sk_receive_queue);
1928                hdr = buf_msg(skb);
1929                dlen = msg_data_sz(hdr);
1930                hlen = msg_hdr_sz(hdr);
1931                err = msg_errcode(hdr);
1932                grp_evt = msg_is_grp_evt(hdr);
1933                if (likely(dlen || err))
1934                        break;
1935                tsk_advance_rx_queue(sk);
1936        } while (1);
1937
1938        /* Collect msg meta data, including error code and rejected data */
1939        tipc_sk_set_orig_addr(m, skb);
1940        rc = tipc_sk_anc_data_recv(m, skb, tsk);
1941        if (unlikely(rc))
1942                goto exit;
1943        hdr = buf_msg(skb);
1944
1945        /* Capture data if non-error msg, otherwise just set return value */
1946        if (likely(!err)) {
1947                copy = min_t(int, dlen, buflen);
1948                if (unlikely(copy != dlen))
1949                        m->msg_flags |= MSG_TRUNC;
1950                rc = skb_copy_datagram_msg(skb, hlen, m, copy);
1951        } else {
1952                copy = 0;
1953                rc = 0;
1954                if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control)
1955                        rc = -ECONNRESET;
1956        }
1957        if (unlikely(rc))
1958                goto exit;
1959
1960        /* Mark message as group event if applicable */
1961        if (unlikely(grp_evt)) {
1962                if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
1963                        m->msg_flags |= MSG_EOR;
1964                m->msg_flags |= MSG_OOB;
1965                copy = 0;
1966        }
1967
1968        /* Capture of data or error code/rejected data was successful */
1969        if (unlikely(flags & MSG_PEEK))
1970                goto exit;
1971
1972        /* Send group flow control advertisement when applicable */
1973        if (tsk->group && msg_in_group(hdr) && !grp_evt) {
1974                __skb_queue_head_init(&xmitq);
1975                tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
1976                                          msg_orignode(hdr), msg_origport(hdr),
1977                                          &xmitq);
1978                tipc_node_distr_xmit(sock_net(sk), &xmitq);
1979        }
1980
1981        tsk_advance_rx_queue(sk);
1982
1983        if (likely(!connected))
1984                goto exit;
1985
1986        /* Send connection flow control advertisement when applicable */
1987        tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
1988        if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
1989                tipc_sk_send_ack(tsk);
1990exit:
1991        release_sock(sk);
1992        return rc ? rc : copy;
1993}
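
/*
 * Example (userspace sketch): a SOCK_RDM receiver; the MSG_TRUNC flag
 * set above is reported back in msg_flags when the message was larger
 * than the supplied buffer:
 *
 *	char buf[4096];
 *	struct sockaddr_tipc src;
 *	struct iovec iov = { buf, sizeof(buf) };
 *	struct msghdr m = { .msg_name = &src, .msg_namelen = sizeof(src),
 *			    .msg_iov = &iov, .msg_iovlen = 1 };
 *	ssize_t n = recvmsg(sd, &m, 0);
 *
 *	if (n >= 0 && (m.msg_flags & MSG_TRUNC))
 *		;	// message was truncated to fit buf
 */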
1994
1995/**
1996 * tipc_recvstream - receive stream-oriented data
1997 * @sock: network socket
1998 * @m: descriptor for message info
1999 * @buflen: total size of user buffer area
2000 * @flags: receive flags
2001 *
2002 * Used for SOCK_STREAM messages only. If not enough data is available,
2003 * it will optionally wait for more; data is never truncated.
2004 *
2005 * Return: size of returned message data, errno otherwise
2006 */
2007static int tipc_recvstream(struct socket *sock, struct msghdr *m,
2008                           size_t buflen, int flags)
2009{
2010        struct sock *sk = sock->sk;
2011        struct tipc_sock *tsk = tipc_sk(sk);
2012        struct sk_buff *skb;
2013        struct tipc_msg *hdr;
2014        struct tipc_skb_cb *skb_cb;
2015        bool peek = flags & MSG_PEEK;
2016        int offset, required, copy, copied = 0;
2017        int hlen, dlen, err, rc;
2018        long timeout;
2019
2020        /* Catch invalid receive attempts */
2021        if (unlikely(!buflen))
2022                return -EINVAL;
2023
2024        lock_sock(sk);
2025
2026        if (unlikely(sk->sk_state == TIPC_OPEN)) {
2027                rc = -ENOTCONN;
2028                goto exit;
2029        }
2030        required = sock_rcvlowat(sk, flags & MSG_WAITALL, buflen);
2031        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
2032
2033        do {
2034                /* Look at first msg in receive queue; wait if necessary */
2035                rc = tipc_wait_for_rcvmsg(sock, &timeout);
2036                if (unlikely(rc))
2037                        break;
2038                skb = skb_peek(&sk->sk_receive_queue);
2039                skb_cb = TIPC_SKB_CB(skb);
2040                hdr = buf_msg(skb);
2041                dlen = msg_data_sz(hdr);
2042                hlen = msg_hdr_sz(hdr);
2043                err = msg_errcode(hdr);
2044
2045                /* Discard any empty non-errored (SYN-) message */
2046                if (unlikely(!dlen && !err)) {
2047                        tsk_advance_rx_queue(sk);
2048                        continue;
2049                }
2050
2051                /* Collect msg meta data, incl. error code and rejected data */
2052                if (!copied) {
2053                        tipc_sk_set_orig_addr(m, skb);
2054                        rc = tipc_sk_anc_data_recv(m, skb, tsk);
2055                        if (rc)
2056                                break;
2057                        hdr = buf_msg(skb);
2058                }
2059
2060                /* Copy data if msg ok, otherwise return error/partial data */
2061                if (likely(!err)) {
2062                        offset = skb_cb->bytes_read;
2063                        copy = min_t(int, dlen - offset, buflen - copied);
2064                        rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
2065                        if (unlikely(rc))
2066                                break;
2067                        copied += copy;
2068                        offset += copy;
2069                        if (unlikely(offset < dlen)) {
2070                                if (!peek)
2071                                        skb_cb->bytes_read = offset;
2072                                break;
2073                        }
2074                } else {
2075                        rc = 0;
2076                        if ((err != TIPC_CONN_SHUTDOWN) && !m->msg_control)
2077                                rc = -ECONNRESET;
2078                        if (copied || rc)
2079                                break;
2080                }
2081
2082                if (unlikely(peek))
2083                        break;
2084
2085                tsk_advance_rx_queue(sk);
2086
2087                /* Send connection flow control advertisement when applicable */
2088                tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
2089                if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
2090                        tipc_sk_send_ack(tsk);
2091
2092                /* Exit if all requested data or FIN/error received */
2093                if (copied == buflen || err)
2094                        break;
2095
2096        } while (!skb_queue_empty(&sk->sk_receive_queue) || copied < required);
2097exit:
2098        release_sock(sk);
2099        return copied ? copied : rc;
2100}
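
/*
 * Example (userspace sketch): with MSG_WAITALL the loop above keeps
 * collecting segments until the full request is satisfied (or an
 * error/shutdown is seen), which makes fixed-size framing trivial:
 *
 *	char hdr[8];
 *	ssize_t n = recv(sd, hdr, sizeof(hdr), MSG_WAITALL);
 *
 *	if (n == sizeof(hdr))
 *		;	// a complete fixed-size header was assembled
 */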
2101
2102/**
2103 * tipc_write_space - wake up thread if port congestion is released
2104 * @sk: socket
2105 */
2106static void tipc_write_space(struct sock *sk)
2107{
2108        struct socket_wq *wq;
2109
2110        rcu_read_lock();
2111        wq = rcu_dereference(sk->sk_wq);
2112        if (skwq_has_sleeper(wq))
2113                wake_up_interruptible_sync_poll(&wq->wait, EPOLLOUT |
2114                                                EPOLLWRNORM | EPOLLWRBAND);
2115        rcu_read_unlock();
2116}
2117
2118/**
2119 * tipc_data_ready - wake up threads to indicate messages have been received
2120 * @sk: socket
2121 */
2122static void tipc_data_ready(struct sock *sk)
2123{
2124        struct socket_wq *wq;
2125
2126        rcu_read_lock();
2127        wq = rcu_dereference(sk->sk_wq);
2128        if (skwq_has_sleeper(wq))
2129                wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN |
2130                                                EPOLLRDNORM | EPOLLRDBAND);
2131        rcu_read_unlock();
2132}
2133
2134static void tipc_sock_destruct(struct sock *sk)
2135{
2136        __skb_queue_purge(&sk->sk_receive_queue);
2137}
2138
2139static void tipc_sk_proto_rcv(struct sock *sk,
2140                              struct sk_buff_head *inputq,
2141                              struct sk_buff_head *xmitq)
2142{
2143        struct sk_buff *skb = __skb_dequeue(inputq);
2144        struct tipc_sock *tsk = tipc_sk(sk);
2145        struct tipc_msg *hdr = buf_msg(skb);
2146        struct tipc_group *grp = tsk->group;
2147        bool wakeup = false;
2148
2149        switch (msg_user(hdr)) {
2150        case CONN_MANAGER:
2151                tipc_sk_conn_proto_rcv(tsk, skb, inputq, xmitq);
2152                return;
2153        case SOCK_WAKEUP:
2154                tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
2155                /* coupled with smp_rmb() in tipc_wait_for_cond() */
2156                smp_wmb();
2157                tsk->cong_link_cnt--;
2158                wakeup = true;
2159                tipc_sk_push_backlog(tsk, false);
2160                break;
2161        case GROUP_PROTOCOL:
2162                tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
2163                break;
2164        case TOP_SRV:
2165                tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
2166                                      hdr, inputq, xmitq);
2167                break;
2168        default:
2169                break;
2170        }
2171
2172        if (wakeup)
2173                sk->sk_write_space(sk);
2174
2175        kfree_skb(skb);
2176}
2177
2178/**
2179 * tipc_sk_filter_connect - check incoming message for a connection-based socket
2180 * @tsk: TIPC socket
2181 * @skb: pointer to message buffer.
2182 * @xmitq: for Nagle ACK if any
2183 * Return: true if message should be added to receive queue, false otherwise
2184 */
2185static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb,
2186                                   struct sk_buff_head *xmitq)
2187{
2188        struct sock *sk = &tsk->sk;
2189        struct net *net = sock_net(sk);
2190        struct tipc_msg *hdr = buf_msg(skb);
2191        bool con_msg = msg_connected(hdr);
2192        u32 pport = tsk_peer_port(tsk);
2193        u32 pnode = tsk_peer_node(tsk);
2194        u32 oport = msg_origport(hdr);
2195        u32 onode = msg_orignode(hdr);
2196        int err = msg_errcode(hdr);
2197        unsigned long delay;
2198
2199        if (unlikely(msg_mcast(hdr)))
2200                return false;
2201        tsk->oneway = 0;
2202
2203        switch (sk->sk_state) {
2204        case TIPC_CONNECTING:
2205                /* Setup ACK */
2206                if (likely(con_msg)) {
2207                        if (err)
2208                                break;
2209                        tipc_sk_finish_conn(tsk, oport, onode);
2210                        msg_set_importance(&tsk->phdr, msg_importance(hdr));
2211                        /* ACK+ message with data is added to receive queue */
2212                        if (msg_data_sz(hdr))
2213                                return true;
2214                        /* Empty 'ACK-': wake up sleeping connect() and drop */
2215                        sk->sk_state_change(sk);
2216                        msg_set_dest_droppable(hdr, 1);
2217                        return false;
2218                }
2219                /* Ignore connectionless message if not from listening socket */
2220                if (oport != pport || onode != pnode)
2221                        return false;
2222
2223                /* Rejected SYN */
2224                if (err != TIPC_ERR_OVERLOAD)
2225                        break;
2226
2227                /* Prepare for new setup attempt if we have a SYN clone */
2228                if (skb_queue_empty(&sk->sk_write_queue))
2229                        break;
2230                get_random_bytes(&delay, 2);
2231                delay %= (tsk->conn_timeout / 4);
2232                delay = msecs_to_jiffies(delay + 100);
2233                sk_reset_timer(sk, &sk->sk_timer, jiffies + delay);
2234                return false;
2235        case TIPC_OPEN:
2236        case TIPC_DISCONNECTING:
2237                return false;
2238        case TIPC_LISTEN:
2239                /* Accept only SYN message */
2240                if (!msg_is_syn(hdr) &&
2241                    tipc_node_get_capabilities(net, onode) & TIPC_SYN_BIT)
2242                        return false;
2243                if (!con_msg && !err)
2244                        return true;
2245                return false;
2246        case TIPC_ESTABLISHED:
2247                if (!skb_queue_empty(&sk->sk_write_queue))
2248                        tipc_sk_push_backlog(tsk, false);
2249                /* Accept only connection-based messages sent by peer */
2250                if (likely(con_msg && !err && pport == oport &&
2251                           pnode == onode)) {
2252                        if (msg_ack_required(hdr)) {
2253                                struct sk_buff *skb;
2254
2255                                skb = tipc_sk_build_ack(tsk);
2256                                if (skb) {
2257                                        msg_set_nagle_ack(buf_msg(skb));
2258                                        __skb_queue_tail(xmitq, skb);
2259                                }
2260                        }
2261                        return true;
2262                }
2263                if (!tsk_peer_msg(tsk, hdr))
2264                        return false;
2265                if (!err)
2266                        return true;
2267                tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2268                tipc_node_remove_conn(net, pnode, tsk->portid);
2269                sk->sk_state_change(sk);
2270                return true;
2271        default:
2272                pr_err("Unknown sk_state %u\n", sk->sk_state);
2273        }
2274        /* Abort connection setup attempt */
2275        tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2276        sk->sk_err = ECONNREFUSED;
2277        sk->sk_state_change(sk);
2278        return true;
2279}
2280
2281/**
2282 * rcvbuf_limit - get proper overload limit of socket receive queue
2283 * @sk: socket
2284 * @skb: message
2285 *
2286 * For connection-oriented messages, irrespective of importance,
2287 * default queue limit is 2 MB.
2288 *
2289 * For connectionless messages, queue limits are based on message
2290 * importance as follows:
2291 *
2292 * TIPC_LOW_IMPORTANCE       (2 MB)
2293 * TIPC_MEDIUM_IMPORTANCE    (4 MB)
2294 * TIPC_HIGH_IMPORTANCE      (8 MB)
2295 * TIPC_CRITICAL_IMPORTANCE  (16 MB)
2296 *
2297 * Return: overload limit according to corresponding message importance
2298 */
2299static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
2300{
2301        struct tipc_sock *tsk = tipc_sk(sk);
2302        struct tipc_msg *hdr = buf_msg(skb);
2303
2304        if (unlikely(msg_in_group(hdr)))
2305                return READ_ONCE(sk->sk_rcvbuf);
2306
2307        if (unlikely(!msg_connected(hdr)))
2308                return READ_ONCE(sk->sk_rcvbuf) << msg_importance(hdr);
2309
2310        if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
2311                return READ_ONCE(sk->sk_rcvbuf);
2312
2313        return FLOWCTL_MSG_LIM;
2314}
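
/*
 * Worked example of the shift above, assuming the 2 MB default queue
 * limit mentioned in the comment: a connectionless message of
 * TIPC_HIGH_IMPORTANCE (importance level 2) is allowed
 * 2 MB << 2 = 8 MB, matching the table.
 */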
2315
2316/**
2317 * tipc_sk_filter_rcv - validate incoming message
2318 * @sk: socket
2319 * @skb: pointer to message.
2320 * @xmitq: output message area (FIXME)
2321 *
2322 * Enqueues message on receive queue if acceptable; optionally handles
2323 * disconnect indication for a connected socket.
2324 *
2325 * Called with socket lock already taken
2326 */
2327static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
2328                               struct sk_buff_head *xmitq)
2329{
2330        bool sk_conn = !tipc_sk_type_connectionless(sk);
2331        struct tipc_sock *tsk = tipc_sk(sk);
2332        struct tipc_group *grp = tsk->group;
2333        struct tipc_msg *hdr = buf_msg(skb);
2334        struct net *net = sock_net(sk);
2335        struct sk_buff_head inputq;
2336        int mtyp = msg_type(hdr);
2337        int limit, err = TIPC_OK;
2338
2339        trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
2340        TIPC_SKB_CB(skb)->bytes_read = 0;
2341        __skb_queue_head_init(&inputq);
2342        __skb_queue_tail(&inputq, skb);
2343
2344        if (unlikely(!msg_isdata(hdr)))
2345                tipc_sk_proto_rcv(sk, &inputq, xmitq);
2346
2347        if (unlikely(grp))
2348                tipc_group_filter_msg(grp, &inputq, xmitq);
2349
2350        if (unlikely(!grp) && mtyp == TIPC_MCAST_MSG)
2351                tipc_mcast_filter_msg(net, &tsk->mc_method.deferredq, &inputq);
2352
2353        /* Validate and add to receive buffer if there is space */
2354        while ((skb = __skb_dequeue(&inputq))) {
2355                hdr = buf_msg(skb);
2356                limit = rcvbuf_limit(sk, skb);
2357                if ((sk_conn && !tipc_sk_filter_connect(tsk, skb, xmitq)) ||
2358                    (!sk_conn && msg_connected(hdr)) ||
2359                    (!grp && msg_in_group(hdr)))
2360                        err = TIPC_ERR_NO_PORT;
2361                else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
2362                        trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL,
2363                                           "err_overload2!");
2364                        atomic_inc(&sk->sk_drops);
2365                        err = TIPC_ERR_OVERLOAD;
2366                }
2367
2368                if (unlikely(err)) {
2369                        if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) {
2370                                trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
2371                                                      "@filter_rcv!");
2372                                __skb_queue_tail(xmitq, skb);
2373                        }
2374                        err = TIPC_OK;
2375                        continue;
2376                }
2377                __skb_queue_tail(&sk->sk_receive_queue, skb);
2378                skb_set_owner_r(skb, sk);
2379                trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL,
2380                                         "rcvq >90% allocated!");
2381                sk->sk_data_ready(sk);
2382        }
2383}
2384
2385/**
2386 * tipc_sk_backlog_rcv - handle incoming message from backlog queue
2387 * @sk: socket
2388 * @skb: message
2389 *
2390 * Caller must hold socket lock
2391 */
2392static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
2393{
2394        unsigned int before = sk_rmem_alloc_get(sk);
2395        struct sk_buff_head xmitq;
2396        unsigned int added;
2397
2398        __skb_queue_head_init(&xmitq);
2399
2400        tipc_sk_filter_rcv(sk, skb, &xmitq);
2401        added = sk_rmem_alloc_get(sk) - before;
2402        atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
2403
2404        /* Send pending response/rejected messages, if any */
2405        tipc_node_distr_xmit(sock_net(sk), &xmitq);
2406        return 0;
2407}
2408
2409/**
2410 * tipc_sk_enqueue - extract all buffers with destination 'dport' from
2411 *                   inputq and try adding them to socket or backlog queue
2412 * @inputq: list of incoming buffers with potentially different destinations
2413 * @sk: socket where the buffers should be enqueued
2414 * @dport: port number for the socket
2415 * @xmitq: output queue
2416 *
2417 * Caller must hold socket lock
2418 */
2419static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
2420                            u32 dport, struct sk_buff_head *xmitq)
2421{
2422        unsigned long time_limit = jiffies + 2;
2423        struct sk_buff *skb;
2424        unsigned int lim;
2425        atomic_t *dcnt;
2426        u32 onode;
2427
2428        while (skb_queue_len(inputq)) {
2429                if (unlikely(time_after_eq(jiffies, time_limit)))
2430                        return;
2431
2432                skb = tipc_skb_dequeue(inputq, dport);
2433                if (unlikely(!skb))
2434                        return;
2435
2436                /* Add message directly to receive queue if possible */
2437                if (!sock_owned_by_user(sk)) {
2438                        tipc_sk_filter_rcv(sk, skb, xmitq);
2439                        continue;
2440                }
2441
2442                /* Try backlog, compensating for double-counted bytes */
2443                dcnt = &tipc_sk(sk)->dupl_rcvcnt;
2444                if (!sk->sk_backlog.len)
2445                        atomic_set(dcnt, 0);
2446                lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
2447                if (likely(!sk_add_backlog(sk, skb, lim))) {
2448                        trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
2449                                                 "bklg & rcvq >90% allocated!");
2450                        continue;
2451                }
2452
2453                trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
2454                /* Overload => reject message back to sender */
2455                onode = tipc_own_addr(sock_net(sk));
2456                atomic_inc(&sk->sk_drops);
2457                if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) {
2458                        trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_ALL,
2459                                              "@sk_enqueue!");
2460                        __skb_queue_tail(xmitq, skb);
2461                }
2462                break;
2463        }
2464}
2465
2466/**
2467 * tipc_sk_rcv - handle a chain of incoming buffers
2468 * @net: the associated network namespace
2469 * @inputq: buffer list containing the buffers
2470 * Consumes all buffers in list until inputq is empty
2471 * Note: may be called in multiple threads referring to the same queue
2472 */
2473void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
2474{
2475        struct sk_buff_head xmitq;
2476        u32 dnode, dport = 0;
2477        int err;
2478        struct tipc_sock *tsk;
2479        struct sock *sk;
2480        struct sk_buff *skb;
2481
2482        __skb_queue_head_init(&xmitq);
2483        while (skb_queue_len(inputq)) {
2484                dport = tipc_skb_peek_port(inputq, dport);
2485                tsk = tipc_sk_lookup(net, dport);
2486
2487                if (likely(tsk)) {
2488                        sk = &tsk->sk;
2489                        if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
2490                                tipc_sk_enqueue(inputq, sk, dport, &xmitq);
2491                                spin_unlock_bh(&sk->sk_lock.slock);
2492                        }
2493                        /* Send pending response/rejected messages, if any */
2494                        tipc_node_distr_xmit(sock_net(sk), &xmitq);
2495                        sock_put(sk);
2496                        continue;
2497                }
2498                /* No destination socket => dequeue skb if still there */
2499                skb = tipc_skb_dequeue(inputq, dport);
2500                if (!skb)
2501                        return;
2502
2503                /* Try secondary lookup if unresolved named message */
2504                err = TIPC_ERR_NO_PORT;
2505                if (tipc_msg_lookup_dest(net, skb, &err))
2506                        goto xmit;
2507
2508                /* Prepare for message rejection */
2509                if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
2510                        continue;
2511
2512                trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "@sk_rcv!");
2513xmit:
2514                dnode = msg_destnode(buf_msg(skb));
2515                tipc_node_xmit_skb(net, skb, dnode, dport);
2516        }
2517}
2518
2519static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
2520{
2521        DEFINE_WAIT_FUNC(wait, woken_wake_function);
2522        struct sock *sk = sock->sk;
2523        int done;
2524
2525        do {
2526                int err = sock_error(sk);
2527                if (err)
2528                        return err;
2529                if (!*timeo_p)
2530                        return -ETIMEDOUT;
2531                if (signal_pending(current))
2532                        return sock_intr_errno(*timeo_p);
2533                if (sk->sk_state == TIPC_DISCONNECTING)
2534                        break;
2535
2536                add_wait_queue(sk_sleep(sk), &wait);
2537                done = sk_wait_event(sk, timeo_p, tipc_sk_connected(sk),
2538                                     &wait);
2539                remove_wait_queue(sk_sleep(sk), &wait);
2540        } while (!done);
2541        return 0;
2542}
2543
2544static bool tipc_sockaddr_is_sane(struct sockaddr_tipc *addr)
2545{
2546        if (addr->family != AF_TIPC)
2547                return false;
2548        if (addr->addrtype == TIPC_SERVICE_RANGE)
2549                return (addr->addr.nameseq.lower <= addr->addr.nameseq.upper);
2550        return (addr->addrtype == TIPC_SERVICE_ADDR ||
2551                addr->addrtype == TIPC_SOCKET_ADDR);
2552}
2553
2554/**
2555 * tipc_connect - establish a connection to another TIPC port
2556 * @sock: socket structure
2557 * @dest: socket address for destination port
2558 * @destlen: size of socket address data structure
2559 * @flags: file-related flags associated with socket
2560 *
2561 * Return: 0 on success, errno otherwise
2562 */
2563static int tipc_connect(struct socket *sock, struct sockaddr *dest,
2564                        int destlen, int flags)
2565{
2566        struct sock *sk = sock->sk;
2567        struct tipc_sock *tsk = tipc_sk(sk);
2568        struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
2569        struct msghdr m = {NULL,};
2570        long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
2571        int previous;
2572        int res = 0;
2573
2574        if (destlen != sizeof(struct sockaddr_tipc))
2575                return -EINVAL;
2576
2577        lock_sock(sk);
2578
2579        if (tsk->group) {
2580                res = -EINVAL;
2581                goto exit;
2582        }
2583
2584        if (dst->family == AF_UNSPEC) {
2585                memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
2586                if (!tipc_sk_type_connectionless(sk))
2587                        res = -EINVAL;
2588                goto exit;
2589        }
2590        if (!tipc_sockaddr_is_sane(dst)) {
2591                res = -EINVAL;
2592                goto exit;
2593        }
2594        /* DGRAM/RDM connect(), just save the destaddr */
2595        if (tipc_sk_type_connectionless(sk)) {
2596                memcpy(&tsk->peer, dest, destlen);
2597                goto exit;
2598        } else if (dst->addrtype == TIPC_SERVICE_RANGE) {
2599                res = -EINVAL;
2600                goto exit;
2601        }
2602
2603        previous = sk->sk_state;
2604
2605        switch (sk->sk_state) {
2606        case TIPC_OPEN:
2607                /* Send a 'SYN-' to destination */
2608                m.msg_name = dest;
2609                m.msg_namelen = destlen;
2610
2611                /* For a non-blocking connect(), set MSG_DONTWAIT so that
2612                 * __tipc_sendmsg() never blocks.
2613                 */
2614                if (!timeout)
2615                        m.msg_flags = MSG_DONTWAIT;
2616
2617                res = __tipc_sendmsg(sock, &m, 0);
2618                if ((res < 0) && (res != -EWOULDBLOCK))
2619                        goto exit;
2620
2621                /* Just entered TIPC_CONNECTING state; the only
2622                 * difference is that the return value in the
2623                 * non-blocking case is EINPROGRESS rather than EALREADY.
2624                 */
2625                res = -EINPROGRESS;
2626                fallthrough;
2627        case TIPC_CONNECTING:
2628                if (!timeout) {
2629                        if (previous == TIPC_CONNECTING)
2630                                res = -EALREADY;
2631                        goto exit;
2632                }
2633                timeout = msecs_to_jiffies(timeout);
2634                /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
2635                res = tipc_wait_for_connect(sock, &timeout);
2636                break;
2637        case TIPC_ESTABLISHED:
2638                res = -EISCONN;
2639                break;
2640        default:
2641                res = -EINVAL;
2642        }
2643
2644exit:
2645        release_sock(sk);
2646        return res;
2647}
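
/*
 * Example (userspace sketch): a non-blocking connect() against the
 * state machine above behaves like TCP, returning EINPROGRESS first
 * and EALREADY on a repeated attempt. Reuses the 'dst' address from
 * the earlier send sketch:
 *
 *	#include <errno.h>
 *	#include <fcntl.h>
 *	#include <poll.h>
 *
 *	fcntl(sd, F_SETFL, O_NONBLOCK);
 *	if (connect(sd, (struct sockaddr *)&dst, sizeof(dst)) < 0 &&
 *	    errno == EINPROGRESS) {
 *		struct pollfd pfd = { .fd = sd, .events = POLLOUT };
 *
 *		poll(&pfd, 1, -1);	// wait until setup succeeds or fails
 *	}
 */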
2648
2649/**
2650 * tipc_listen - allow socket to listen for incoming connections
2651 * @sock: socket structure
2652 * @len: (unused)
2653 *
2654 * Return: 0 on success, errno otherwise
2655 */
2656static int tipc_listen(struct socket *sock, int len)
2657{
2658        struct sock *sk = sock->sk;
2659        int res;
2660
2661        lock_sock(sk);
2662        res = tipc_set_sk_state(sk, TIPC_LISTEN);
2663        release_sock(sk);
2664
2665        return res;
2666}
2667
2668static int tipc_wait_for_accept(struct socket *sock, long timeo)
2669{
2670        struct sock *sk = sock->sk;
2671        DEFINE_WAIT(wait);
2672        int err;
2673
2674        /* True wake-one mechanism for incoming connections: only
2675         * one process gets woken up, not the 'whole herd'.
2676         * Since we do not 'race & poll' for established sockets
2677         * anymore, the common case will execute the loop only once.
2678         */
2679        for (;;) {
2680                prepare_to_wait_exclusive(sk_sleep(sk), &wait,
2681                                          TASK_INTERRUPTIBLE);
2682                if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
2683                        release_sock(sk);
2684                        timeo = schedule_timeout(timeo);
2685                        lock_sock(sk);
2686                }
2687                err = 0;
2688                if (!skb_queue_empty(&sk->sk_receive_queue))
2689                        break;
2690                err = -EAGAIN;
2691                if (!timeo)
2692                        break;
2693                err = sock_intr_errno(timeo);
2694                if (signal_pending(current))
2695                        break;
2696        }
2697        finish_wait(sk_sleep(sk), &wait);
2698        return err;
2699}
2700
2701/**
2702 * tipc_accept - wait for connection request
2703 * @sock: listening socket
2704 * @new_sock: new socket that is to be connected
2705 * @flags: file-related flags associated with socket
2706 * @kern: caused by kernel or by userspace?
2707 *
2708 * Return: 0 on success, errno otherwise
2709 */
2710static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
2711                       bool kern)
2712{
2713        struct sock *new_sk, *sk = sock->sk;
2714        struct sk_buff *buf;
2715        struct tipc_sock *new_tsock;
2716        struct tipc_msg *msg;
2717        long timeo;
2718        int res;
2719
2720        lock_sock(sk);
2721
2722        if (sk->sk_state != TIPC_LISTEN) {
2723                res = -EINVAL;
2724                goto exit;
2725        }
2726        timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2727        res = tipc_wait_for_accept(sock, timeo);
2728        if (res)
2729                goto exit;
2730
2731        buf = skb_peek(&sk->sk_receive_queue);
2732
2733        res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, kern);
2734        if (res)
2735                goto exit;
2736        security_sk_clone(sock->sk, new_sock->sk);
2737
2738        new_sk = new_sock->sk;
2739        new_tsock = tipc_sk(new_sk);
2740        msg = buf_msg(buf);
2741
2742        /* we lock on new_sk; but lockdep sees the lock on sk */
2743        lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2744
2745        /*
2746         * Reject any stray messages received by new socket
2747         * before the socket lock was taken (very, very unlikely)
2748         */
2749        tsk_rej_rx_queue(new_sk, TIPC_ERR_NO_PORT);
2750
2751        /* Connect new socket to its peer */
2752        tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2753
2754        tsk_set_importance(new_sk, msg_importance(msg));
2755        if (msg_named(msg)) {
2756                new_tsock->conn_type = msg_nametype(msg);
2757                new_tsock->conn_instance = msg_nameinst(msg);
2758        }
2759
2760        /*
2761         * Respond to 'SYN-' by discarding it & returning 'ACK-'.
2762         * Respond to 'SYN+' by queuing it on the new socket.
2763         */
2764        if (!msg_data_sz(msg)) {
2765                struct msghdr m = {NULL,};
2766
2767                tsk_advance_rx_queue(sk);
2768                __tipc_sendstream(new_sock, &m, 0);
2769        } else {
2770                __skb_dequeue(&sk->sk_receive_queue);
2771                __skb_queue_head(&new_sk->sk_receive_queue, buf);
2772                skb_set_owner_r(buf, new_sk);
2773        }
2774        release_sock(new_sk);
2775exit:
2776        release_sock(sk);
2777        return res;
2778}
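
/*
 * Example (userspace sketch): the usual passive open feeding
 * tipc_listen()/tipc_accept() above; the bound service address is an
 * arbitrary illustration:
 *
 *	struct sockaddr_tipc srv = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_ADDR,
 *		.addr.name.name = { .type = 18888, .instance = 17 },
 *	};
 *	int lsd = socket(AF_TIPC, SOCK_SEQPACKET, 0);
 *
 *	bind(lsd, (struct sockaddr *)&srv, sizeof(srv));
 *	listen(lsd, 8);
 *	int csd = accept(lsd, NULL, NULL);	// blocks in tipc_wait_for_accept()
 */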
2779
2780/**
2781 * tipc_shutdown - shutdown socket connection
2782 * @sock: socket structure
2783 * @how: direction to close (must be SHUT_RDWR)
2784 *
2785 * Terminates connection (if necessary), then purges socket's receive queue.
2786 *
2787 * Return: 0 on success, errno otherwise
2788 */
2789static int tipc_shutdown(struct socket *sock, int how)
2790{
2791        struct sock *sk = sock->sk;
2792        int res;
2793
2794        if (how != SHUT_RDWR)
2795                return -EINVAL;
2796
2797        lock_sock(sk);
2798
2799        trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, " ");
2800        __tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
2801        sk->sk_shutdown = SHUTDOWN_MASK;
2802
2803        if (sk->sk_state == TIPC_DISCONNECTING) {
2804                /* Discard any unreceived messages */
2805                __skb_queue_purge(&sk->sk_receive_queue);
2806
2807                res = 0;
2808        } else {
2809                res = -ENOTCONN;
2810        }
2811        /* Wake up anyone sleeping in poll. */
2812        sk->sk_state_change(sk);
2813
2814        release_sock(sk);
2815        return res;
2816}
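
/*
 * Example (userspace sketch): only a full shutdown is implemented, as
 * enforced above:
 *
 *	shutdown(sd, SHUT_RDWR);	// OK: terminates the connection
 *	shutdown(sd, SHUT_WR);		// fails with EINVAL on TIPC
 */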
2817
2818static void tipc_sk_check_probing_state(struct sock *sk,
2819                                        struct sk_buff_head *list)
2820{
2821        struct tipc_sock *tsk = tipc_sk(sk);
2822        u32 pnode = tsk_peer_node(tsk);
2823        u32 pport = tsk_peer_port(tsk);
2824        u32 self = tsk_own_node(tsk);
2825        u32 oport = tsk->portid;
2826        struct sk_buff *skb;
2827
2828        if (tsk->probe_unacked) {
2829                tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2830                sk->sk_err = ECONNABORTED;
2831                tipc_node_remove_conn(sock_net(sk), pnode, pport);
2832                sk->sk_state_change(sk);
2833                return;
2834        }
2835        /* Prepare new probe */
2836        skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0,
2837                              pnode, self, pport, oport, TIPC_OK);
2838        if (skb)
2839                __skb_queue_tail(list, skb);
2840        tsk->probe_unacked = true;
2841        sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
2842}
2843
2844static void tipc_sk_retry_connect(struct sock *sk, struct sk_buff_head *list)
2845{
2846        struct tipc_sock *tsk = tipc_sk(sk);
2847
2848        /* Try again later if dest link is congested */
2849        if (tsk->cong_link_cnt) {
2850                sk_reset_timer(sk, &sk->sk_timer, msecs_to_jiffies(100));
2851                return;
2852        }
2853        /* Prepare SYN for retransmit */
2854        tipc_msg_skb_clone(&sk->sk_write_queue, list);
2855}
2856
2857static void tipc_sk_timeout(struct timer_list *t)
2858{
2859        struct sock *sk = from_timer(sk, t, sk_timer);
2860        struct tipc_sock *tsk = tipc_sk(sk);
2861        u32 pnode = tsk_peer_node(tsk);
2862        struct sk_buff_head list;
2863        int rc = 0;
2864
2865        __skb_queue_head_init(&list);
2866        bh_lock_sock(sk);
2867
2868        /* Try again later if socket is busy */
2869        if (sock_owned_by_user(sk)) {
2870                sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);
2871                bh_unlock_sock(sk);
2872                sock_put(sk);
2873                return;
2874        }
2875
2876        if (sk->sk_state == TIPC_ESTABLISHED)
2877                tipc_sk_check_probing_state(sk, &list);
2878        else if (sk->sk_state == TIPC_CONNECTING)
2879                tipc_sk_retry_connect(sk, &list);
2880
2881        bh_unlock_sock(sk);
2882
2883        if (!skb_queue_empty(&list))
2884                rc = tipc_node_xmit(sock_net(sk), &list, pnode, tsk->portid);
2885
2886        /* SYN messages may cause link congestion */
2887        if (rc == -ELINKCONG) {
2888                tipc_dest_push(&tsk->cong_links, pnode, 0);
2889                tsk->cong_link_cnt = 1;
2890        }
2891        sock_put(sk);
2892}
2893
2894static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2895                           struct tipc_service_range const *seq)
2896{
2897        struct sock *sk = &tsk->sk;
2898        struct net *net = sock_net(sk);
2899        struct publication *publ;
2900        u32 key;
2901
2902        if (scope != TIPC_NODE_SCOPE)
2903                scope = TIPC_CLUSTER_SCOPE;
2904
2905        if (tipc_sk_connected(sk))
2906                return -EINVAL;
2907        key = tsk->portid + tsk->pub_count + 1;
2908        if (key == tsk->portid)
2909                return -EADDRINUSE;
2910
2911        publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2912                                    scope, tsk->portid, key);
2913        if (unlikely(!publ))
2914                return -EINVAL;
2915
2916        list_add(&publ->binding_sock, &tsk->publications);
2917        tsk->pub_count++;
2918        tsk->published = 1;
2919        return 0;
2920}
2921
2922static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2923                            struct tipc_service_range const *seq)
2924{
2925        struct net *net = sock_net(&tsk->sk);
2926        struct publication *publ;
2927        struct publication *safe;
2928        int rc = -EINVAL;
2929
2930        if (scope != TIPC_NODE_SCOPE)
2931                scope = TIPC_CLUSTER_SCOPE;
2932
2933        list_for_each_entry_safe(publ, safe, &tsk->publications, binding_sock) {
2934                if (seq) {
2935                        if (publ->scope != scope)
2936                                continue;
2937                        if (publ->type != seq->type)
2938                                continue;
2939                        if (publ->lower != seq->lower)
2940                                continue;
2941                        if (publ->upper != seq->upper)
2942                                break;
2943                        tipc_nametbl_withdraw(net, publ->type, publ->lower,
2944                                              publ->upper, publ->key);
2945                        rc = 0;
2946                        break;
2947                }
2948                tipc_nametbl_withdraw(net, publ->type, publ->lower,
2949                                      publ->upper, publ->key);
2950                rc = 0;
2951        }
2952        if (list_empty(&tsk->publications))
2953                tsk->published = 0;
2954        return rc;
2955}
2956
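/* Illustrative userspace counterpart (not part of this file, sketch only):
 * in this kernel version a bind() on an AF_TIPC socket with a positive
 * scope lands in tipc_sk_publish() above, while re-binding the same range
 * with the scope negated lands in tipc_sk_withdraw(). The service type
 * 18888 and the instance range are arbitrary example values; error
 * handling is omitted.
 *
 *	#include <linux/tipc.h>
 *	#include <sys/socket.h>
 *
 *	int fd = socket(AF_TIPC, SOCK_RDM, 0);
 *	struct sockaddr_tipc sa = {
 *		.family = AF_TIPC,
 *		.addrtype = TIPC_SERVICE_RANGE,
 *		.scope = TIPC_CLUSTER_SCOPE,
 *		.addr.nameseq = { .type = 18888, .lower = 0, .upper = 99 },
 *	};
 *
 *	bind(fd, (struct sockaddr *)&sa, sizeof(sa));	// publish
 *	sa.scope = -TIPC_CLUSTER_SCOPE;
 *	bind(fd, (struct sockaddr *)&sa, sizeof(sa));	// withdraw
 */
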
2957/* tipc_sk_reinit: set non-zero address in all existing sockets
2958 *                 when we go from standalone to network mode.
2959 */
2960void tipc_sk_reinit(struct net *net)
2961{
2962        struct tipc_net *tn = net_generic(net, tipc_net_id);
2963        struct rhashtable_iter iter;
2964        struct tipc_sock *tsk;
2965        struct tipc_msg *msg;
2966
2967        rhashtable_walk_enter(&tn->sk_rht, &iter);
2968
2969        do {
2970                rhashtable_walk_start(&iter);
2971
2972                while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
2973                        sock_hold(&tsk->sk);
2974                        rhashtable_walk_stop(&iter);
2975                        lock_sock(&tsk->sk);
2976                        msg = &tsk->phdr;
2977                        msg_set_prevnode(msg, tipc_own_addr(net));
2978                        msg_set_orignode(msg, tipc_own_addr(net));
2979                        release_sock(&tsk->sk);
2980                        rhashtable_walk_start(&iter);
2981                        sock_put(&tsk->sk);
2982                }
2983
2984                rhashtable_walk_stop(&iter);
2985        } while (tsk == ERR_PTR(-EAGAIN));
2986
2987        rhashtable_walk_exit(&iter);
2988}
2989
2990static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2991{
2992        struct tipc_net *tn = net_generic(net, tipc_net_id);
2993        struct tipc_sock *tsk;
2994
2995        rcu_read_lock();
2996        tsk = rhashtable_lookup(&tn->sk_rht, &portid, tsk_rht_params);
2997        if (tsk)
2998                sock_hold(&tsk->sk);
2999        rcu_read_unlock();
3000
3001        return tsk;
3002}
3003
3004static int tipc_sk_insert(struct tipc_sock *tsk)
3005{
3006        struct sock *sk = &tsk->sk;
3007        struct net *net = sock_net(sk);
3008        struct tipc_net *tn = net_generic(net, tipc_net_id);
3009        u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
3010        u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
3011
3012        while (remaining--) {
3013                portid++;
3014                if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
3015                        portid = TIPC_MIN_PORT;
3016                tsk->portid = portid;
3017                sock_hold(&tsk->sk);
3018                if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
3019                                                   tsk_rht_params))
3020                        return 0;
3021                sock_put(&tsk->sk);
3022        }
3023
3024        return -1;
3025}
3026
3027static void tipc_sk_remove(struct tipc_sock *tsk)
3028{
3029        struct sock *sk = &tsk->sk;
3030        struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
3031
3032        if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
3033                WARN_ON(refcount_read(&sk->sk_refcnt) == 1);
3034                __sock_put(sk);
3035        }
3036}
3037
3038static const struct rhashtable_params tsk_rht_params = {
3039        .nelem_hint = 192,
3040        .head_offset = offsetof(struct tipc_sock, node),
3041        .key_offset = offsetof(struct tipc_sock, portid),
3042        .key_len = sizeof(u32), /* portid */
3043        .max_size = 1048576,
3044        .min_size = 256,
3045        .automatic_shrinking = true,
3046};
3047
3048int tipc_sk_rht_init(struct net *net)
3049{
3050        struct tipc_net *tn = net_generic(net, tipc_net_id);
3051
3052        return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
3053}
3054
3055void tipc_sk_rht_destroy(struct net *net)
3056{
3057        struct tipc_net *tn = net_generic(net, tipc_net_id);
3058
3059        /* Wait for socket readers to complete */
3060        synchronize_net();
3061
3062        rhashtable_destroy(&tn->sk_rht);
3063}
3064
3065static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
3066{
3067        struct net *net = sock_net(&tsk->sk);
3068        struct tipc_group *grp = tsk->group;
3069        struct tipc_msg *hdr = &tsk->phdr;
3070        struct tipc_service_range seq;
3071        int rc;
3072
3073        if (mreq->type < TIPC_RESERVED_TYPES)
3074                return -EACCES;
3075        if (mreq->scope > TIPC_NODE_SCOPE)
3076                return -EINVAL;
3077        if (grp)
3078                return -EACCES;
3079        grp = tipc_group_create(net, tsk->portid, mreq, &tsk->group_is_open);
3080        if (!grp)
3081                return -ENOMEM;
3082        tsk->group = grp;
3083        msg_set_lookup_scope(hdr, mreq->scope);
3084        msg_set_nametype(hdr, mreq->type);
3085        msg_set_dest_droppable(hdr, true);
3086        seq.type = mreq->type;
3087        seq.lower = mreq->instance;
3088        seq.upper = seq.lower;
3089        tipc_nametbl_build_group(net, grp, mreq->type, mreq->scope);
3090        rc = tipc_sk_publish(tsk, mreq->scope, &seq);
3091        if (rc) {
3092                tipc_group_delete(net, grp);
3093                tsk->group = NULL;
3094                return rc;
3095        }
3096        /* Eliminate any risk that a broadcast overtakes sent JOINs */
3097        tsk->mc_method.rcast = true;
3098        tsk->mc_method.mandatory = true;
3099        tipc_group_join(net, grp, &tsk->sk.sk_rcvbuf);
3100        return rc;
3101}
3102
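/* Illustrative userspace counterpart (not part of this file, sketch only):
 * tipc_sk_join() above is reached via setsockopt(TIPC_GROUP_JOIN). The
 * type and instance values are arbitrary examples; note that a type below
 * TIPC_RESERVED_TYPES (64) is rejected with EACCES.
 *
 *	struct tipc_group_req mreq = {
 *		.type = 4711,			// example service type
 *		.instance = 17,			// this member's instance
 *		.scope = TIPC_CLUSTER_SCOPE,
 *		.flags = TIPC_GROUP_LOOPBACK,	// receive own multicasts
 *	};
 *
 *	setsockopt(fd, SOL_TIPC, TIPC_GROUP_JOIN, &mreq, sizeof(mreq));
 *	...
 *	setsockopt(fd, SOL_TIPC, TIPC_GROUP_LEAVE, NULL, 0);
 */
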
3103static int tipc_sk_leave(struct tipc_sock *tsk)
3104{
3105        struct net *net = sock_net(&tsk->sk);
3106        struct tipc_group *grp = tsk->group;
3107        struct tipc_service_range seq;
3108        int scope;
3109
3110        if (!grp)
3111                return -EINVAL;
3112        tipc_group_self(grp, &seq, &scope);
3113        tipc_group_delete(net, grp);
3114        tsk->group = NULL;
3115        tipc_sk_withdraw(tsk, scope, &seq);
3116        return 0;
3117}
3118
3119/**
3120 * tipc_setsockopt - set socket option
3121 * @sock: socket structure
3122 * @lvl: option level
3123 * @opt: option identifier
3124 * @ov: pointer to new option value
3125 * @ol: length of option value
3126 *
3127 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
3128 * (to ease compatibility).
3129 *
3130 * Return: 0 on success, errno otherwise
3131 */
3132static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
3133                           sockptr_t ov, unsigned int ol)
3134{
3135        struct sock *sk = sock->sk;
3136        struct tipc_sock *tsk = tipc_sk(sk);
3137        struct tipc_group_req mreq;
3138        u32 value = 0;
3139        int res = 0;
3140
3141        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
3142                return 0;
3143        if (lvl != SOL_TIPC)
3144                return -ENOPROTOOPT;
3145
3146        switch (opt) {
3147        case TIPC_IMPORTANCE:
3148        case TIPC_SRC_DROPPABLE:
3149        case TIPC_DEST_DROPPABLE:
3150        case TIPC_CONN_TIMEOUT:
3151        case TIPC_NODELAY:
3152                if (ol < sizeof(value))
3153                        return -EINVAL;
3154                if (copy_from_sockptr(&value, ov, sizeof(u32)))
3155                        return -EFAULT;
3156                break;
3157        case TIPC_GROUP_JOIN:
3158                if (ol < sizeof(mreq))
3159                        return -EINVAL;
3160                if (copy_from_sockptr(&mreq, ov, sizeof(mreq)))
3161                        return -EFAULT;
3162                break;
3163        default:
3164                if (!sockptr_is_null(ov) || ol)
3165                        return -EINVAL;
3166        }
3167
3168        lock_sock(sk);
3169
3170        switch (opt) {
3171        case TIPC_IMPORTANCE:
3172                res = tsk_set_importance(sk, value);
3173                break;
3174        case TIPC_SRC_DROPPABLE:
3175                if (sock->type != SOCK_STREAM)
3176                        tsk_set_unreliable(tsk, value);
3177                else
3178                        res = -ENOPROTOOPT;
3179                break;
3180        case TIPC_DEST_DROPPABLE:
3181                tsk_set_unreturnable(tsk, value);
3182                break;
3183        case TIPC_CONN_TIMEOUT:
3184                tipc_sk(sk)->conn_timeout = value;
3185                break;
3186        case TIPC_MCAST_BROADCAST:
3187                tsk->mc_method.rcast = false;
3188                tsk->mc_method.mandatory = true;
3189                break;
3190        case TIPC_MCAST_REPLICAST:
3191                tsk->mc_method.rcast = true;
3192                tsk->mc_method.mandatory = true;
3193                break;
3194        case TIPC_GROUP_JOIN:
3195                res = tipc_sk_join(tsk, &mreq);
3196                break;
3197        case TIPC_GROUP_LEAVE:
3198                res = tipc_sk_leave(tsk);
3199                break;
3200        case TIPC_NODELAY:
3201                tsk->nodelay = !!value;
3202                tsk_set_nagle(tsk);
3203                break;
3204        default:
3205                res = -EINVAL;
3206        }
3207
3208        release_sock(sk);
3209
3210        return res;
3211}
3212
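/* Illustrative userspace usage (not part of this file, sketch only),
 * mirroring the value handling above: the u32-valued options take a
 * four-byte argument, while the flag-style options must be passed with a
 * NULL value and zero length. The 5000 ms timeout is an arbitrary example.
 *
 *	__u32 imp = TIPC_HIGH_IMPORTANCE;
 *	__u32 tout = 5000;
 *
 *	setsockopt(fd, SOL_TIPC, TIPC_IMPORTANCE, &imp, sizeof(imp));
 *	setsockopt(fd, SOL_TIPC, TIPC_CONN_TIMEOUT, &tout, sizeof(tout));
 *	setsockopt(fd, SOL_TIPC, TIPC_MCAST_REPLICAST, NULL, 0);
 */
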
3213/**
3214 * tipc_getsockopt - get socket option
3215 * @sock: socket structure
3216 * @lvl: option level
3217 * @opt: option identifier
3218 * @ov: receptacle for option value
3219 * @ol: receptacle for length of option value
3220 *
3221 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
3222 * (to ease compatibility).
3223 *
3224 * Return: 0 on success, errno otherwise
3225 */
3226static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
3227                           char __user *ov, int __user *ol)
3228{
3229        struct sock *sk = sock->sk;
3230        struct tipc_sock *tsk = tipc_sk(sk);
3231        struct tipc_service_range seq;
3232        int len, scope;
3233        u32 value;
3234        int res;
3235
3236        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
3237                return put_user(0, ol);
3238        if (lvl != SOL_TIPC)
3239                return -ENOPROTOOPT;
3240        res = get_user(len, ol);
3241        if (res)
3242                return res;
3243
3244        lock_sock(sk);
3245
3246        switch (opt) {
3247        case TIPC_IMPORTANCE:
3248                value = tsk_importance(tsk);
3249                break;
3250        case TIPC_SRC_DROPPABLE:
3251                value = tsk_unreliable(tsk);
3252                break;
3253        case TIPC_DEST_DROPPABLE:
3254                value = tsk_unreturnable(tsk);
3255                break;
3256        case TIPC_CONN_TIMEOUT:
3257                value = tsk->conn_timeout;
3258                /* no need to set "res", since already 0 at this point */
3259                break;
3260        case TIPC_NODE_RECVQ_DEPTH:
3261                value = 0; /* was tipc_queue_size, now obsolete */
3262                break;
3263        case TIPC_SOCK_RECVQ_DEPTH:
3264                value = skb_queue_len(&sk->sk_receive_queue);
3265                break;
3266        case TIPC_SOCK_RECVQ_USED:
3267                value = sk_rmem_alloc_get(sk);
3268                break;
3269        case TIPC_GROUP_JOIN:
3270                seq.type = 0;
3271                if (tsk->group)
3272                        tipc_group_self(tsk->group, &seq, &scope);
3273                value = seq.type;
3274                break;
3275        default:
3276                res = -EINVAL;
3277        }
3278
3279        release_sock(sk);
3280
3281        if (res)
3282                return res;     /* "get" failed */
3283
3284        if (len < sizeof(value))
3285                return -EINVAL;
3286
3287        if (copy_to_user(ov, &value, sizeof(value)))
3288                return -EFAULT;
3289
3290        return put_user(sizeof(value), ol);
3291}
3292
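/* Illustrative userspace usage (not part of this file, sketch only): every
 * SOL_TIPC option above is read back as a u32, e.g. sampling the current
 * receive queue depth:
 *
 *	__u32 depth;
 *	socklen_t len = sizeof(depth);
 *
 *	if (!getsockopt(fd, SOL_TIPC, TIPC_SOCK_RECVQ_DEPTH, &depth, &len))
 *		printf("rcv queue holds %u messages\n", depth);
 */
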
3293static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
3294{
3295        struct net *net = sock_net(sock->sk);
3296        struct tipc_sioc_nodeid_req nr = {0};
3297        struct tipc_sioc_ln_req lnr;
3298        void __user *argp = (void __user *)arg;
3299
3300        switch (cmd) {
3301        case SIOCGETLINKNAME:
3302                if (copy_from_user(&lnr, argp, sizeof(lnr)))
3303                        return -EFAULT;
3304                if (!tipc_node_get_linkname(net,
3305                                            lnr.bearer_id & 0xffff, lnr.peer,
3306                                            lnr.linkname, TIPC_MAX_LINK_NAME)) {
3307                        if (copy_to_user(argp, &lnr, sizeof(lnr)))
3308                                return -EFAULT;
3309                        return 0;
3310                }
3311                return -EADDRNOTAVAIL;
3312        case SIOCGETNODEID:
3313                if (copy_from_user(&nr, argp, sizeof(nr)))
3314                        return -EFAULT;
3315                if (!tipc_node_get_id(net, nr.peer, nr.node_id))
3316                        return -EADDRNOTAVAIL;
3317                if (copy_to_user(argp, &nr, sizeof(nr)))
3318                        return -EFAULT;
3319                return 0;
3320        default:
3321                return -ENOIOCTLCMD;
3322        }
3323}
3324
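/* Illustrative userspace usage (not part of this file, sketch only):
 * resolving a peer's 128-bit node identity with SIOCGETNODEID. The peer
 * hash address below is an arbitrary example value, and use_node_id() is
 * a hypothetical consumer of the result.
 *
 *	struct tipc_sioc_nodeid_req nr = { .peer = 0x01001002 };
 *
 *	if (!ioctl(fd, SIOCGETNODEID, &nr))
 *		use_node_id(nr.node_id);	// TIPC_NODEID_LEN (16) bytes
 */
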
3325static int tipc_socketpair(struct socket *sock1, struct socket *sock2)
3326{
3327        struct tipc_sock *tsk2 = tipc_sk(sock2->sk);
3328        struct tipc_sock *tsk1 = tipc_sk(sock1->sk);
3329        u32 onode = tipc_own_addr(sock_net(sock1->sk));
3330
3331        tsk1->peer.family = AF_TIPC;
3332        tsk1->peer.addrtype = TIPC_SOCKET_ADDR;
3333        tsk1->peer.scope = TIPC_NODE_SCOPE;
3334        tsk1->peer.addr.id.ref = tsk2->portid;
3335        tsk1->peer.addr.id.node = onode;
3336        tsk2->peer.family = AF_TIPC;
3337        tsk2->peer.addrtype = TIPC_SOCKET_ADDR;
3338        tsk2->peer.scope = TIPC_NODE_SCOPE;
3339        tsk2->peer.addr.id.ref = tsk1->portid;
3340        tsk2->peer.addr.id.node = onode;
3341
3342        tipc_sk_finish_conn(tsk1, tsk2->portid, onode);
3343        tipc_sk_finish_conn(tsk2, tsk1->portid, onode);
3344        return 0;
3345}
3346
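/* Illustrative userspace usage (not part of this file, sketch only): as
 * with AF_UNIX, a connected pair can be created in one call; the function
 * above simply wires the two ports to each other on the own node.
 *
 *	int sv[2];
 *
 *	if (!socketpair(AF_TIPC, SOCK_SEQPACKET, 0, sv))
 *		send(sv[0], "ping", 4, 0);	// readable on sv[1]
 */
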
3347/* Protocol switches for the various types of TIPC sockets */
3348
3349static const struct proto_ops msg_ops = {
3350        .owner          = THIS_MODULE,
3351        .family         = AF_TIPC,
3352        .release        = tipc_release,
3353        .bind           = tipc_bind,
3354        .connect        = tipc_connect,
3355        .socketpair     = tipc_socketpair,
3356        .accept         = sock_no_accept,
3357        .getname        = tipc_getname,
3358        .poll           = tipc_poll,
3359        .ioctl          = tipc_ioctl,
3360        .listen         = sock_no_listen,
3361        .shutdown       = tipc_shutdown,
3362        .setsockopt     = tipc_setsockopt,
3363        .getsockopt     = tipc_getsockopt,
3364        .sendmsg        = tipc_sendmsg,
3365        .recvmsg        = tipc_recvmsg,
3366        .mmap           = sock_no_mmap,
3367        .sendpage       = sock_no_sendpage
3368};
3369
3370static const struct proto_ops packet_ops = {
3371        .owner          = THIS_MODULE,
3372        .family         = AF_TIPC,
3373        .release        = tipc_release,
3374        .bind           = tipc_bind,
3375        .connect        = tipc_connect,
3376        .socketpair     = tipc_socketpair,
3377        .accept         = tipc_accept,
3378        .getname        = tipc_getname,
3379        .poll           = tipc_poll,
3380        .ioctl          = tipc_ioctl,
3381        .listen         = tipc_listen,
3382        .shutdown       = tipc_shutdown,
3383        .setsockopt     = tipc_setsockopt,
3384        .getsockopt     = tipc_getsockopt,
3385        .sendmsg        = tipc_send_packet,
3386        .recvmsg        = tipc_recvmsg,
3387        .mmap           = sock_no_mmap,
3388        .sendpage       = sock_no_sendpage
3389};
3390
3391static const struct proto_ops stream_ops = {
3392        .owner          = THIS_MODULE,
3393        .family         = AF_TIPC,
3394        .release        = tipc_release,
3395        .bind           = tipc_bind,
3396        .connect        = tipc_connect,
3397        .socketpair     = tipc_socketpair,
3398        .accept         = tipc_accept,
3399        .getname        = tipc_getname,
3400        .poll           = tipc_poll,
3401        .ioctl          = tipc_ioctl,
3402        .listen         = tipc_listen,
3403        .shutdown       = tipc_shutdown,
3404        .setsockopt     = tipc_setsockopt,
3405        .getsockopt     = tipc_getsockopt,
3406        .sendmsg        = tipc_sendstream,
3407        .recvmsg        = tipc_recvstream,
3408        .mmap           = sock_no_mmap,
3409        .sendpage       = sock_no_sendpage
3410};
3411
3412static const struct net_proto_family tipc_family_ops = {
3413        .owner          = THIS_MODULE,
3414        .family         = AF_TIPC,
3415        .create         = tipc_sk_create
3416};
3417
3418static struct proto tipc_proto = {
3419        .name           = "TIPC",
3420        .owner          = THIS_MODULE,
3421        .obj_size       = sizeof(struct tipc_sock),
3422        .sysctl_rmem    = sysctl_tipc_rmem
3423};
3424
3425/**
3426 * tipc_socket_init - initialize TIPC socket interface
3427 *
3428 * Return: 0 on success, errno otherwise
3429 */
3430int tipc_socket_init(void)
3431{
3432        int res;
3433
3434        res = proto_register(&tipc_proto, 1);
3435        if (res) {
3436                pr_err("Failed to register TIPC protocol type\n");
3437                goto out;
3438        }
3439
3440        res = sock_register(&tipc_family_ops);
3441        if (res) {
3442                pr_err("Failed to register TIPC socket type\n");
3443                proto_unregister(&tipc_proto);
3444                goto out;
3445        }
3446 out:
3447        return res;
3448}
3449
3450/**
3451 * tipc_socket_stop - stop TIPC socket interface
3452 */
3453void tipc_socket_stop(void)
3454{
3455        sock_unregister(tipc_family_ops.family);
3456        proto_unregister(&tipc_proto);
3457}
3458
3459/* Caller should hold socket lock for the passed tipc socket. */
3460static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
3461{
3462        u32 peer_node;
3463        u32 peer_port;
3464        struct nlattr *nest;
3465
3466        peer_node = tsk_peer_node(tsk);
3467        peer_port = tsk_peer_port(tsk);
3468
3469        nest = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_CON);
3470        if (!nest)
3471                return -EMSGSIZE;
3472
3473        if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
3474                goto msg_full;
3475        if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
3476                goto msg_full;
3477
3478        if (tsk->conn_type != 0) {
3479                if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
3480                        goto msg_full;
3481                if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
3482                        goto msg_full;
3483                if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
3484                        goto msg_full;
3485        }
3486        nla_nest_end(skb, nest);
3487
3488        return 0;
3489
3490msg_full:
3491        nla_nest_cancel(skb, nest);
3492
3493        return -EMSGSIZE;
3494}
3495
3496static int __tipc_nl_add_sk_info(struct sk_buff *skb,
3497                                 struct tipc_sock *tsk)
3498{
3499        struct net *net = sock_net(skb->sk);
3500        struct sock *sk = &tsk->sk;
3501
3502        if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid) ||
3503            nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr(net)))
3504                return -EMSGSIZE;
3505
3506        if (tipc_sk_connected(sk)) {
3507                if (__tipc_nl_add_sk_con(skb, tsk))
3508                        return -EMSGSIZE;
3509        } else if (!list_empty(&tsk->publications)) {
3510                if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
3511                        return -EMSGSIZE;
3512        }
3513        return 0;
3514}
3515
3516/* Caller should hold socket lock for the passed tipc socket. */
3517static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
3518                            struct tipc_sock *tsk)
3519{
3520        struct nlattr *attrs;
3521        void *hdr;
3522
3523        hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
3524                          &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
3525        if (!hdr)
3526                goto msg_cancel;
3527
3528        attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
3529        if (!attrs)
3530                goto genlmsg_cancel;
3531
3532        if (__tipc_nl_add_sk_info(skb, tsk))
3533                goto attr_msg_cancel;
3534
3535        nla_nest_end(skb, attrs);
3536        genlmsg_end(skb, hdr);
3537
3538        return 0;
3539
3540attr_msg_cancel:
3541        nla_nest_cancel(skb, attrs);
3542genlmsg_cancel:
3543        genlmsg_cancel(skb, hdr);
3544msg_cancel:
3545        return -EMSGSIZE;
3546}
3547
3548int tipc_nl_sk_walk(struct sk_buff *skb, struct netlink_callback *cb,
3549                    int (*skb_handler)(struct sk_buff *skb,
3550                                       struct netlink_callback *cb,
3551                                       struct tipc_sock *tsk))
3552{
3553        struct rhashtable_iter *iter = (void *)cb->args[4];
3554        struct tipc_sock *tsk;
3555        int err;
3556
3557        rhashtable_walk_start(iter);
3558        while ((tsk = rhashtable_walk_next(iter)) != NULL) {
3559                if (IS_ERR(tsk)) {
3560                        err = PTR_ERR(tsk);
3561                        if (err == -EAGAIN) {
3562                                err = 0;
3563                                continue;
3564                        }
3565                        break;
3566                }
3567
3568                sock_hold(&tsk->sk);
3569                rhashtable_walk_stop(iter);
3570                lock_sock(&tsk->sk);
3571                err = skb_handler(skb, cb, tsk);
3572                if (err) {
3573                        release_sock(&tsk->sk);
3574                        sock_put(&tsk->sk);
3575                        goto out;
3576                }
3577                release_sock(&tsk->sk);
3578                rhashtable_walk_start(iter);
3579                sock_put(&tsk->sk);
3580        }
3581        rhashtable_walk_stop(iter);
3582out:
3583        return skb->len;
3584}
3585EXPORT_SYMBOL(tipc_nl_sk_walk);
3586
3587int tipc_dump_start(struct netlink_callback *cb)
3588{
3589        return __tipc_dump_start(cb, sock_net(cb->skb->sk));
3590}
3591EXPORT_SYMBOL(tipc_dump_start);
3592
3593int __tipc_dump_start(struct netlink_callback *cb, struct net *net)
3594{
3595        /* tipc_nl_name_table_dump() uses cb->args[0...3]. */
3596        struct rhashtable_iter *iter = (void *)cb->args[4];
3597        struct tipc_net *tn = tipc_net(net);
3598
3599        if (!iter) {
3600                iter = kmalloc(sizeof(*iter), GFP_KERNEL);
3601                if (!iter)
3602                        return -ENOMEM;
3603
3604                cb->args[4] = (long)iter;
3605        }
3606
3607        rhashtable_walk_enter(&tn->sk_rht, iter);
3608        return 0;
3609}
3610
3611int tipc_dump_done(struct netlink_callback *cb)
3612{
3613        struct rhashtable_iter *hti = (void *)cb->args[4];
3614
3615        rhashtable_walk_exit(hti);
3616        kfree(hti);
3617        return 0;
3618}
3619EXPORT_SYMBOL(tipc_dump_done);
3620
3621int tipc_sk_fill_sock_diag(struct sk_buff *skb, struct netlink_callback *cb,
3622                           struct tipc_sock *tsk, u32 sk_filter_state,
3623                           u64 (*tipc_diag_gen_cookie)(struct sock *sk))
3624{
3625        struct sock *sk = &tsk->sk;
3626        struct nlattr *attrs;
3627        struct nlattr *stat;
3628
3629        /* Filter response w.r.t. sk_state */
3630        if (!(sk_filter_state & (1 << sk->sk_state)))
3631                return 0;
3632
3633        attrs = nla_nest_start_noflag(skb, TIPC_NLA_SOCK);
3634        if (!attrs)
3635                goto msg_cancel;
3636
3637        if (__tipc_nl_add_sk_info(skb, tsk))
3638                goto attr_msg_cancel;
3639
3640        if (nla_put_u32(skb, TIPC_NLA_SOCK_TYPE, (u32)sk->sk_type) ||
3641            nla_put_u32(skb, TIPC_NLA_SOCK_TIPC_STATE, (u32)sk->sk_state) ||
3642            nla_put_u32(skb, TIPC_NLA_SOCK_INO, sock_i_ino(sk)) ||
3643            nla_put_u32(skb, TIPC_NLA_SOCK_UID,
3644                        from_kuid_munged(sk_user_ns(NETLINK_CB(cb->skb).sk),
3645                                         sock_i_uid(sk))) ||
3646            nla_put_u64_64bit(skb, TIPC_NLA_SOCK_COOKIE,
3647                              tipc_diag_gen_cookie(sk),
3648                              TIPC_NLA_SOCK_PAD))
3649                goto attr_msg_cancel;
3650
3651        stat = nla_nest_start_noflag(skb, TIPC_NLA_SOCK_STAT);
3652        if (!stat)
3653                goto attr_msg_cancel;
3654
3655        if (nla_put_u32(skb, TIPC_NLA_SOCK_STAT_RCVQ,
3656                        skb_queue_len(&sk->sk_receive_queue)) ||
3657            nla_put_u32(skb, TIPC_NLA_SOCK_STAT_SENDQ,
3658                        skb_queue_len(&sk->sk_write_queue)) ||
3659            nla_put_u32(skb, TIPC_NLA_SOCK_STAT_DROP,
3660                        atomic_read(&sk->sk_drops)))
3661                goto stat_msg_cancel;
3662
3663        if (tsk->cong_link_cnt &&
3664            nla_put_flag(skb, TIPC_NLA_SOCK_STAT_LINK_CONG))
3665                goto stat_msg_cancel;
3666
3667        if (tsk_conn_cong(tsk) &&
3668            nla_put_flag(skb, TIPC_NLA_SOCK_STAT_CONN_CONG))
3669                goto stat_msg_cancel;
3670
3671        nla_nest_end(skb, stat);
3672
3673        if (tsk->group)
3674                if (tipc_group_fill_sock_diag(tsk->group, skb))
3675                        goto stat_msg_cancel;
3676
3677        nla_nest_end(skb, attrs);
3678
3679        return 0;
3680
3681stat_msg_cancel:
3682        nla_nest_cancel(skb, stat);
3683attr_msg_cancel:
3684        nla_nest_cancel(skb, attrs);
3685msg_cancel:
3686        return -EMSGSIZE;
3687}
3688EXPORT_SYMBOL(tipc_sk_fill_sock_diag);
3689
3690int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
3691{
3692        return tipc_nl_sk_walk(skb, cb, __tipc_nl_add_sk);
3693}
3694
3695/* Caller should hold socket lock for the passed tipc socket. */
3696static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
3697                                 struct netlink_callback *cb,
3698                                 struct publication *publ)
3699{
3700        void *hdr;
3701        struct nlattr *attrs;
3702
3703        hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
3704                          &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
3705        if (!hdr)
3706                goto msg_cancel;
3707
3708        attrs = nla_nest_start_noflag(skb, TIPC_NLA_PUBL);
3709        if (!attrs)
3710                goto genlmsg_cancel;
3711
3712        if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
3713                goto attr_msg_cancel;
3714        if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
3715                goto attr_msg_cancel;
3716        if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
3717                goto attr_msg_cancel;
3718        if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
3719                goto attr_msg_cancel;
3720
3721        nla_nest_end(skb, attrs);
3722        genlmsg_end(skb, hdr);
3723
3724        return 0;
3725
3726attr_msg_cancel:
3727        nla_nest_cancel(skb, attrs);
3728genlmsg_cancel:
3729        genlmsg_cancel(skb, hdr);
3730msg_cancel:
3731        return -EMSGSIZE;
3732}
3733
3734/* Caller should hold socket lock for the passed tipc socket. */
3735static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
3736                                  struct netlink_callback *cb,
3737                                  struct tipc_sock *tsk, u32 *last_publ)
3738{
3739        int err;
3740        struct publication *p;
3741
3742        if (*last_publ) {
3743                list_for_each_entry(p, &tsk->publications, binding_sock) {
3744                        if (p->key == *last_publ)
3745                                break;
3746                }
3747                if (p->key != *last_publ) {
3748                        /* We never set seq or call nl_dump_check_consistent(),
3749                         * so setting prev_seq here will cause the consistency
3750                         * check in the netlink callback handler to fail,
3751                         * resulting in the last NLMSG_DONE message having
3752                         * the NLM_F_DUMP_INTR flag set.
3753                         */
3754                        cb->prev_seq = 1;
3755                        *last_publ = 0;
3756                        return -EPIPE;
3757                }
3758        } else {
3759                p = list_first_entry(&tsk->publications, struct publication,
3760                                     binding_sock);
3761        }
3762
3763        list_for_each_entry_from(p, &tsk->publications, binding_sock) {
3764                err = __tipc_nl_add_sk_publ(skb, cb, p);
3765                if (err) {
3766                        *last_publ = p->key;
3767                        return err;
3768                }
3769        }
3770        *last_publ = 0;
3771
3772        return 0;
3773}
3774
3775int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
3776{
3777        int err;
3778        u32 tsk_portid = cb->args[0];
3779        u32 last_publ = cb->args[1];
3780        u32 done = cb->args[2];
3781        struct net *net = sock_net(skb->sk);
3782        struct tipc_sock *tsk;
3783
3784        if (!tsk_portid) {
3785                struct nlattr **attrs = genl_dumpit_info(cb)->attrs;
3786                struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
3787
3788                if (!attrs[TIPC_NLA_SOCK])
3789                        return -EINVAL;
3790
3791                err = nla_parse_nested_deprecated(sock, TIPC_NLA_SOCK_MAX,
3792                                                  attrs[TIPC_NLA_SOCK],
3793                                                  tipc_nl_sock_policy, NULL);
3794                if (err)
3795                        return err;
3796
3797                if (!sock[TIPC_NLA_SOCK_REF])
3798                        return -EINVAL;
3799
3800                tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
3801        }
3802
3803        if (done)
3804                return 0;
3805
3806        tsk = tipc_sk_lookup(net, tsk_portid);
3807        if (!tsk)
3808                return -EINVAL;
3809
3810        lock_sock(&tsk->sk);
3811        err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
3812        if (!err)
3813                done = 1;
3814        release_sock(&tsk->sk);
3815        sock_put(&tsk->sk);
3816
3817        cb->args[0] = tsk_portid;
3818        cb->args[1] = last_publ;
3819        cb->args[2] = done;
3820
3821        return skb->len;
3822}
3823
3824/**
3825 * tipc_sk_filtering - check if a socket should be traced
3826 * @sk: the socket to be examined
3827 *
3828 * @sysctl_tipc_sk_filter is used as the socket tuple for filtering:
3829 * (portid, sock type, name type, name lower, name upper)
3830 *
3831 * Return: true if the socket matches the tuple (a field value of 0
3832 * matches 'any'), or when no tuple is set at all (all fields 0);
3833 * otherwise false
3834 */
3835bool tipc_sk_filtering(struct sock *sk)
3836{
3837        struct tipc_sock *tsk;
3838        struct publication *p;
3839        u32 _port, _sktype, _type, _lower, _upper;
3840        u32 type = 0, lower = 0, upper = 0;
3841
3842        if (!sk)
3843                return true;
3844
3845        tsk = tipc_sk(sk);
3846
3847        _port = sysctl_tipc_sk_filter[0];
3848        _sktype = sysctl_tipc_sk_filter[1];
3849        _type = sysctl_tipc_sk_filter[2];
3850        _lower = sysctl_tipc_sk_filter[3];
3851        _upper = sysctl_tipc_sk_filter[4];
3852
3853        if (!_port && !_sktype && !_type && !_lower && !_upper)
3854                return true;
3855
3856        if (_port)
3857                return (_port == tsk->portid);
3858
3859        if (_sktype && _sktype != sk->sk_type)
3860                return false;
3861
3862        if (tsk->published) {
3863                p = list_first_entry_or_null(&tsk->publications,
3864                                             struct publication, binding_sock);
3865                if (p) {
3866                        type = p->type;
3867                        lower = p->lower;
3868                        upper = p->upper;
3869                }
3870        }
3871
3872        if (!tipc_sk_type_connectionless(sk)) {
3873                type = tsk->conn_type;
3874                lower = tsk->conn_instance;
3875                upper = tsk->conn_instance;
3876        }
3877
3878        if ((_type && _type != type) || (_lower && _lower != lower) ||
3879            (_upper && _upper != upper))
3880                return false;
3881
3882        return true;
3883}
3884
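/* Illustrative tuning note (example values only): the tuple is written
 * via the net.tipc.sk_filter sysctl, e.g. to trace only sockets bound to
 * service type 4711, instances 0-99, on any port and socket type:
 *
 *	sysctl -w net.tipc.sk_filter="0 0 4711 0 99"
 */
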
3885u32 tipc_sock_get_portid(struct sock *sk)
3886{
3887        return (sk) ? (tipc_sk(sk))->portid : 0;
3888}
3889
3890/**
3891 * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded;
3892 *                      both the rcv and backlog queues are considered
3893 * @sk: tipc sk to be checked
3894 * @skb: tipc msg to be checked
3895 *
3896 * Return: true if the socket rx queue allocation is > 90%, otherwise false
3897 */
3898
3899bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
3900{
3901        atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
3902        unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
3903        unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
3904
3905        return (qsize > lim * 90 / 100);
3906}
3907
3908/**
3909 * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded;
3910 *                      only the rcv queue is considered
3911 * @sk: tipc sk to be checked
3912 * @skb: tipc msg to be checked
3913 *
3914 * Return: true if the socket rx queue allocation is > 90%, otherwise false
3915 */
3916
3917bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
3918{
3919        unsigned int lim = rcvbuf_limit(sk, skb);
3920        unsigned int qsize = sk_rmem_alloc_get(sk);
3921
3922        return (qsize > lim * 90 / 100);
3923}
3924
3925/**
3926 * tipc_sk_dump - dump TIPC socket
3927 * @sk: tipc sk to be dumped
3928 * @dqueues: bitmask selecting which socket queues, if any, to dump:
3929 *           - TIPC_DUMP_NONE: don't dump socket queues
3930 *           - TIPC_DUMP_SK_SNDQ: dump socket send queue
3931 *           - TIPC_DUMP_SK_RCVQ: dump socket rcv queue
3932 *           - TIPC_DUMP_SK_BKLGQ: dump socket backlog queue
3933 *           - TIPC_DUMP_ALL: dump all the socket queues above
3934 * @buf: buffer where the formatted dump data is written
3935 */
3936int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf)
3937{
3938        int i = 0;
3939        size_t sz = (dqueues) ? SK_LMAX : SK_LMIN;
3940        struct tipc_sock *tsk;
3941        struct publication *p;
3942        bool tsk_connected;
3943
3944        if (!sk) {
3945                i += scnprintf(buf, sz, "sk data: (null)\n");
3946                return i;
3947        }
3948
3949        tsk = tipc_sk(sk);
3950        tsk_connected = !tipc_sk_type_connectionless(sk);
3951
3952        i += scnprintf(buf, sz, "sk data: %u", sk->sk_type);
3953        i += scnprintf(buf + i, sz - i, " %d", sk->sk_state);
3954        i += scnprintf(buf + i, sz - i, " %x", tsk_own_node(tsk));
3955        i += scnprintf(buf + i, sz - i, " %u", tsk->portid);
3956        i += scnprintf(buf + i, sz - i, " | %u", tsk_connected);
3957        if (tsk_connected) {
3958                i += scnprintf(buf + i, sz - i, " %x", tsk_peer_node(tsk));
3959                i += scnprintf(buf + i, sz - i, " %u", tsk_peer_port(tsk));
3960                i += scnprintf(buf + i, sz - i, " %u", tsk->conn_type);
3961                i += scnprintf(buf + i, sz - i, " %u", tsk->conn_instance);
3962        }
3963        i += scnprintf(buf + i, sz - i, " | %u", tsk->published);
3964        if (tsk->published) {
3965                p = list_first_entry_or_null(&tsk->publications,
3966                                             struct publication, binding_sock);
3967                i += scnprintf(buf + i, sz - i, " %u", (p) ? p->type : 0);
3968                i += scnprintf(buf + i, sz - i, " %u", (p) ? p->lower : 0);
3969                i += scnprintf(buf + i, sz - i, " %u", (p) ? p->upper : 0);
3970        }
3971        i += scnprintf(buf + i, sz - i, " | %u", tsk->snd_win);
3972        i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_win);
3973        i += scnprintf(buf + i, sz - i, " %u", tsk->max_pkt);
3974        i += scnprintf(buf + i, sz - i, " %x", tsk->peer_caps);
3975        i += scnprintf(buf + i, sz - i, " %u", tsk->cong_link_cnt);
3976        i += scnprintf(buf + i, sz - i, " %u", tsk->snt_unacked);
3977        i += scnprintf(buf + i, sz - i, " %u", tsk->rcv_unacked);
3978        i += scnprintf(buf + i, sz - i, " %u", atomic_read(&tsk->dupl_rcvcnt));
3979        i += scnprintf(buf + i, sz - i, " %u", sk->sk_shutdown);
3980        i += scnprintf(buf + i, sz - i, " | %d", sk_wmem_alloc_get(sk));
3981        i += scnprintf(buf + i, sz - i, " %d", sk->sk_sndbuf);
3982        i += scnprintf(buf + i, sz - i, " | %d", sk_rmem_alloc_get(sk));
3983        i += scnprintf(buf + i, sz - i, " %d", sk->sk_rcvbuf);
3984        i += scnprintf(buf + i, sz - i, " | %d\n", READ_ONCE(sk->sk_backlog.len));
3985
3986        if (dqueues & TIPC_DUMP_SK_SNDQ) {
3987                i += scnprintf(buf + i, sz - i, "sk_write_queue: ");
3988                i += tipc_list_dump(&sk->sk_write_queue, false, buf + i);
3989        }
3990
3991        if (dqueues & TIPC_DUMP_SK_RCVQ) {
3992                i += scnprintf(buf + i, sz - i, "sk_receive_queue: ");
3993                i += tipc_list_dump(&sk->sk_receive_queue, false, buf + i);
3994        }
3995
3996        if (dqueues & TIPC_DUMP_SK_BKLGQ) {
3997                i += scnprintf(buf + i, sz - i, "sk_backlog:\n  head ");
3998                i += tipc_skb_dump(sk->sk_backlog.head, false, buf + i);
3999                if (sk->sk_backlog.tail != sk->sk_backlog.head) {
4000                        i += scnprintf(buf + i, sz - i, "  tail ");
4001                        i += tipc_skb_dump(sk->sk_backlog.tail, false,
4002                                           buf + i);
4003                }
4004        }
4005
4006        return i;
4007}
4008