linux/net/tipc/socket.c
<<
>>
Prefs
   1/*
   2 * net/tipc/socket.c: TIPC socket API
   3 *
   4 * Copyright (c) 2001-2007, 2012-2017, Ericsson AB
   5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include <linux/rhashtable.h>
  38#include <linux/sched/signal.h>
  39
  40#include "core.h"
  41#include "name_table.h"
  42#include "node.h"
  43#include "link.h"
  44#include "name_distr.h"
  45#include "socket.h"
  46#include "bcast.h"
  47#include "netlink.h"
  48#include "group.h"
  49
#define CONN_TIMEOUT_DEFAULT    8000    /* [ms] default connect timeout = 8s */
#define CONN_PROBING_INTV       msecs_to_jiffies(3600000)  /* [ms] => 1 h */
#define TIPC_FWD_MSG            1
#define TIPC_MAX_PORT           0xffffffff
#define TIPC_MIN_PORT           1
#define TIPC_ACK_RATE           4       /* ACK at 1/4 of rcv window size */
  56
/* Socket states kept in sk->sk_state (see tipc_set_sk_state()).
 * Each TIPC state reuses the numeric value of a TCP state constant.
 */
enum {
        TIPC_LISTEN = TCP_LISTEN,
        TIPC_ESTABLISHED = TCP_ESTABLISHED,
        TIPC_OPEN = TCP_CLOSE,
        TIPC_DISCONNECTING = TCP_CLOSE_WAIT,
        TIPC_CONNECTING = TCP_SYN_SENT,
};
  64
/* struct sockaddr_pair - two TIPC socket addresses kept side by side
 * @sock: first address
 * @member: second address
 *
 * NOTE(review): presumably @sock is a socket address and @member the address
 * of a group member it belongs to; the users of this struct are not visible
 * here - confirm against them.
 */
struct sockaddr_pair {
        struct sockaddr_tipc sock;
        struct sockaddr_tipc member;
};
  69
/**
 * struct tipc_sock - TIPC socket structure
 * @sk: socket - interacts with 'port' and with user via the socket API
 * @conn_type: TIPC type used when connection was established
 * @conn_instance: TIPC instance used when connection was established
 * @published: non-zero if port has one or more associated names
 * @max_pkt: maximum packet size "hint" used when building messages sent by port
 * @portid: unique port identity in TIPC socket hash table
 * @phdr: preformatted message header used when sending messages
 * @cong_links: list of congested links
 * @publications: list of publications for port
 * @pub_count: total # of publications port has made during its lifetime
 * @conn_timeout: the time we can wait for an unresponded setup request
 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
 * @probe_unacked: NOTE(review): presumably set while a connection probe is
 *                 still unanswered - confirm against the probing code
 * @cong_link_cnt: number of congested links
 * @snt_unacked: # messages sent by socket, and not yet acked by peer
 * @snd_win: send window; the connection counts as congested while
 *           @snt_unacked exceeds it
 * @peer_caps: peer capability flags (TIPC_BLOCK_FLOWCTL selects block based
 *             flow control accounting)
 * @rcv_unacked: # messages read by user, but not yet acked back to peer
 * @rcv_win: receive window; starts out equal to @snd_win at socket creation
 * @peer: 'connected' peer for dgram/rdm
 * @node: hash table node
 * @mc_method: cookie for use between socket and broadcast layer
 * @rcu: rcu struct for tipc_sock
 * @group: group associated with the socket; bind and multicast send are
 *         refused while it is non-NULL
 * @group_is_open: set at socket creation; gates the writable poll event while
 *                 the socket is in state TIPC_OPEN
 */
struct tipc_sock {
        struct sock sk;
        u32 conn_type;
        u32 conn_instance;
        int published;
        u32 max_pkt;
        u32 portid;
        struct tipc_msg phdr;
        struct list_head cong_links;
        struct list_head publications;
        u32 pub_count;
        uint conn_timeout;
        atomic_t dupl_rcvcnt;
        bool probe_unacked;
        u16 cong_link_cnt;
        u16 snt_unacked;
        u16 snd_win;
        u16 peer_caps;
        u16 rcv_unacked;
        u16 rcv_win;
        struct sockaddr_tipc peer;
        struct rhash_head node;
        struct tipc_mc_method mc_method;
        struct rcu_head rcu;
        struct tipc_group *group;
        bool group_is_open;
};
 121
/* Forward declarations of file-local functions used before their definition */
static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static void tipc_sock_destruct(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
                       bool kern);
static void tipc_sk_timeout(struct timer_list *t);
static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
                           struct tipc_name_seq const *seq);
static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
                            struct tipc_name_seq const *seq);
static int tipc_sk_leave(struct tipc_sock *tsk);
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);

/* Forward declarations of file-local objects: the per-socket-type operation
 * tables chosen in tipc_sk_create(), the protocol descriptor and the socket
 * hash table parameters.
 */
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
static struct proto tipc_proto;
static const struct rhashtable_params tsk_rht_params;
 146
 147static u32 tsk_own_node(struct tipc_sock *tsk)
 148{
 149        return msg_prevnode(&tsk->phdr);
 150}
 151
 152static u32 tsk_peer_node(struct tipc_sock *tsk)
 153{
 154        return msg_destnode(&tsk->phdr);
 155}
 156
 157static u32 tsk_peer_port(struct tipc_sock *tsk)
 158{
 159        return msg_destport(&tsk->phdr);
 160}
 161
 162static  bool tsk_unreliable(struct tipc_sock *tsk)
 163{
 164        return msg_src_droppable(&tsk->phdr) != 0;
 165}
 166
 167static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
 168{
 169        msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
 170}
 171
 172static bool tsk_unreturnable(struct tipc_sock *tsk)
 173{
 174        return msg_dest_droppable(&tsk->phdr) != 0;
 175}
 176
 177static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
 178{
 179        msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
 180}
 181
 182static int tsk_importance(struct tipc_sock *tsk)
 183{
 184        return msg_importance(&tsk->phdr);
 185}
 186
 187static int tsk_set_importance(struct tipc_sock *tsk, int imp)
 188{
 189        if (imp > TIPC_CRITICAL_IMPORTANCE)
 190                return -EINVAL;
 191        msg_set_importance(&tsk->phdr, (u32)imp);
 192        return 0;
 193}
 194
 195static struct tipc_sock *tipc_sk(const struct sock *sk)
 196{
 197        return container_of(sk, struct tipc_sock, sk);
 198}
 199
 200static bool tsk_conn_cong(struct tipc_sock *tsk)
 201{
 202        return tsk->snt_unacked > tsk->snd_win;
 203}
 204
 205static u16 tsk_blocks(int len)
 206{
 207        return ((len / FLOWCTL_BLK_SZ) + 1);
 208}
 209
 210/* tsk_blocks(): translate a buffer size in bytes to number of
 211 * advertisable blocks, taking into account the ratio truesize(len)/len
 212 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
 213 */
 214static u16 tsk_adv_blocks(int len)
 215{
 216        return len / FLOWCTL_BLK_SZ / 4;
 217}
 218
 219/* tsk_inc(): increment counter for sent or received data
 220 * - If block based flow control is not supported by peer we
 221 *   fall back to message based ditto, incrementing the counter
 222 */
 223static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
 224{
 225        if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
 226                return ((msglen / FLOWCTL_BLK_SZ) + 1);
 227        return 1;
 228}
 229
 230/**
 231 * tsk_advance_rx_queue - discard first buffer in socket receive queue
 232 *
 233 * Caller must hold socket lock
 234 */
 235static void tsk_advance_rx_queue(struct sock *sk)
 236{
 237        kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 238}
 239
 240/* tipc_sk_respond() : send response message back to sender
 241 */
 242static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
 243{
 244        u32 selector;
 245        u32 dnode;
 246        u32 onode = tipc_own_addr(sock_net(sk));
 247
 248        if (!tipc_msg_reverse(onode, &skb, err))
 249                return;
 250
 251        dnode = msg_destnode(buf_msg(skb));
 252        selector = msg_origport(buf_msg(skb));
 253        tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
 254}
 255
 256/**
 257 * tsk_rej_rx_queue - reject all buffers in socket receive queue
 258 *
 259 * Caller must hold socket lock
 260 */
 261static void tsk_rej_rx_queue(struct sock *sk)
 262{
 263        struct sk_buff *skb;
 264
 265        while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
 266                tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
 267}
 268
 269static bool tipc_sk_connected(struct sock *sk)
 270{
 271        return sk->sk_state == TIPC_ESTABLISHED;
 272}
 273
 274/* tipc_sk_type_connectionless - check if the socket is datagram socket
 275 * @sk: socket
 276 *
 277 * Returns true if connection less, false otherwise
 278 */
 279static bool tipc_sk_type_connectionless(struct sock *sk)
 280{
 281        return sk->sk_type == SOCK_RDM || sk->sk_type == SOCK_DGRAM;
 282}
 283
 284/* tsk_peer_msg - verify if message was sent by connected port's peer
 285 *
 286 * Handles cases where the node's network address has changed from
 287 * the default of <0.0.0> to its configured setting.
 288 */
 289static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
 290{
 291        struct sock *sk = &tsk->sk;
 292        u32 self = tipc_own_addr(sock_net(sk));
 293        u32 peer_port = tsk_peer_port(tsk);
 294        u32 orig_node, peer_node;
 295
 296        if (unlikely(!tipc_sk_connected(sk)))
 297                return false;
 298
 299        if (unlikely(msg_origport(msg) != peer_port))
 300                return false;
 301
 302        orig_node = msg_orignode(msg);
 303        peer_node = tsk_peer_node(tsk);
 304
 305        if (likely(orig_node == peer_node))
 306                return true;
 307
 308        if (!orig_node && peer_node == self)
 309                return true;
 310
 311        if (!peer_node && orig_node == self)
 312                return true;
 313
 314        return false;
 315}
 316
 317/* tipc_set_sk_state - set the sk_state of the socket
 318 * @sk: socket
 319 *
 320 * Caller must hold socket lock
 321 *
 322 * Returns 0 on success, errno otherwise
 323 */
 324static int tipc_set_sk_state(struct sock *sk, int state)
 325{
 326        int oldsk_state = sk->sk_state;
 327        int res = -EINVAL;
 328
 329        switch (state) {
 330        case TIPC_OPEN:
 331                res = 0;
 332                break;
 333        case TIPC_LISTEN:
 334        case TIPC_CONNECTING:
 335                if (oldsk_state == TIPC_OPEN)
 336                        res = 0;
 337                break;
 338        case TIPC_ESTABLISHED:
 339                if (oldsk_state == TIPC_CONNECTING ||
 340                    oldsk_state == TIPC_OPEN)
 341                        res = 0;
 342                break;
 343        case TIPC_DISCONNECTING:
 344                if (oldsk_state == TIPC_CONNECTING ||
 345                    oldsk_state == TIPC_ESTABLISHED)
 346                        res = 0;
 347                break;
 348        }
 349
 350        if (!res)
 351                sk->sk_state = state;
 352
 353        return res;
 354}
 355
 356static int tipc_sk_sock_err(struct socket *sock, long *timeout)
 357{
 358        struct sock *sk = sock->sk;
 359        int err = sock_error(sk);
 360        int typ = sock->type;
 361
 362        if (err)
 363                return err;
 364        if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
 365                if (sk->sk_state == TIPC_DISCONNECTING)
 366                        return -EPIPE;
 367                else if (!tipc_sk_connected(sk))
 368                        return -ENOTCONN;
 369        }
 370        if (!*timeout)
 371                return -EAGAIN;
 372        if (signal_pending(current))
 373                return sock_intr_errno(*timeout);
 374
 375        return 0;
 376}
 377
 378#define tipc_wait_for_cond(sock_, timeo_, condition_)                          \
 379({                                                                             \
 380        struct sock *sk_;                                                      \
 381        int rc_;                                                               \
 382                                                                               \
 383        while ((rc_ = !(condition_))) {                                        \
 384                DEFINE_WAIT_FUNC(wait_, woken_wake_function);                  \
 385                sk_ = (sock_)->sk;                                             \
 386                rc_ = tipc_sk_sock_err((sock_), timeo_);                       \
 387                if (rc_)                                                       \
 388                        break;                                                 \
 389                prepare_to_wait(sk_sleep(sk_), &wait_, TASK_INTERRUPTIBLE);    \
 390                release_sock(sk_);                                             \
 391                *(timeo_) = wait_woken(&wait_, TASK_INTERRUPTIBLE, *(timeo_)); \
 392                sched_annotate_sleep();                                        \
 393                lock_sock(sk_);                                                \
 394                remove_wait_queue(sk_sleep(sk_), &wait_);                      \
 395        }                                                                      \
 396        rc_;                                                                   \
 397})
 398
 399/**
 400 * tipc_sk_create - create a TIPC socket
 401 * @net: network namespace (must be default network)
 402 * @sock: pre-allocated socket structure
 403 * @protocol: protocol indicator (must be 0)
 404 * @kern: caused by kernel or by userspace?
 405 *
 406 * This routine creates additional data structures used by the TIPC socket,
 407 * initializes them, and links them together.
 408 *
 409 * Returns 0 on success, errno otherwise
 410 */
 411static int tipc_sk_create(struct net *net, struct socket *sock,
 412                          int protocol, int kern)
 413{
 414        struct tipc_net *tn;
 415        const struct proto_ops *ops;
 416        struct sock *sk;
 417        struct tipc_sock *tsk;
 418        struct tipc_msg *msg;
 419
 420        /* Validate arguments */
 421        if (unlikely(protocol != 0))
 422                return -EPROTONOSUPPORT;
 423
 424        switch (sock->type) {
 425        case SOCK_STREAM:
 426                ops = &stream_ops;
 427                break;
 428        case SOCK_SEQPACKET:
 429                ops = &packet_ops;
 430                break;
 431        case SOCK_DGRAM:
 432        case SOCK_RDM:
 433                ops = &msg_ops;
 434                break;
 435        default:
 436                return -EPROTOTYPE;
 437        }
 438
 439        /* Allocate socket's protocol area */
 440        sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
 441        if (sk == NULL)
 442                return -ENOMEM;
 443
 444        tsk = tipc_sk(sk);
 445        tsk->max_pkt = MAX_PKT_DEFAULT;
 446        INIT_LIST_HEAD(&tsk->publications);
 447        INIT_LIST_HEAD(&tsk->cong_links);
 448        msg = &tsk->phdr;
 449        tn = net_generic(sock_net(sk), tipc_net_id);
 450
 451        /* Finish initializing socket data structures */
 452        sock->ops = ops;
 453        sock_init_data(sock, sk);
 454        tipc_set_sk_state(sk, TIPC_OPEN);
 455        if (tipc_sk_insert(tsk)) {
 456                pr_warn("Socket create failed; port number exhausted\n");
 457                return -EINVAL;
 458        }
 459
 460        /* Ensure tsk is visible before we read own_addr. */
 461        smp_mb();
 462
 463        tipc_msg_init(tipc_own_addr(net), msg, TIPC_LOW_IMPORTANCE,
 464                      TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
 465
 466        msg_set_origport(msg, tsk->portid);
 467        timer_setup(&sk->sk_timer, tipc_sk_timeout, 0);
 468        sk->sk_shutdown = 0;
 469        sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
 470        sk->sk_rcvbuf = sysctl_tipc_rmem[1];
 471        sk->sk_data_ready = tipc_data_ready;
 472        sk->sk_write_space = tipc_write_space;
 473        sk->sk_destruct = tipc_sock_destruct;
 474        tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
 475        tsk->group_is_open = true;
 476        atomic_set(&tsk->dupl_rcvcnt, 0);
 477
 478        /* Start out with safe limits until we receive an advertised window */
 479        tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
 480        tsk->rcv_win = tsk->snd_win;
 481
 482        if (tipc_sk_type_connectionless(sk)) {
 483                tsk_set_unreturnable(tsk, true);
 484                if (sock->type == SOCK_DGRAM)
 485                        tsk_set_unreliable(tsk, true);
 486        }
 487
 488        return 0;
 489}
 490
 491static void tipc_sk_callback(struct rcu_head *head)
 492{
 493        struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
 494
 495        sock_put(&tsk->sk);
 496}
 497
 498/* Caller should hold socket lock for the socket. */
 499static void __tipc_shutdown(struct socket *sock, int error)
 500{
 501        struct sock *sk = sock->sk;
 502        struct tipc_sock *tsk = tipc_sk(sk);
 503        struct net *net = sock_net(sk);
 504        long timeout = CONN_TIMEOUT_DEFAULT;
 505        u32 dnode = tsk_peer_node(tsk);
 506        struct sk_buff *skb;
 507
 508        /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
 509        tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
 510                                            !tsk_conn_cong(tsk)));
 511
 512        /* Reject all unreceived messages, except on an active connection
 513         * (which disconnects locally & sends a 'FIN+' to peer).
 514         */
 515        while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
 516                if (TIPC_SKB_CB(skb)->bytes_read) {
 517                        kfree_skb(skb);
 518                        continue;
 519                }
 520                if (!tipc_sk_type_connectionless(sk) &&
 521                    sk->sk_state != TIPC_DISCONNECTING) {
 522                        tipc_set_sk_state(sk, TIPC_DISCONNECTING);
 523                        tipc_node_remove_conn(net, dnode, tsk->portid);
 524                }
 525                tipc_sk_respond(sk, skb, error);
 526        }
 527
 528        if (tipc_sk_type_connectionless(sk))
 529                return;
 530
 531        if (sk->sk_state != TIPC_DISCONNECTING) {
 532                skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
 533                                      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
 534                                      tsk_own_node(tsk), tsk_peer_port(tsk),
 535                                      tsk->portid, error);
 536                if (skb)
 537                        tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
 538                tipc_node_remove_conn(net, dnode, tsk->portid);
 539                tipc_set_sk_state(sk, TIPC_DISCONNECTING);
 540        }
 541}
 542
/**
 * tipc_release - destroy a TIPC socket
 * @sock: socket to destroy
 *
 * This routine cleans up any messages that are still queued on the socket.
 * For DGRAM and RDM socket types, all queued messages are rejected.
 * For SEQPACKET and STREAM socket types, the first message is rejected
 * and any others are discarded.  (If the first message on a STREAM socket
 * is partially-read, it is discarded and the next one is rejected instead.)
 *
 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 * are returned or discarded according to the "destination droppable" setting
 * specified for the message by the sender.
 *
 * Returns 0 (this routine does not report failures)
 */
static int tipc_release(struct socket *sock)
{
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk;

        /*
         * Exit if socket isn't fully initialized (occurs when a failed accept()
         * releases a pre-allocated child socket that was never used)
         */
        if (sk == NULL)
                return 0;

        tsk = tipc_sk(sk);
        lock_sock(sk);

        /* Shut down, then undo group membership, name bindings, the socket
         * timer and the socket table entry while the lock is still held
         */
        __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
        sk->sk_shutdown = SHUTDOWN_MASK;
        tipc_sk_leave(tsk);
        tipc_sk_withdraw(tsk, 0, NULL);
        sk_stop_timer(sk, &sk->sk_timer);
        tipc_sk_remove(tsk);

        /* Reject any messages that accumulated in backlog queue */
        release_sock(sk);
        tipc_dest_list_purge(&tsk->cong_links);
        tsk->cong_link_cnt = 0;
        /* The socket reference is dropped from tipc_sk_callback(), which is
         * deferred through RCU
         */
        call_rcu(&tsk->rcu, tipc_sk_callback);
        sock->sk = NULL;

        return 0;
}
 590
 591/**
 592 * tipc_bind - associate or disassocate TIPC name(s) with a socket
 593 * @sock: socket structure
 594 * @uaddr: socket address describing name(s) and desired operation
 595 * @uaddr_len: size of socket address data structure
 596 *
 597 * Name and name sequence binding is indicated using a positive scope value;
 598 * a negative scope value unbinds the specified name.  Specifying no name
 599 * (i.e. a socket address length of 0) unbinds all names from the socket.
 600 *
 601 * Returns 0 on success, errno otherwise
 602 *
 603 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 604 *       access any non-constant socket information.
 605 */
 606static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
 607                     int uaddr_len)
 608{
 609        struct sock *sk = sock->sk;
 610        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 611        struct tipc_sock *tsk = tipc_sk(sk);
 612        int res = -EINVAL;
 613
 614        lock_sock(sk);
 615        if (unlikely(!uaddr_len)) {
 616                res = tipc_sk_withdraw(tsk, 0, NULL);
 617                goto exit;
 618        }
 619        if (tsk->group) {
 620                res = -EACCES;
 621                goto exit;
 622        }
 623        if (uaddr_len < sizeof(struct sockaddr_tipc)) {
 624                res = -EINVAL;
 625                goto exit;
 626        }
 627        if (addr->family != AF_TIPC) {
 628                res = -EAFNOSUPPORT;
 629                goto exit;
 630        }
 631
 632        if (addr->addrtype == TIPC_ADDR_NAME)
 633                addr->addr.nameseq.upper = addr->addr.nameseq.lower;
 634        else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
 635                res = -EAFNOSUPPORT;
 636                goto exit;
 637        }
 638
 639        if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
 640            (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
 641            (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
 642                res = -EACCES;
 643                goto exit;
 644        }
 645
 646        res = (addr->scope >= 0) ?
 647                tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
 648                tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
 649exit:
 650        release_sock(sk);
 651        return res;
 652}
 653
 654/**
 655 * tipc_getname - get port ID of socket or peer socket
 656 * @sock: socket structure
 657 * @uaddr: area for returned socket address
 658 * @uaddr_len: area for returned length of socket address
 659 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
 660 *
 661 * Returns 0 on success, errno otherwise
 662 *
 663 * NOTE: This routine doesn't need to take the socket lock since it only
 664 *       accesses socket information that is unchanging (or which changes in
 665 *       a completely predictable manner).
 666 */
 667static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
 668                        int peer)
 669{
 670        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 671        struct sock *sk = sock->sk;
 672        struct tipc_sock *tsk = tipc_sk(sk);
 673
 674        memset(addr, 0, sizeof(*addr));
 675        if (peer) {
 676                if ((!tipc_sk_connected(sk)) &&
 677                    ((peer != 2) || (sk->sk_state != TIPC_DISCONNECTING)))
 678                        return -ENOTCONN;
 679                addr->addr.id.ref = tsk_peer_port(tsk);
 680                addr->addr.id.node = tsk_peer_node(tsk);
 681        } else {
 682                addr->addr.id.ref = tsk->portid;
 683                addr->addr.id.node = tipc_own_addr(sock_net(sk));
 684        }
 685
 686        addr->addrtype = TIPC_ADDR_ID;
 687        addr->family = AF_TIPC;
 688        addr->scope = 0;
 689        addr->addr.name.domain = 0;
 690
 691        return sizeof(*addr);
 692}
 693
 694/**
 695 * tipc_poll - read and possibly block on pollmask
 696 * @file: file structure associated with the socket
 697 * @sock: socket for which to calculate the poll bits
 698 * @wait: ???
 699 *
 700 * Returns pollmask value
 701 *
 702 * COMMENTARY:
 703 * It appears that the usual socket locking mechanisms are not useful here
 704 * since the pollmask info is potentially out-of-date the moment this routine
 705 * exits.  TCP and other protocols seem to rely on higher level poll routines
 706 * to handle any preventable race conditions, so TIPC will do the same ...
 707 *
 708 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 709 * imply that the operation will succeed, merely that it should be performed
 710 * and will not block.
 711 */
 712static __poll_t tipc_poll(struct file *file, struct socket *sock,
 713                              poll_table *wait)
 714{
 715        struct sock *sk = sock->sk;
 716        struct tipc_sock *tsk = tipc_sk(sk);
 717        __poll_t revents = 0;
 718
 719        sock_poll_wait(file, sk_sleep(sk), wait);
 720
 721        if (sk->sk_shutdown & RCV_SHUTDOWN)
 722                revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
 723        if (sk->sk_shutdown == SHUTDOWN_MASK)
 724                revents |= EPOLLHUP;
 725
 726        switch (sk->sk_state) {
 727        case TIPC_ESTABLISHED:
 728        case TIPC_CONNECTING:
 729                if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
 730                        revents |= EPOLLOUT;
 731                /* fall thru' */
 732        case TIPC_LISTEN:
 733                if (!skb_queue_empty(&sk->sk_receive_queue))
 734                        revents |= EPOLLIN | EPOLLRDNORM;
 735                break;
 736        case TIPC_OPEN:
 737                if (tsk->group_is_open && !tsk->cong_link_cnt)
 738                        revents |= EPOLLOUT;
 739                if (!tipc_sk_type_connectionless(sk))
 740                        break;
 741                if (skb_queue_empty(&sk->sk_receive_queue))
 742                        break;
 743                revents |= EPOLLIN | EPOLLRDNORM;
 744                break;
 745        case TIPC_DISCONNECTING:
 746                revents = EPOLLIN | EPOLLRDNORM | EPOLLHUP;
 747                break;
 748        }
 749        return revents;
 750}
 751
 752/**
 753 * tipc_sendmcast - send multicast message
 754 * @sock: socket structure
 755 * @seq: destination address
 756 * @msg: message to send
 757 * @dlen: length of data to send
 758 * @timeout: timeout to wait for wakeup
 759 *
 760 * Called from function tipc_sendmsg(), which has done all sanity checks
 761 * Returns the number of bytes sent on success, or errno
 762 */
 763static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
 764                          struct msghdr *msg, size_t dlen, long timeout)
 765{
 766        struct sock *sk = sock->sk;
 767        struct tipc_sock *tsk = tipc_sk(sk);
 768        struct tipc_msg *hdr = &tsk->phdr;
 769        struct net *net = sock_net(sk);
 770        int mtu = tipc_bcast_get_mtu(net);
 771        struct tipc_mc_method *method = &tsk->mc_method;
 772        struct sk_buff_head pkts;
 773        struct tipc_nlist dsts;
 774        int rc;
 775
 776        if (tsk->group)
 777                return -EACCES;
 778
 779        /* Block or return if any destination link is congested */
 780        rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
 781        if (unlikely(rc))
 782                return rc;
 783
 784        /* Lookup destination nodes */
 785        tipc_nlist_init(&dsts, tipc_own_addr(net));
 786        tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
 787                                      seq->upper, &dsts);
 788        if (!dsts.local && !dsts.remote)
 789                return -EHOSTUNREACH;
 790
 791        /* Build message header */
 792        msg_set_type(hdr, TIPC_MCAST_MSG);
 793        msg_set_hdr_sz(hdr, MCAST_H_SIZE);
 794        msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
 795        msg_set_destport(hdr, 0);
 796        msg_set_destnode(hdr, 0);
 797        msg_set_nametype(hdr, seq->type);
 798        msg_set_namelower(hdr, seq->lower);
 799        msg_set_nameupper(hdr, seq->upper);
 800
 801        /* Build message as chain of buffers */
 802        skb_queue_head_init(&pkts);
 803        rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
 804
 805        /* Send message if build was successful */
 806        if (unlikely(rc == dlen))
 807                rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
 808                                     &tsk->cong_link_cnt);
 809
 810        tipc_nlist_purge(&dsts);
 811
 812        return rc ? rc : dlen;
 813}
 814
 815/**
 816 * tipc_send_group_msg - send a message to a member in the group
 817 * @net: network namespace
 818 * @m: message to send
 819 * @mb: group member
 820 * @dnode: destination node
 821 * @dport: destination port
 822 * @dlen: total length of message data
 823 */
 824static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
 825                               struct msghdr *m, struct tipc_member *mb,
 826                               u32 dnode, u32 dport, int dlen)
 827{
 828        u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
 829        struct tipc_mc_method *method = &tsk->mc_method;
 830        int blks = tsk_blocks(GROUP_H_SIZE + dlen);
 831        struct tipc_msg *hdr = &tsk->phdr;
 832        struct sk_buff_head pkts;
 833        int mtu, rc;
 834
 835        /* Complete message header */
 836        msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
 837        msg_set_hdr_sz(hdr, GROUP_H_SIZE);
 838        msg_set_destport(hdr, dport);
 839        msg_set_destnode(hdr, dnode);
 840        msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
 841
 842        /* Build message as chain of buffers */
 843        skb_queue_head_init(&pkts);
 844        mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
 845        rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
 846        if (unlikely(rc != dlen))
 847                return rc;
 848
 849        /* Send message */
 850        rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
 851        if (unlikely(rc == -ELINKCONG)) {
 852                tipc_dest_push(&tsk->cong_links, dnode, 0);
 853                tsk->cong_link_cnt++;
 854        }
 855
 856        /* Update send window */
 857        tipc_group_update_member(mb, blks);
 858
 859        /* A broadcast sent within next EXPIRE period must follow same path */
 860        method->rcast = true;
 861        method->mandatory = true;
 862        return dlen;
 863}
 864
 865/**
 866 * tipc_send_group_unicast - send message to a member in the group
 867 * @sock: socket structure
 868 * @m: message to send
 869 * @dlen: total length of message data
 870 * @timeout: timeout to wait for wakeup
 871 *
 872 * Called from function tipc_sendmsg(), which has done all sanity checks
 873 * Returns the number of bytes sent on success, or errno
 874 */
 875static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
 876                                   int dlen, long timeout)
 877{
 878        struct sock *sk = sock->sk;
 879        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 880        int blks = tsk_blocks(GROUP_H_SIZE + dlen);
 881        struct tipc_sock *tsk = tipc_sk(sk);
 882        struct tipc_group *grp = tsk->group;
 883        struct net *net = sock_net(sk);
 884        struct tipc_member *mb = NULL;
 885        u32 node, port;
 886        int rc;
 887
 888        node = dest->addr.id.node;
 889        port = dest->addr.id.ref;
 890        if (!port && !node)
 891                return -EHOSTUNREACH;
 892
 893        /* Block or return if destination link or member is congested */
 894        rc = tipc_wait_for_cond(sock, &timeout,
 895                                !tipc_dest_find(&tsk->cong_links, node, 0) &&
 896                                !tipc_group_cong(grp, node, port, blks, &mb));
 897        if (unlikely(rc))
 898                return rc;
 899
 900        if (unlikely(!mb))
 901                return -EHOSTUNREACH;
 902
 903        rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);
 904
 905        return rc ? rc : dlen;
 906}
 907
/**
 * tipc_send_group_anycast - send message to any member with given identity
 * @sock: socket structure
 * @m: message to send; its msg_name supplies the name instance to look up
 * @dlen: total length of message data
 * @timeout: timeout to wait for wakeup
 *
 * Makes up to three attempts to find a destination for the instance given
 * in @m (the name type and lookup scope are taken from the socket's
 * preformatted header), waiting for congestion to abate if no uncongested
 * member could be found.
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 * Returns the number of bytes sent on success, or errno
 */
static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
                                   int dlen, long timeout)
{
        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
        struct list_head *cong_links = &tsk->cong_links;
        int blks = tsk_blocks(GROUP_H_SIZE + dlen);
        struct tipc_group *grp = tsk->group;
        struct tipc_msg *hdr = &tsk->phdr;
        struct tipc_member *first = NULL;
        struct tipc_member *mbr = NULL;
        struct net *net = sock_net(sk);
        u32 node, port, exclude;
        struct list_head dsts;
        u32 type, inst, scope;
        int lookups = 0;
        int dstcnt, rc;
        bool cong;

        INIT_LIST_HEAD(&dsts);

        type = msg_nametype(hdr);
        inst = dest->addr.name.name.instance;
        scope = msg_lookup_scope(hdr);
        exclude = tipc_group_exclude(grp);

        /* Pre-increment with "< 4" bounds this to three lookup rounds */
        while (++lookups < 4) {
                first = NULL;

                /* Look for a non-congested destination member, if any */
                while (1) {
                        /* Last argument false: not asking for all matches.
                         * NOTE(review): presumably successive lookups step
                         * through the matching destinations - confirm in
                         * tipc_nametbl_lookup().
                         */
                        if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
                                                 &dstcnt, exclude, false))
                                return -EHOSTUNREACH;
                        tipc_dest_pop(&dsts, &node, &port);
                        cong = tipc_group_cong(grp, node, port, blks, &mbr);
                        if (!cong)
                                break;
                        /* Back at the first congested candidate seen in this
                         * round; also taken when both are NULL on the first
                         * pass
                         */
                        if (mbr == first)
                                break;
                        if (!first)
                                first = mbr;
                }

                /* Start over if destination was not in member list */
                if (unlikely(!mbr))
                        continue;

                /* Fast path: neither the member nor its link is congested */
                if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
                        break;

                /* Block or return if destination link or member is congested */
                rc = tipc_wait_for_cond(sock, &timeout,
                                        !tipc_dest_find(cong_links, node, 0) &&
                                        !tipc_group_cong(grp, node, port,
                                                         blks, &mbr));
                if (unlikely(rc))
                        return rc;

                /* Send, unless destination disappeared while waiting */
                if (likely(mbr))
                        break;
        }

        /* All rounds used up without settling on a destination */
        if (unlikely(lookups >= 4))
                return -EHOSTUNREACH;

        rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);

        return rc ? rc : dlen;
}
 990
 991/**
 992 * tipc_send_group_bcast - send message to all members in communication group
 993 * @sk: socket structure
 994 * @m: message to send
 995 * @dlen: total length of message data
 996 * @timeout: timeout to wait for wakeup
 997 *
 998 * Called from function tipc_sendmsg(), which has done all sanity checks
 999 * Returns the number of bytes sent on success, or errno
1000 */
1001static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
1002                                 int dlen, long timeout)
1003{
1004        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1005        struct sock *sk = sock->sk;
1006        struct net *net = sock_net(sk);
1007        struct tipc_sock *tsk = tipc_sk(sk);
1008        struct tipc_group *grp = tsk->group;
1009        struct tipc_nlist *dsts = tipc_group_dests(grp);
1010        struct tipc_mc_method *method = &tsk->mc_method;
1011        bool ack = method->mandatory && method->rcast;
1012        int blks = tsk_blocks(MCAST_H_SIZE + dlen);
1013        struct tipc_msg *hdr = &tsk->phdr;
1014        int mtu = tipc_bcast_get_mtu(net);
1015        struct sk_buff_head pkts;
1016        int rc = -EHOSTUNREACH;
1017
1018        if (!dsts->local && !dsts->remote)
1019                return -EHOSTUNREACH;
1020
1021        /* Block or return if any destination link or member is congested */
1022        rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt &&
1023                                !tipc_group_bc_cong(grp, blks));
1024        if (unlikely(rc))
1025                return rc;
1026
1027        /* Complete message header */
1028        if (dest) {
1029                msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
1030                msg_set_nameinst(hdr, dest->addr.name.name.instance);
1031        } else {
1032                msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
1033                msg_set_nameinst(hdr, 0);
1034        }
1035        msg_set_hdr_sz(hdr, GROUP_H_SIZE);
1036        msg_set_destport(hdr, 0);
1037        msg_set_destnode(hdr, 0);
1038        msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
1039
1040        /* Avoid getting stuck with repeated forced replicasts */
1041        msg_set_grp_bc_ack_req(hdr, ack);
1042
1043        /* Build message as chain of buffers */
1044        skb_queue_head_init(&pkts);
1045        rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
1046        if (unlikely(rc != dlen))
1047                return rc;
1048
1049        /* Send message */
1050        rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
1051        if (unlikely(rc))
1052                return rc;
1053
1054        /* Update broadcast sequence number and send windows */
1055        tipc_group_update_bc_members(tsk->group, blks, ack);
1056
1057        /* Broadcast link is now free to choose method for next broadcast */
1058        method->mandatory = false;
1059        method->expires = jiffies;
1060
1061        return dlen;
1062}
1063
1064/**
1065 * tipc_send_group_mcast - send message to all members with given identity
1066 * @sock: socket structure
1067 * @m: message to send
1068 * @dlen: total length of message data
1069 * @timeout: timeout to wait for wakeup
1070 *
1071 * Called from function tipc_sendmsg(), which has done all sanity checks
1072 * Returns the number of bytes sent on success, or errno
1073 */
1074static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
1075                                 int dlen, long timeout)
1076{
1077        struct sock *sk = sock->sk;
1078        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1079        struct tipc_sock *tsk = tipc_sk(sk);
1080        struct tipc_group *grp = tsk->group;
1081        struct tipc_msg *hdr = &tsk->phdr;
1082        struct net *net = sock_net(sk);
1083        u32 type, inst, scope, exclude;
1084        struct list_head dsts;
1085        u32 dstcnt;
1086
1087        INIT_LIST_HEAD(&dsts);
1088
1089        type = msg_nametype(hdr);
1090        inst = dest->addr.name.name.instance;
1091        scope = msg_lookup_scope(hdr);
1092        exclude = tipc_group_exclude(grp);
1093
1094        if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
1095                                 &dstcnt, exclude, true))
1096                return -EHOSTUNREACH;
1097
1098        if (dstcnt == 1) {
1099                tipc_dest_pop(&dsts, &dest->addr.id.node, &dest->addr.id.ref);
1100                return tipc_send_group_unicast(sock, m, dlen, timeout);
1101        }
1102
1103        tipc_dest_list_purge(&dsts);
1104        return tipc_send_group_bcast(sock, m, dlen, timeout);
1105}
1106
1107/**
1108 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
1109 * @arrvq: queue with arriving messages, to be cloned after destination lookup
1110 * @inputq: queue with cloned messages, delivered to socket after dest lookup
1111 *
1112 * Multi-threaded: parallel calls with reference to same queues may occur
1113 */
1114void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
1115                       struct sk_buff_head *inputq)
1116{
1117        u32 self = tipc_own_addr(net);
1118        u32 type, lower, upper, scope;
1119        struct sk_buff *skb, *_skb;
1120        u32 portid, oport, onode;
1121        struct sk_buff_head tmpq;
1122        struct list_head dports;
1123        struct tipc_msg *hdr;
1124        int user, mtyp, hlen;
1125        bool exact;
1126
1127        __skb_queue_head_init(&tmpq);
1128        INIT_LIST_HEAD(&dports);
1129
1130        skb = tipc_skb_peek(arrvq, &inputq->lock);
1131        for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
1132                hdr = buf_msg(skb);
1133                user = msg_user(hdr);
1134                mtyp = msg_type(hdr);
1135                hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
1136                oport = msg_origport(hdr);
1137                onode = msg_orignode(hdr);
1138                type = msg_nametype(hdr);
1139
1140                if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
1141                        spin_lock_bh(&inputq->lock);
1142                        if (skb_peek(arrvq) == skb) {
1143                                __skb_dequeue(arrvq);
1144                                __skb_queue_tail(inputq, skb);
1145                        }
1146                        kfree_skb(skb);
1147                        spin_unlock_bh(&inputq->lock);
1148                        continue;
1149                }
1150
1151                /* Group messages require exact scope match */
1152                if (msg_in_group(hdr)) {
1153                        lower = 0;
1154                        upper = ~0;
1155                        scope = msg_lookup_scope(hdr);
1156                        exact = true;
1157                } else {
1158                        /* TIPC_NODE_SCOPE means "any scope" in this context */
1159                        if (onode == self)
1160                                scope = TIPC_NODE_SCOPE;
1161                        else
1162                                scope = TIPC_CLUSTER_SCOPE;
1163                        exact = false;
1164                        lower = msg_namelower(hdr);
1165                        upper = msg_nameupper(hdr);
1166                }
1167
1168                /* Create destination port list: */
1169                tipc_nametbl_mc_lookup(net, type, lower, upper,
1170                                       scope, exact, &dports);
1171
1172                /* Clone message per destination */
1173                while (tipc_dest_pop(&dports, NULL, &portid)) {
1174                        _skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
1175                        if (_skb) {
1176                                msg_set_destport(buf_msg(_skb), portid);
1177                                __skb_queue_tail(&tmpq, _skb);
1178                                continue;
1179                        }
1180                        pr_warn("Failed to clone mcast rcv buffer\n");
1181                }
1182                /* Append to inputq if not already done by other thread */
1183                spin_lock_bh(&inputq->lock);
1184                if (skb_peek(arrvq) == skb) {
1185                        skb_queue_splice_tail_init(&tmpq, inputq);
1186                        kfree_skb(__skb_dequeue(arrvq));
1187                }
1188                spin_unlock_bh(&inputq->lock);
1189                __skb_queue_purge(&tmpq);
1190                kfree_skb(skb);
1191        }
1192        tipc_sk_rcv(net, inputq);
1193}
1194
1195/**
1196 * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
1197 * @tsk: receiving socket
1198 * @skb: pointer to message buffer.
1199 */
1200static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
1201                                   struct sk_buff_head *xmitq)
1202{
1203        struct tipc_msg *hdr = buf_msg(skb);
1204        u32 onode = tsk_own_node(tsk);
1205        struct sock *sk = &tsk->sk;
1206        int mtyp = msg_type(hdr);
1207        bool conn_cong;
1208
1209        /* Ignore if connection cannot be validated: */
1210        if (!tsk_peer_msg(tsk, hdr))
1211                goto exit;
1212
1213        if (unlikely(msg_errcode(hdr))) {
1214                tipc_set_sk_state(sk, TIPC_DISCONNECTING);
1215                tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
1216                                      tsk_peer_port(tsk));
1217                sk->sk_state_change(sk);
1218                goto exit;
1219        }
1220
1221        tsk->probe_unacked = false;
1222
1223        if (mtyp == CONN_PROBE) {
1224                msg_set_type(hdr, CONN_PROBE_REPLY);
1225                if (tipc_msg_reverse(onode, &skb, TIPC_OK))
1226                        __skb_queue_tail(xmitq, skb);
1227                return;
1228        } else if (mtyp == CONN_ACK) {
1229                conn_cong = tsk_conn_cong(tsk);
1230                tsk->snt_unacked -= msg_conn_ack(hdr);
1231                if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1232                        tsk->snd_win = msg_adv_win(hdr);
1233                if (conn_cong)
1234                        sk->sk_write_space(sk);
1235        } else if (mtyp != CONN_PROBE_REPLY) {
1236                pr_warn("Received unknown CONN_PROTO msg\n");
1237        }
1238exit:
1239        kfree_skb(skb);
1240}
1241
1242/**
1243 * tipc_sendmsg - send message in connectionless manner
1244 * @sock: socket structure
1245 * @m: message to send
1246 * @dsz: amount of user data to be sent
1247 *
1248 * Message must have an destination specified explicitly.
1249 * Used for SOCK_RDM and SOCK_DGRAM messages,
1250 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
1251 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
1252 *
1253 * Returns the number of bytes sent on success, or errno otherwise
1254 */
1255static int tipc_sendmsg(struct socket *sock,
1256                        struct msghdr *m, size_t dsz)
1257{
1258        struct sock *sk = sock->sk;
1259        int ret;
1260
1261        lock_sock(sk);
1262        ret = __tipc_sendmsg(sock, m, dsz);
1263        release_sock(sk);
1264
1265        return ret;
1266}
1267
1268static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
1269{
1270        struct sock *sk = sock->sk;
1271        struct net *net = sock_net(sk);
1272        struct tipc_sock *tsk = tipc_sk(sk);
1273        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1274        long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1275        struct list_head *clinks = &tsk->cong_links;
1276        bool syn = !tipc_sk_type_connectionless(sk);
1277        struct tipc_group *grp = tsk->group;
1278        struct tipc_msg *hdr = &tsk->phdr;
1279        struct tipc_name_seq *seq;
1280        struct sk_buff_head pkts;
1281        u32 dport, dnode = 0;
1282        u32 type, inst;
1283        int mtu, rc;
1284
1285        if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
1286                return -EMSGSIZE;
1287
1288        if (likely(dest)) {
1289                if (unlikely(m->msg_namelen < sizeof(*dest)))
1290                        return -EINVAL;
1291                if (unlikely(dest->family != AF_TIPC))
1292                        return -EINVAL;
1293        }
1294
1295        if (grp) {
1296                if (!dest)
1297                        return tipc_send_group_bcast(sock, m, dlen, timeout);
1298                if (dest->addrtype == TIPC_ADDR_NAME)
1299                        return tipc_send_group_anycast(sock, m, dlen, timeout);
1300                if (dest->addrtype == TIPC_ADDR_ID)
1301                        return tipc_send_group_unicast(sock, m, dlen, timeout);
1302                if (dest->addrtype == TIPC_ADDR_MCAST)
1303                        return tipc_send_group_mcast(sock, m, dlen, timeout);
1304                return -EINVAL;
1305        }
1306
1307        if (unlikely(!dest)) {
1308                dest = &tsk->peer;
1309                if (!syn || dest->family != AF_TIPC)
1310                        return -EDESTADDRREQ;
1311        }
1312
1313        if (unlikely(syn)) {
1314                if (sk->sk_state == TIPC_LISTEN)
1315                        return -EPIPE;
1316                if (sk->sk_state != TIPC_OPEN)
1317                        return -EISCONN;
1318                if (tsk->published)
1319                        return -EOPNOTSUPP;
1320                if (dest->addrtype == TIPC_ADDR_NAME) {
1321                        tsk->conn_type = dest->addr.name.name.type;
1322                        tsk->conn_instance = dest->addr.name.name.instance;
1323                }
1324        }
1325
1326        seq = &dest->addr.nameseq;
1327        if (dest->addrtype == TIPC_ADDR_MCAST)
1328                return tipc_sendmcast(sock, seq, m, dlen, timeout);
1329
1330        if (dest->addrtype == TIPC_ADDR_NAME) {
1331                type = dest->addr.name.name.type;
1332                inst = dest->addr.name.name.instance;
1333                dnode = dest->addr.name.domain;
1334                msg_set_type(hdr, TIPC_NAMED_MSG);
1335                msg_set_hdr_sz(hdr, NAMED_H_SIZE);
1336                msg_set_nametype(hdr, type);
1337                msg_set_nameinst(hdr, inst);
1338                msg_set_lookup_scope(hdr, tipc_node2scope(dnode));
1339                dport = tipc_nametbl_translate(net, type, inst, &dnode);
1340                msg_set_destnode(hdr, dnode);
1341                msg_set_destport(hdr, dport);
1342                if (unlikely(!dport && !dnode))
1343                        return -EHOSTUNREACH;
1344        } else if (dest->addrtype == TIPC_ADDR_ID) {
1345                dnode = dest->addr.id.node;
1346                msg_set_type(hdr, TIPC_DIRECT_MSG);
1347                msg_set_lookup_scope(hdr, 0);
1348                msg_set_destnode(hdr, dnode);
1349                msg_set_destport(hdr, dest->addr.id.ref);
1350                msg_set_hdr_sz(hdr, BASIC_H_SIZE);
1351        } else {
1352                return -EINVAL;
1353        }
1354
1355        /* Block or return if destination link is congested */
1356        rc = tipc_wait_for_cond(sock, &timeout,
1357                                !tipc_dest_find(clinks, dnode, 0));
1358        if (unlikely(rc))
1359                return rc;
1360
1361        skb_queue_head_init(&pkts);
1362        mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
1363        rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
1364        if (unlikely(rc != dlen))
1365                return rc;
1366
1367        rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1368        if (unlikely(rc == -ELINKCONG)) {
1369                tipc_dest_push(clinks, dnode, 0);
1370                tsk->cong_link_cnt++;
1371                rc = 0;
1372        }
1373
1374        if (unlikely(syn && !rc))
1375                tipc_set_sk_state(sk, TIPC_CONNECTING);
1376
1377        return rc ? rc : dlen;
1378}
1379
1380/**
1381 * tipc_sendstream - send stream-oriented data
1382 * @sock: socket structure
1383 * @m: data to send
1384 * @dsz: total length of data to be transmitted
1385 *
1386 * Used for SOCK_STREAM data.
1387 *
1388 * Returns the number of bytes sent on success (or partial success),
1389 * or errno if no data sent
1390 */
1391static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
1392{
1393        struct sock *sk = sock->sk;
1394        int ret;
1395
1396        lock_sock(sk);
1397        ret = __tipc_sendstream(sock, m, dsz);
1398        release_sock(sk);
1399
1400        return ret;
1401}
1402
1403static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
1404{
1405        struct sock *sk = sock->sk;
1406        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1407        long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1408        struct tipc_sock *tsk = tipc_sk(sk);
1409        struct tipc_msg *hdr = &tsk->phdr;
1410        struct net *net = sock_net(sk);
1411        struct sk_buff_head pkts;
1412        u32 dnode = tsk_peer_node(tsk);
1413        int send, sent = 0;
1414        int rc = 0;
1415
1416        skb_queue_head_init(&pkts);
1417
1418        if (unlikely(dlen > INT_MAX))
1419                return -EMSGSIZE;
1420
1421        /* Handle implicit connection setup */
1422        if (unlikely(dest)) {
1423                rc = __tipc_sendmsg(sock, m, dlen);
1424                if (dlen && (dlen == rc))
1425                        tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
1426                return rc;
1427        }
1428
1429        do {
1430                rc = tipc_wait_for_cond(sock, &timeout,
1431                                        (!tsk->cong_link_cnt &&
1432                                         !tsk_conn_cong(tsk) &&
1433                                         tipc_sk_connected(sk)));
1434                if (unlikely(rc))
1435                        break;
1436
1437                send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
1438                rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
1439                if (unlikely(rc != send))
1440                        break;
1441
1442                rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1443                if (unlikely(rc == -ELINKCONG)) {
1444                        tsk->cong_link_cnt = 1;
1445                        rc = 0;
1446                }
1447                if (likely(!rc)) {
1448                        tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
1449                        sent += send;
1450                }
1451        } while (sent < dlen && !rc);
1452
1453        return sent ? sent : rc;
1454}
1455
1456/**
1457 * tipc_send_packet - send a connection-oriented message
1458 * @sock: socket structure
1459 * @m: message to send
1460 * @dsz: length of data to be transmitted
1461 *
1462 * Used for SOCK_SEQPACKET messages.
1463 *
1464 * Returns the number of bytes sent on success, or errno otherwise
1465 */
1466static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1467{
1468        if (dsz > TIPC_MAX_USER_MSG_SIZE)
1469                return -EMSGSIZE;
1470
1471        return tipc_sendstream(sock, m, dsz);
1472}
1473
1474/* tipc_sk_finish_conn - complete the setup of a connection
1475 */
1476static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1477                                u32 peer_node)
1478{
1479        struct sock *sk = &tsk->sk;
1480        struct net *net = sock_net(sk);
1481        struct tipc_msg *msg = &tsk->phdr;
1482
1483        msg_set_destnode(msg, peer_node);
1484        msg_set_destport(msg, peer_port);
1485        msg_set_type(msg, TIPC_CONN_MSG);
1486        msg_set_lookup_scope(msg, 0);
1487        msg_set_hdr_sz(msg, SHORT_H_SIZE);
1488
1489        sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
1490        tipc_set_sk_state(sk, TIPC_ESTABLISHED);
1491        tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1492        tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1493        tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1494        if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1495                return;
1496
1497        /* Fall back to message based flow control */
1498        tsk->rcv_win = FLOWCTL_MSG_WIN;
1499        tsk->snd_win = FLOWCTL_MSG_WIN;
1500}
1501
1502/**
1503 * tipc_sk_set_orig_addr - capture sender's address for received message
1504 * @m: descriptor for message info
1505 * @hdr: received message header
1506 *
1507 * Note: Address is not captured if not requested by receiver.
1508 */
1509static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
1510{
1511        DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
1512        struct tipc_msg *hdr = buf_msg(skb);
1513
1514        if (!srcaddr)
1515                return;
1516
1517        srcaddr->sock.family = AF_TIPC;
1518        srcaddr->sock.addrtype = TIPC_ADDR_ID;
1519        srcaddr->sock.scope = 0;
1520        srcaddr->sock.addr.id.ref = msg_origport(hdr);
1521        srcaddr->sock.addr.id.node = msg_orignode(hdr);
1522        srcaddr->sock.addr.name.domain = 0;
1523        m->msg_namelen = sizeof(struct sockaddr_tipc);
1524
1525        if (!msg_in_group(hdr))
1526                return;
1527
1528        /* Group message users may also want to know sending member's id */
1529        srcaddr->member.family = AF_TIPC;
1530        srcaddr->member.addrtype = TIPC_ADDR_NAME;
1531        srcaddr->member.scope = 0;
1532        srcaddr->member.addr.name.name.type = msg_nametype(hdr);
1533        srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
1534        srcaddr->member.addr.name.domain = 0;
1535        m->msg_namelen = sizeof(*srcaddr);
1536}
1537
1538/**
1539 * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1540 * @m: descriptor for message info
1541 * @msg: received message header
1542 * @tsk: TIPC port associated with message
1543 *
1544 * Note: Ancillary data is not captured if not requested by receiver.
1545 *
1546 * Returns 0 if successful, otherwise errno
1547 */
1548static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1549                                 struct tipc_sock *tsk)
1550{
1551        u32 anc_data[3];
1552        u32 err;
1553        u32 dest_type;
1554        int has_name;
1555        int res;
1556
1557        if (likely(m->msg_controllen == 0))
1558                return 0;
1559
1560        /* Optionally capture errored message object(s) */
1561        err = msg ? msg_errcode(msg) : 0;
1562        if (unlikely(err)) {
1563                anc_data[0] = err;
1564                anc_data[1] = msg_data_sz(msg);
1565                res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1566                if (res)
1567                        return res;
1568                if (anc_data[1]) {
1569                        res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1570                                       msg_data(msg));
1571                        if (res)
1572                                return res;
1573                }
1574        }
1575
1576        /* Optionally capture message destination object */
1577        dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1578        switch (dest_type) {
1579        case TIPC_NAMED_MSG:
1580                has_name = 1;
1581                anc_data[0] = msg_nametype(msg);
1582                anc_data[1] = msg_namelower(msg);
1583                anc_data[2] = msg_namelower(msg);
1584                break;
1585        case TIPC_MCAST_MSG:
1586                has_name = 1;
1587                anc_data[0] = msg_nametype(msg);
1588                anc_data[1] = msg_namelower(msg);
1589                anc_data[2] = msg_nameupper(msg);
1590                break;
1591        case TIPC_CONN_MSG:
1592                has_name = (tsk->conn_type != 0);
1593                anc_data[0] = tsk->conn_type;
1594                anc_data[1] = tsk->conn_instance;
1595                anc_data[2] = tsk->conn_instance;
1596                break;
1597        default:
1598                has_name = 0;
1599        }
1600        if (has_name) {
1601                res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1602                if (res)
1603                        return res;
1604        }
1605
1606        return 0;
1607}
1608
1609static void tipc_sk_send_ack(struct tipc_sock *tsk)
1610{
1611        struct sock *sk = &tsk->sk;
1612        struct net *net = sock_net(sk);
1613        struct sk_buff *skb = NULL;
1614        struct tipc_msg *msg;
1615        u32 peer_port = tsk_peer_port(tsk);
1616        u32 dnode = tsk_peer_node(tsk);
1617
1618        if (!tipc_sk_connected(sk))
1619                return;
1620        skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1621                              dnode, tsk_own_node(tsk), peer_port,
1622                              tsk->portid, TIPC_OK);
1623        if (!skb)
1624                return;
1625        msg = buf_msg(skb);
1626        msg_set_conn_ack(msg, tsk->rcv_unacked);
1627        tsk->rcv_unacked = 0;
1628
1629        /* Adjust to and advertize the correct window limit */
1630        if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1631                tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1632                msg_set_adv_win(msg, tsk->rcv_win);
1633        }
1634        tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1635}
1636
/* tipc_wait_for_rcvmsg - wait until the socket's receive queue is non-empty
 * @sock: socket structure
 * @timeop: in: maximum time to wait; out: time left as reported by
 *          schedule_timeout()
 *
 * The socket lock is dropped while sleeping and re-taken afterwards, so the
 * caller is expected to hold it.
 *
 * Returns 0 when a message is queued, a pending socket error, -ENOTCONN if
 * the queue is empty and receive has been shut down, -EAGAIN if no time is
 * left, or the value of sock_intr_errno() if a signal is pending.
 */
static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
{
        struct sock *sk = sock->sk;
        DEFINE_WAIT(wait);
        long timeo = *timeop;
        int err = sock_error(sk);

        /* A socket error that is already pending is returned at once */
        if (err)
                return err;

        for (;;) {
                /* Get on the wait queue before testing the condition */
                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
                if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
                        if (sk->sk_shutdown & RCV_SHUTDOWN) {
                                err = -ENOTCONN;
                                break;
                        }
                        /* Sleep without holding the socket lock */
                        release_sock(sk);
                        timeo = schedule_timeout(timeo);
                        lock_sock(sk);
                }
                /* Checked in order: data queued, time used up, signal
                 * pending, socket error
                 */
                err = 0;
                if (!skb_queue_empty(&sk->sk_receive_queue))
                        break;
                err = -EAGAIN;
                if (!timeo)
                        break;
                err = sock_intr_errno(timeo);
                if (signal_pending(current))
                        break;

                err = sock_error(sk);
                if (err)
                        break;
        }
        finish_wait(sk_sleep(sk), &wait);
        /* Report the remaining time back to the caller */
        *timeop = timeo;
        return err;
}
1676
1677/**
1678 * tipc_recvmsg - receive packet-oriented message
     * @sock: socket to receive from
1679 * @m: descriptor for message info
1680 * @buflen: length of user buffer area
1681 * @flags: receive flags
1682 *
1683 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1684 * If the complete message doesn't fit in user area, truncate it.
     * MSG_TRUNC is then set in the msg_flags of @m.
1685 *
1686 * Returns size of returned message data, errno otherwise
1687 */
1688static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1689                        size_t buflen,  int flags)
1690{
1691        struct sock *sk = sock->sk;
1692        bool connected = !tipc_sk_type_connectionless(sk);
1693        struct tipc_sock *tsk = tipc_sk(sk);
1694        int rc, err, hlen, dlen, copy;
1695        struct sk_buff_head xmitq;
1696        struct tipc_msg *hdr;
1697        struct sk_buff *skb;
1698        bool grp_evt;
1699        long timeout;
1700
1701        /* Catch invalid receive requests */
1702        if (unlikely(!buflen))
1703                return -EINVAL;
1704
1705        lock_sock(sk);
            /* A connection-type socket still in TIPC_OPEN has no peer yet */
1706        if (unlikely(connected && sk->sk_state == TIPC_OPEN)) {
1707                rc = -ENOTCONN;
1708                goto exit;
1709        }
1710        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1711
1712        /* Step rcv queue to first msg with data or error; wait if necessary */
1713        do {
1714                rc = tipc_wait_for_rcvmsg(sock, &timeout);
1715                if (unlikely(rc))
1716                        goto exit;
1717                skb = skb_peek(&sk->sk_receive_queue);
1718                hdr = buf_msg(skb);
1719                dlen = msg_data_sz(hdr);
1720                hlen = msg_hdr_sz(hdr);
1721                err = msg_errcode(hdr);
1722                grp_evt = msg_is_grp_evt(hdr);
1723                if (likely(dlen || err))
1724                        break;
                    /* Empty message without error code: drop it and look again */
1725                tsk_advance_rx_queue(sk);
1726        } while (1);
1727
1728        /* Collect msg meta data, including error code and rejected data */
1729        tipc_sk_set_orig_addr(m, skb);
1730        rc = tipc_sk_anc_data_recv(m, hdr, tsk);
1731        if (unlikely(rc))
1732                goto exit;
1733
1734        /* Capture data if non-error msg, otherwise just set return value */
1735        if (likely(!err)) {
1736                copy = min_t(int, dlen, buflen);
1737                if (unlikely(copy != dlen))
1738                        m->msg_flags |= MSG_TRUNC;
1739                rc = skb_copy_datagram_msg(skb, hlen, m, copy);
1740        } else {
                    /* Error message: no payload is copied. On a connection-type
                     * socket any error other than TIPC_CONN_SHUTDOWN becomes
                     * -ECONNRESET, unless the caller supplied a control buffer
                     * (presumably the error is then read from ancillary data).
                     */
1741                copy = 0;
1742                rc = 0;
1743                if (err != TIPC_CONN_SHUTDOWN && connected && !m->msg_control)
1744                        rc = -ECONNRESET;
1745        }
1746        if (unlikely(rc))
1747                goto exit;
1748
1749        /* Mark message as group event if applicable */
1750        if (unlikely(grp_evt)) {
1751                if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
1752                        m->msg_flags |= MSG_EOR;
1753                m->msg_flags |= MSG_OOB;
                    /* Group events are reported with a zero return length */
1754                copy = 0;
1755        }
1756
1757        /* Capture of data or error code/rejected data was successful */
            /* When peeking, the message stays queued and is not acknowledged */
1758        if (unlikely(flags & MSG_PEEK))
1759                goto exit;
1760
1761        /* Send group flow control advertisement when applicable */
1762        if (tsk->group && msg_in_group(hdr) && !grp_evt) {
1763                skb_queue_head_init(&xmitq);
1764                tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
1765                                          msg_orignode(hdr), msg_origport(hdr),
1766                                          &xmitq);
1767                tipc_node_distr_xmit(sock_net(sk), &xmitq);
1768        }
1769
1770        tsk_advance_rx_queue(sk);
1771
            /* Connectionless socket types have no connection-level acks */
1772        if (likely(!connected))
1773                goto exit;
1774
1775        /* Send connection flow control advertisement when applicable */
1776        tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
1777        if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
1778                tipc_sk_send_ack(tsk);
1779exit:
1780        release_sock(sk);
            /* A non-zero rc takes precedence over the copied length */
1781        return rc ? rc : copy;
1782}
1783
1784/**
1785 * tipc_recvstream - receive stream-oriented data
     * @sock: socket to receive from
1786 * @m: descriptor for message info
1787 * @buflen: total size of user buffer area
1788 * @flags: receive flags
1789 *
1790 * Used for SOCK_STREAM messages only.  If not enough data is available
1791 * will optionally wait for more; never truncates data.
1792 *
1793 * Returns size of returned message data, errno otherwise
1794 */
1795static int tipc_recvstream(struct socket *sock, struct msghdr *m,
1796                           size_t buflen, int flags)
1797{
1798        struct sock *sk = sock->sk;
1799        struct tipc_sock *tsk = tipc_sk(sk);
1800        struct sk_buff *skb;
1801        struct tipc_msg *hdr;
1802        struct tipc_skb_cb *skb_cb;
1803        bool peek = flags & MSG_PEEK;
1804        int offset, required, copy, copied = 0;
1805        int hlen, dlen, err, rc;
1806        long timeout;
1807
1808        /* Catch invalid receive attempts */
1809        if (unlikely(!buflen))
1810                return -EINVAL;
1811
1812        lock_sock(sk);
1813
1814        if (unlikely(sk->sk_state == TIPC_OPEN)) {
1815                rc = -ENOTCONN;
1816                goto exit;
1817        }
            /* 'required' is the byte count the loop below keeps waiting for */
1818        required = sock_rcvlowat(sk, flags & MSG_WAITALL, buflen);
1819        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1820
1821        do {
1822                /* Look at first msg in receive queue; wait if necessary */
1823                rc = tipc_wait_for_rcvmsg(sock, &timeout);
1824                if (unlikely(rc))
1825                        break;
1826                skb = skb_peek(&sk->sk_receive_queue);
1827                skb_cb = TIPC_SKB_CB(skb);
1828                hdr = buf_msg(skb);
1829                dlen = msg_data_sz(hdr);
1830                hlen = msg_hdr_sz(hdr);
1831                err = msg_errcode(hdr);
1832
1833                /* Discard any empty non-errored (SYN-) message */
1834                if (unlikely(!dlen && !err)) {
1835                        tsk_advance_rx_queue(sk);
1836                        continue;
1837                }
1838
1839                /* Collect msg meta data, incl. error code and rejected data */
                    /* Only done for the first message contributing to this call */
1840                if (!copied) {
1841                        tipc_sk_set_orig_addr(m, skb);
1842                        rc = tipc_sk_anc_data_recv(m, hdr, tsk);
1843                        if (rc)
1844                                break;
1845                }
1846
1847                /* Copy data if msg ok, otherwise return error/partial data */
1848                if (likely(!err)) {
                            /* Resume where an earlier partial read of this buffer
                             * stopped
                             */
1849                        offset = skb_cb->bytes_read;
1850                        copy = min_t(int, dlen - offset, buflen - copied);
1851                        rc = skb_copy_datagram_msg(skb, hlen + offset, m, copy);
1852                        if (unlikely(rc))
1853                                break;
1854                        copied += copy;
1855                        offset += copy;
                            /* User buffer filled before the message was exhausted:
                             * remember the position (unless peeking) and leave the
                             * message at the head of the queue
                             */
1856                        if (unlikely(offset < dlen)) {
1857                                if (!peek)
1858                                        skb_cb->bytes_read = offset;
1859                                break;
1860                        }
1861                } else {
                            /* Error message: stop here if data was already copied
                             * in this call, or if the error is to be reported as
                             * -ECONNRESET
                             */
1862                        rc = 0;
1863                        if ((err != TIPC_CONN_SHUTDOWN) && !m->msg_control)
1864                                rc = -ECONNRESET;
1865                        if (copied || rc)
1866                                break;
1867                }
1868
                    /* When peeking, the message stays queued and is not acked */
1869                if (unlikely(peek))
1870                        break;
1871
1872                tsk_advance_rx_queue(sk);
1873
1874                /* Send connection flow control advertisement when applicable */
1875                tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
1876                if (unlikely(tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE))
1877                        tipc_sk_send_ack(tsk);
1878
1879                /* Exit if all requested data or FIN/error received */
1880                if (copied == buflen || err)
1881                        break;
1882
1883        } while (!skb_queue_empty(&sk->sk_receive_queue) || copied < required);
1884exit:
1885        release_sock(sk);
            /* Any copied data takes precedence over a later error code */
1886        return copied ? copied : rc;
1887}
1888
1889/**
1890 * tipc_write_space - wake up thread if port congestion is released
1891 * @sk: socket
1892 */
1893static void tipc_write_space(struct sock *sk)
1894{
1895        struct socket_wq *wq;
1896
1897        rcu_read_lock();
1898        wq = rcu_dereference(sk->sk_wq);
1899        if (skwq_has_sleeper(wq))
1900                wake_up_interruptible_sync_poll(&wq->wait, EPOLLOUT |
1901                                                EPOLLWRNORM | EPOLLWRBAND);
1902        rcu_read_unlock();
1903}
1904
1905/**
1906 * tipc_data_ready - wake up threads to indicate messages have been received
1907 * @sk: socket
1908 * @len: the length of messages
1909 */
1910static void tipc_data_ready(struct sock *sk)
1911{
1912        struct socket_wq *wq;
1913
1914        rcu_read_lock();
1915        wq = rcu_dereference(sk->sk_wq);
1916        if (skwq_has_sleeper(wq))
1917                wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN |
1918                                                EPOLLRDNORM | EPOLLRDBAND);
1919        rcu_read_unlock();
1920}
1921
1922static void tipc_sock_destruct(struct sock *sk)
1923{
1924        __skb_queue_purge(&sk->sk_receive_queue);
1925}
1926
1927static void tipc_sk_proto_rcv(struct sock *sk,
1928                              struct sk_buff_head *inputq,
1929                              struct sk_buff_head *xmitq)
1930{
1931        struct sk_buff *skb = __skb_dequeue(inputq);
1932        struct tipc_sock *tsk = tipc_sk(sk);
1933        struct tipc_msg *hdr = buf_msg(skb);
1934        struct tipc_group *grp = tsk->group;
1935        bool wakeup = false;
1936
1937        switch (msg_user(hdr)) {
1938        case CONN_MANAGER:
1939                tipc_sk_conn_proto_rcv(tsk, skb, xmitq);
1940                return;
1941        case SOCK_WAKEUP:
1942                tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
1943                tsk->cong_link_cnt--;
1944                wakeup = true;
1945                break;
1946        case GROUP_PROTOCOL:
1947                tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
1948                break;
1949        case TOP_SRV:
1950                tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
1951                                      hdr, inputq, xmitq);
1952                break;
1953        default:
1954                break;
1955        }
1956
1957        if (wakeup)
1958                sk->sk_write_space(sk);
1959
1960        kfree_skb(skb);
1961}
1962
1963/**
1964 * tipc_sk_filter_connect - Handle incoming message for a connection-based socket
1965 * @tsk: TIPC socket
1966 * @skb: pointer to message buffer
1967 *
     * May change the socket state and sk_err as a side effect.
     *
1968 * Returns true if the message is accepted, false otherwise
1969 */
1970static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
1971{
1972        struct sock *sk = &tsk->sk;
1973        struct net *net = sock_net(sk);
1974        struct tipc_msg *hdr = buf_msg(skb);
1975        u32 pport = msg_origport(hdr);
1976        u32 pnode = msg_orignode(hdr);
1977
            /* Multicast messages are never accepted here */
1978        if (unlikely(msg_mcast(hdr)))
1979                return false;
1980
1981        switch (sk->sk_state) {
1982        case TIPC_CONNECTING:
1983                /* Accept only ACK or NACK message */
1984                if (unlikely(!msg_connected(hdr))) {
                            /* Ignore non-connected messages from anyone but the
                             * peer we are connecting to
                             */
1985                        if (pport != tsk_peer_port(tsk) ||
1986                            pnode != tsk_peer_node(tsk))
1987                                return false;
1988
1989                        tipc_set_sk_state(sk, TIPC_DISCONNECTING);
1990                        sk->sk_err = ECONNREFUSED;
1991                        sk->sk_state_change(sk);
1992                        return true;
1993                }
1994
                    /* Connected message carrying an error code: refused */
1995                if (unlikely(msg_errcode(hdr))) {
1996                        tipc_set_sk_state(sk, TIPC_DISCONNECTING);
1997                        sk->sk_err = ECONNREFUSED;
1998                        sk->sk_state_change(sk);
1999                        return true;
2000                }
2001
                    /* Connected non-data message: treated as invalid */
2002                if (unlikely(!msg_isdata(hdr))) {
2003                        tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2004                        sk->sk_err = EINVAL;
2005                        sk->sk_state_change(sk);
2006                        return true;
2007                }
2008
                    /* Valid ACK: complete the connection towards the sender */
2009                tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
2010                msg_set_importance(&tsk->phdr, msg_importance(hdr));
2011
2012                /* If 'ACK+' message, add to socket receive queue */
2013                if (msg_data_sz(hdr))
2014                        return true;
2015
2016                /* If empty 'ACK-' message, wake up sleeping connect() */
2017                sk->sk_data_ready(sk);
2018
2019                /* 'ACK-' message is neither accepted nor rejected: */
2020                msg_set_dest_droppable(hdr, 1);
2021                return false;
2022
2023        case TIPC_OPEN:
2024        case TIPC_DISCONNECTING:
                    /* Nothing is accepted in these states */
2025                break;
2026        case TIPC_LISTEN:
2027                /* Accept only SYN message */
2028                if (!msg_connected(hdr) && !(msg_errcode(hdr)))
2029                        return true;
2030                break;
2031        case TIPC_ESTABLISHED:
2032                /* Accept only connection-based messages sent by peer */
2033                if (unlikely(!tsk_peer_msg(tsk, hdr)))
2034                        return false;
2035
                    /* An error code from the peer ends the connection; the
                     * message itself is still accepted
                     */
2036                if (unlikely(msg_errcode(hdr))) {
2037                        tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2038                        /* Let timer expire on its own */
2039                        tipc_node_remove_conn(net, tsk_peer_node(tsk),
2040                                              tsk->portid);
2041                        sk->sk_state_change(sk);
2042                }
2043                return true;
2044        default:
2045                pr_err("Unknown sk_state %u\n", sk->sk_state);
2046        }
2047
2048        return false;
2049}
2050
2051/**
2052 * rcvbuf_limit - get proper overload limit of socket receive queue
2053 * @sk: socket
2054 * @skb: message
2055 *
2056 * For connection oriented messages, irrespective of importance,
2057 * default queue limit is 2 MB.
2058 *
2059 * For connectionless messages, queue limits are based on message
2060 * importance as follows:
2061 *
2062 * TIPC_LOW_IMPORTANCE       (2 MB)
2063 * TIPC_MEDIUM_IMPORTANCE    (4 MB)
2064 * TIPC_HIGH_IMPORTANCE      (8 MB)
2065 * TIPC_CRITICAL_IMPORTANCE  (16 MB)
2066 *
2067 * Returns overload limit according to corresponding message importance
2068 */
2069static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
2070{
2071        struct tipc_sock *tsk = tipc_sk(sk);
2072        struct tipc_msg *hdr = buf_msg(skb);
2073
2074        if (unlikely(msg_in_group(hdr)))
2075                return sk->sk_rcvbuf;
2076
2077        if (unlikely(!msg_connected(hdr)))
2078                return sk->sk_rcvbuf << msg_importance(hdr);
2079
2080        if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
2081                return sk->sk_rcvbuf;
2082
2083        return FLOWCTL_MSG_LIM;
2084}
2085
2086/**
2087 * tipc_sk_filter_rcv - validate incoming message
2088 * @sk: socket
2089 * @skb: pointer to message.
     * @xmitq: queue collecting rejected/response buffers for the caller to send
2090 *
2091 * Enqueues message on receive queue if acceptable; optionally handles
2092 * disconnect indication for a connected socket.
2093 *
2094 * Called with socket lock already taken
2095 *
2096 */
2097static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
2098                               struct sk_buff_head *xmitq)
2099{
2100        bool sk_conn = !tipc_sk_type_connectionless(sk);
2101        struct tipc_sock *tsk = tipc_sk(sk);
2102        struct tipc_group *grp = tsk->group;
2103        struct tipc_msg *hdr = buf_msg(skb);
2104        struct net *net = sock_net(sk);
2105        struct sk_buff_head inputq;
2106        int limit, err = TIPC_OK;
2107
            /* Start from a private one-element queue; the handlers below
             * receive this queue and may dequeue from it
             */
2108        TIPC_SKB_CB(skb)->bytes_read = 0;
2109        __skb_queue_head_init(&inputq);
2110        __skb_queue_tail(&inputq, skb);
2111
            /* Non-data messages are dequeued by the protocol handler */
2112        if (unlikely(!msg_isdata(hdr)))
2113                tipc_sk_proto_rcv(sk, &inputq, xmitq);
2114
2115        if (unlikely(grp))
2116                tipc_group_filter_msg(grp, &inputq, xmitq);
2117
2118        /* Validate and add to receive buffer if there is space */
2119        while ((skb = __skb_dequeue(&inputq))) {
2120                hdr = buf_msg(skb);
2121                limit = rcvbuf_limit(sk, skb);
                    /* Message must match the socket type, connection state
                     * and group membership
                     */
2122                if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
2123                    (!sk_conn && msg_connected(hdr)) ||
2124                    (!grp && msg_in_group(hdr)))
2125                        err = TIPC_ERR_NO_PORT;
2126                else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
2127                        atomic_inc(&sk->sk_drops);
2128                        err = TIPC_ERR_OVERLOAD;
2129                }
2130
2131                if (unlikely(err)) {
2132                        tipc_skb_reject(net, err, skb, xmitq);
                            /* Reset so the next buffer is judged on its own */
2133                        err = TIPC_OK;
2134                        continue;
2135                }
2136                __skb_queue_tail(&sk->sk_receive_queue, skb);
2137                skb_set_owner_r(skb, sk);
2138                sk->sk_data_ready(sk);
2139        }
2140}
2141
2142/**
2143 * tipc_sk_backlog_rcv - handle incoming message from backlog queue
2144 * @sk: socket
2145 * @skb: message
2146 *
2147 * Caller must hold socket lock
2148 */
2149static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
2150{
2151        unsigned int before = sk_rmem_alloc_get(sk);
2152        struct sk_buff_head xmitq;
2153        unsigned int added;
2154
2155        __skb_queue_head_init(&xmitq);
2156
2157        tipc_sk_filter_rcv(sk, skb, &xmitq);
2158        added = sk_rmem_alloc_get(sk) - before;
2159        atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
2160
2161        /* Send pending response/rejected messages, if any */
2162        tipc_node_distr_xmit(sock_net(sk), &xmitq);
2163        return 0;
2164}
2165
2166/**
2167 * tipc_sk_enqueue - extract all buffers with destination 'dport' from
2168 *                   inputq and try adding them to socket or backlog queue
2169 * @inputq: list of incoming buffers with potentially different destinations
2170 * @sk: socket where the buffers should be enqueued
2171 * @dport: port number for the socket
     * @xmitq: queue collecting rejected/response buffers for the caller to send
2172 *
2173 * Caller must hold socket lock
2174 */
2175static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
2176                            u32 dport, struct sk_buff_head *xmitq)
2177{
2178        unsigned long time_limit = jiffies + 2;
2179        struct sk_buff *skb;
2180        unsigned int lim;
2181        atomic_t *dcnt;
2182        u32 onode;
2183
2184        while (skb_queue_len(inputq)) {
                    /* Give up once the time limit set at entry has passed */
2185                if (unlikely(time_after_eq(jiffies, time_limit)))
2186                        return;
2187
                    /* No more buffers for this port in the queue */
2188                skb = tipc_skb_dequeue(inputq, dport);
2189                if (unlikely(!skb))
2190                        return;
2191
2192                /* Add message directly to receive queue if possible */
2193                if (!sock_owned_by_user(sk)) {
2194                        tipc_sk_filter_rcv(sk, skb, xmitq);
2195                        continue;
2196                }
2197
2198                /* Try backlog, compensating for double-counted bytes */
2199                dcnt = &tipc_sk(sk)->dupl_rcvcnt;
                    /* An empty backlog restarts the compensation counter */
2200                if (!sk->sk_backlog.len)
2201                        atomic_set(dcnt, 0);
2202                lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
2203                if (likely(!sk_add_backlog(sk, skb, lim)))
2204                        continue;
2205
2206                /* Overload => reject message back to sender */
2207                onode = tipc_own_addr(sock_net(sk));
2208                atomic_inc(&sk->sk_drops);
2209                if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
2210                        __skb_queue_tail(xmitq, skb);
                    /* Stop after the first overload rejection */
2211                break;
2212        }
2213}
2214
2215/**
2216 * tipc_sk_rcv - handle a chain of incoming buffers
     * @net: net namespace used for socket lookup and for transmission
2217 * @inputq: buffer list containing the buffers
2218 * Consumes all buffers in list until inputq is empty
2219 * Note: may be called in multiple threads referring to the same queue
2220 */
2221void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
2222{
2223        struct sk_buff_head xmitq;
2224        u32 dnode, dport = 0;
2225        int err;
2226        struct tipc_sock *tsk;
2227        struct sock *sk;
2228        struct sk_buff *skb;
2229
2230        __skb_queue_head_init(&xmitq);
2231        while (skb_queue_len(inputq)) {
                    /* Pick the destination port to serve in this round */
2232                dport = tipc_skb_peek_port(inputq, dport);
2233                tsk = tipc_sk_lookup(net, dport);
2234
2235                if (likely(tsk)) {
2236                        sk = &tsk->sk;
                            /* Only a trylock: if it fails nothing is dequeued
                             * in this round and the loop simply goes around again
                             */
2237                        if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
2238                                tipc_sk_enqueue(inputq, sk, dport, &xmitq);
2239                                spin_unlock_bh(&sk->sk_lock.slock);
2240                        }
2241                        /* Send pending response/rejected messages, if any */
2242                        tipc_node_distr_xmit(sock_net(sk), &xmitq);
                            /* Presumably drops the reference taken by the lookup
                             * above - TODO confirm against tipc_sk_lookup()
                             */
2243                        sock_put(sk);
2244                        continue;
2245                }
2246                /* No destination socket => dequeue skb if still there */
2247                skb = tipc_skb_dequeue(inputq, dport);
2248                if (!skb)
2249                        return;
2250
2251                /* Try secondary lookup if unresolved named message */
2252                err = TIPC_ERR_NO_PORT;
2253                if (tipc_msg_lookup_dest(net, skb, &err))
2254                        goto xmit;
2255
2256                /* Prepare for message rejection */
2257                if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
2258                        continue;
2259xmit:
                    /* Send the forwarded or reversed message to its destination
                     * node
                     */
2260                dnode = msg_destnode(buf_msg(skb));
2261                tipc_node_xmit_skb(net, skb, dnode, dport);
2262        }
2263}
2264
2265static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
2266{
2267        DEFINE_WAIT_FUNC(wait, woken_wake_function);
2268        struct sock *sk = sock->sk;
2269        int done;
2270
2271        do {
2272                int err = sock_error(sk);
2273                if (err)
2274                        return err;
2275                if (!*timeo_p)
2276                        return -ETIMEDOUT;
2277                if (signal_pending(current))
2278                        return sock_intr_errno(*timeo_p);
2279
2280                add_wait_queue(sk_sleep(sk), &wait);
2281                done = sk_wait_event(sk, timeo_p,
2282                                     sk->sk_state != TIPC_CONNECTING, &wait);
2283                remove_wait_queue(sk_sleep(sk), &wait);
2284        } while (!done);
2285        return 0;
2286}
2287
2288/**
2289 * tipc_connect - establish a connection to another TIPC port
2290 * @sock: socket structure
2291 * @dest: socket address for destination port
2292 * @destlen: size of socket address data structure
2293 * @flags: file-related flags associated with socket
2294 *
     * For connectionless socket types the destination address is only stored
     * in the socket (or cleared, for AF_UNSPEC); no message is sent.
     *
2295 * Returns 0 on success, errno otherwise
2296 */
2297static int tipc_connect(struct socket *sock, struct sockaddr *dest,
2298                        int destlen, int flags)
2299{
2300        struct sock *sk = sock->sk;
2301        struct tipc_sock *tsk = tipc_sk(sk);
2302        struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
2303        struct msghdr m = {NULL,};
2304        long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
2305        int previous;
2306        int res = 0;
2307
2308        if (destlen != sizeof(struct sockaddr_tipc))
2309                return -EINVAL;
2310
2311        lock_sock(sk);
2312
            /* connect() is refused on a socket that has a group */
2313        if (tsk->group) {
2314                res = -EINVAL;
2315                goto exit;
2316        }
2317
            /* AF_UNSPEC clears the stored peer address; this is an error for
             * connection-type sockets but the address is cleared regardless
             */
2318        if (dst->family == AF_UNSPEC) {
2319                memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
2320                if (!tipc_sk_type_connectionless(sk))
2321                        res = -EINVAL;
2322                goto exit;
2323        } else if (dst->family != AF_TIPC) {
2324                res = -EINVAL;
2325        }
2326        if (dst->addrtype != TIPC_ADDR_ID && dst->addrtype != TIPC_ADDR_NAME)
2327                res = -EINVAL;
2328        if (res)
2329                goto exit;
2330
2331        /* DGRAM/RDM connect(), just save the destaddr */
2332        if (tipc_sk_type_connectionless(sk)) {
2333                memcpy(&tsk->peer, dest, destlen);
2334                goto exit;
2335        }
2336
            /* Remember the state on entry: after the fall-through below it
             * distinguishes a fresh connect from a repeated one
             */
2337        previous = sk->sk_state;
2338
2339        switch (sk->sk_state) {
2340        case TIPC_OPEN:
2341                /* Send a 'SYN-' to destination */
2342                m.msg_name = dest;
2343                m.msg_namelen = destlen;
2344
2345                /* If connect is in non-blocking case, set MSG_DONTWAIT to
2346                 * indicate send_msg() is never blocked.
2347                 */
2348                if (!timeout)
2349                        m.msg_flags = MSG_DONTWAIT;
2350
2351                res = __tipc_sendmsg(sock, &m, 0);
2352                if ((res < 0) && (res != -EWOULDBLOCK))
2353                        goto exit;
2354
2355                /* Just entered TIPC_CONNECTING state; the only
2356                 * difference is that return value in non-blocking
2357                 * case is EINPROGRESS, rather than EALREADY.
2358                 */
2359                res = -EINPROGRESS;
2360                /* fall thru' */
2361        case TIPC_CONNECTING:
                    /* Non-blocking: report progress instead of waiting */
2362                if (!timeout) {
2363                        if (previous == TIPC_CONNECTING)
2364                                res = -EALREADY;
2365                        goto exit;
2366                }
                    /* conn_timeout is converted from milliseconds to jiffies */
2367                timeout = msecs_to_jiffies(timeout);
2368                /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
2369                res = tipc_wait_for_connect(sock, &timeout);
2370                break;
2371        case TIPC_ESTABLISHED:
2372                res = -EISCONN;
2373                break;
2374        default:
2375                res = -EINVAL;
2376        }
2377
2378exit:
2379        release_sock(sk);
2380        return res;
2381}
2382
2383/**
2384 * tipc_listen - allow socket to listen for incoming connections
2385 * @sock: socket structure
2386 * @len: (unused)
2387 *
2388 * Returns 0 on success, errno otherwise
2389 */
2390static int tipc_listen(struct socket *sock, int len)
2391{
2392        struct sock *sk = sock->sk;
2393        int res;
2394
2395        lock_sock(sk);
2396        res = tipc_set_sk_state(sk, TIPC_LISTEN);
2397        release_sock(sk);
2398
2399        return res;
2400}
2401
/**
 * tipc_wait_for_accept - wait until the listening socket has a queued buffer
 * @sock: listening socket
 * @timeo: timeout handed to schedule_timeout()
 *
 * The socket lock is released around schedule_timeout() and re-taken
 * afterwards, so the caller must hold it on entry.
 *
 * Returns 0 when the receive queue is non-empty, -EAGAIN when no time is
 * left, or the sock_intr_errno() value when a signal is pending.
 */
2402static int tipc_wait_for_accept(struct socket *sock, long timeo)
2403{
2404        struct sock *sk = sock->sk;
2405        DEFINE_WAIT(wait);
2406        int err;
2407
2408        /* True wake-one mechanism for incoming connections: only
2409         * one process gets woken up, not the 'whole herd'.
2410         * Since we do not 'race & poll' for established sockets
2411         * anymore, the common case will execute the loop only once.
2412        */
2413        for (;;) {
2414                prepare_to_wait_exclusive(sk_sleep(sk), &wait,
2415                                          TASK_INTERRUPTIBLE);
2416                if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
                            /* Sleep without holding the socket lock */
2417                        release_sock(sk);
2418                        timeo = schedule_timeout(timeo);
2419                        lock_sock(sk);
2420                }
                    /* Exit reasons are tested in priority order: queued
                     * buffer, exhausted timeout, pending signal
                     */
2421                err = 0;
2422                if (!skb_queue_empty(&sk->sk_receive_queue))
2423                        break;
2424                err = -EAGAIN;
2425                if (!timeo)
2426                        break;
2427                err = sock_intr_errno(timeo);
2428                if (signal_pending(current))
2429                        break;
2430        }
2431        finish_wait(sk_sleep(sk), &wait);
2432        return err;
2433}
2434
2435/**
2436 * tipc_accept - wait for connection request
2437 * @sock: listening socket
2438 * @new_sock: new socket that is to be connected
2439 * @flags: file-related flags associated with socket
     * @kern: passed through to tipc_sk_create() for the new socket
2440 *
2441 * Returns 0 on success, errno otherwise
2442 */
2443static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
2444                       bool kern)
2445{
2446        struct sock *new_sk, *sk = sock->sk;
2447        struct sk_buff *buf;
2448        struct tipc_sock *new_tsock;
2449        struct tipc_msg *msg;
2450        long timeo;
2451        int res;
2452
2453        lock_sock(sk);
2454
2455        if (sk->sk_state != TIPC_LISTEN) {
2456                res = -EINVAL;
2457                goto exit;
2458        }
2459        timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2460        res = tipc_wait_for_accept(sock, timeo);
2461        if (res)
2462                goto exit;
2463
            /* The connection request stays queued until the new socket exists */
2464        buf = skb_peek(&sk->sk_receive_queue);
2465
2466        res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, kern);
2467        if (res)
2468                goto exit;
2469        security_sk_clone(sock->sk, new_sock->sk);
2470
2471        new_sk = new_sock->sk;
2472        new_tsock = tipc_sk(new_sk);
2473        msg = buf_msg(buf);
2474
2475        /* we lock on new_sk; but lockdep sees the lock on sk */
2476        lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2477
2478        /*
2479         * Reject any stray messages received by new socket
2480         * before the socket lock was taken (very, very unlikely)
2481         */
2482        tsk_rej_rx_queue(new_sk);
2483
2484        /* Connect new socket to its peer */
2485        tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2486
            /* The new socket takes over importance and, for a named request,
             * the name type and instance carried by the request
             */
2487        tsk_set_importance(new_tsock, msg_importance(msg));
2488        if (msg_named(msg)) {
2489                new_tsock->conn_type = msg_nametype(msg);
2490                new_tsock->conn_instance = msg_nameinst(msg);
2491        }
2492
2493        /*
2494         * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2495         * Respond to 'SYN+' by queuing it on new socket.
2496         */
2497        if (!msg_data_sz(msg)) {
2498                struct msghdr m = {NULL,};
2499
2500                tsk_advance_rx_queue(sk);
2501                __tipc_sendstream(new_sock, &m, 0);
2502        } else {
                    /* Move the buffer from the listener to the new socket */
2503                __skb_dequeue(&sk->sk_receive_queue);
2504                __skb_queue_head(&new_sk->sk_receive_queue, buf);
2505                skb_set_owner_r(buf, new_sk);
2506        }
2507        release_sock(new_sk);
2508exit:
2509        release_sock(sk);
2510        return res;
2511}
2512
2513/**
2514 * tipc_shutdown - shutdown socket connection
2515 * @sock: socket structure
2516 * @how: direction to close (must be SHUT_RDWR)
2517 *
2518 * Terminates connection (if necessary), then purges socket's receive queue.
2519 *
2520 * Returns 0 on success, errno otherwise
2521 */
2522static int tipc_shutdown(struct socket *sock, int how)
2523{
2524        struct sock *sk = sock->sk;
2525        int res;
2526
2527        if (how != SHUT_RDWR)
2528                return -EINVAL;
2529
2530        lock_sock(sk);
2531
2532        __tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
2533        sk->sk_shutdown = SEND_SHUTDOWN;
2534
2535        if (sk->sk_state == TIPC_DISCONNECTING) {
2536                /* Discard any unreceived messages */
2537                __skb_queue_purge(&sk->sk_receive_queue);
2538
2539                /* Wake up anyone sleeping in poll */
2540                sk->sk_state_change(sk);
2541                res = 0;
2542        } else {
2543                res = -ENOTCONN;
2544        }
2545
2546        release_sock(sk);
2547        return res;
2548}
2549
2550static void tipc_sk_timeout(struct timer_list *t)
2551{
2552        struct sock *sk = from_timer(sk, t, sk_timer);
2553        struct tipc_sock *tsk = tipc_sk(sk);
2554        u32 peer_port = tsk_peer_port(tsk);
2555        u32 peer_node = tsk_peer_node(tsk);
2556        u32 own_node = tsk_own_node(tsk);
2557        u32 own_port = tsk->portid;
2558        struct net *net = sock_net(sk);
2559        struct sk_buff *skb = NULL;
2560
2561        bh_lock_sock(sk);
2562        if (!tipc_sk_connected(sk))
2563                goto exit;
2564
2565        /* Try again later if socket is busy */
2566        if (sock_owned_by_user(sk)) {
2567                sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);
2568                goto exit;
2569        }
2570
2571        if (tsk->probe_unacked) {
2572                tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2573                tipc_node_remove_conn(net, peer_node, peer_port);
2574                sk->sk_state_change(sk);
2575                goto exit;
2576        }
2577        /* Send new probe */
2578        skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0,
2579                              peer_node, own_node, peer_port, own_port,
2580                              TIPC_OK);
2581        tsk->probe_unacked = true;
2582        sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
2583exit:
2584        bh_unlock_sock(sk);
2585        if (skb)
2586                tipc_node_xmit_skb(net, skb, peer_node, own_port);
2587        sock_put(sk);
2588}
2589
2590static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2591                           struct tipc_name_seq const *seq)
2592{
2593        struct sock *sk = &tsk->sk;
2594        struct net *net = sock_net(sk);
2595        struct publication *publ;
2596        u32 key;
2597
2598        if (scope != TIPC_NODE_SCOPE)
2599                scope = TIPC_CLUSTER_SCOPE;
2600
2601        if (tipc_sk_connected(sk))
2602                return -EINVAL;
2603        key = tsk->portid + tsk->pub_count + 1;
2604        if (key == tsk->portid)
2605                return -EADDRINUSE;
2606
2607        publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2608                                    scope, tsk->portid, key);
2609        if (unlikely(!publ))
2610                return -EINVAL;
2611
2612        list_add(&publ->binding_sock, &tsk->publications);
2613        tsk->pub_count++;
2614        tsk->published = 1;
2615        return 0;
2616}
2617
2618static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2619                            struct tipc_name_seq const *seq)
2620{
2621        struct net *net = sock_net(&tsk->sk);
2622        struct publication *publ;
2623        struct publication *safe;
2624        int rc = -EINVAL;
2625
2626        if (scope != TIPC_NODE_SCOPE)
2627                scope = TIPC_CLUSTER_SCOPE;
2628
2629        list_for_each_entry_safe(publ, safe, &tsk->publications, binding_sock) {
2630                if (seq) {
2631                        if (publ->scope != scope)
2632                                continue;
2633                        if (publ->type != seq->type)
2634                                continue;
2635                        if (publ->lower != seq->lower)
2636                                continue;
2637                        if (publ->upper != seq->upper)
2638                                break;
2639                        tipc_nametbl_withdraw(net, publ->type, publ->lower,
2640                                              publ->upper, publ->key);
2641                        rc = 0;
2642                        break;
2643                }
2644                tipc_nametbl_withdraw(net, publ->type, publ->lower,
2645                                      publ->upper, publ->key);
2646                rc = 0;
2647        }
2648        if (list_empty(&tsk->publications))
2649                tsk->published = 0;
2650        return rc;
2651}
2652
2653/* tipc_sk_reinit: set non-zero address in all existing sockets
2654 *                 when we go from standalone to network mode.
2655 */
2656void tipc_sk_reinit(struct net *net)
2657{
2658        struct tipc_net *tn = net_generic(net, tipc_net_id);
2659        struct rhashtable_iter iter;
2660        struct tipc_sock *tsk;
2661        struct tipc_msg *msg;
2662
2663        rhashtable_walk_enter(&tn->sk_rht, &iter);
2664
2665        do {
2666                rhashtable_walk_start(&iter);
2667
2668                while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
2669                        spin_lock_bh(&tsk->sk.sk_lock.slock);
2670                        msg = &tsk->phdr;
2671                        msg_set_prevnode(msg, tipc_own_addr(net));
2672                        msg_set_orignode(msg, tipc_own_addr(net));
2673                        spin_unlock_bh(&tsk->sk.sk_lock.slock);
2674                }
2675
2676                rhashtable_walk_stop(&iter);
2677        } while (tsk == ERR_PTR(-EAGAIN));
2678}
2679
2680static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2681{
2682        struct tipc_net *tn = net_generic(net, tipc_net_id);
2683        struct tipc_sock *tsk;
2684
2685        rcu_read_lock();
2686        tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
2687        if (tsk)
2688                sock_hold(&tsk->sk);
2689        rcu_read_unlock();
2690
2691        return tsk;
2692}
2693
2694static int tipc_sk_insert(struct tipc_sock *tsk)
2695{
2696        struct sock *sk = &tsk->sk;
2697        struct net *net = sock_net(sk);
2698        struct tipc_net *tn = net_generic(net, tipc_net_id);
2699        u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2700        u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2701
2702        while (remaining--) {
2703                portid++;
2704                if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2705                        portid = TIPC_MIN_PORT;
2706                tsk->portid = portid;
2707                sock_hold(&tsk->sk);
2708                if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
2709                                                   tsk_rht_params))
2710                        return 0;
2711                sock_put(&tsk->sk);
2712        }
2713
2714        return -1;
2715}
2716
2717static void tipc_sk_remove(struct tipc_sock *tsk)
2718{
2719        struct sock *sk = &tsk->sk;
2720        struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2721
2722        if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
2723                WARN_ON(refcount_read(&sk->sk_refcnt) == 1);
2724                __sock_put(sk);
2725        }
2726}
2727
2728static const struct rhashtable_params tsk_rht_params = {
2729        .nelem_hint = 192,
2730        .head_offset = offsetof(struct tipc_sock, node),
2731        .key_offset = offsetof(struct tipc_sock, portid),
2732        .key_len = sizeof(u32), /* portid */
2733        .max_size = 1048576,
2734        .min_size = 256,
2735        .automatic_shrinking = true,
2736};
2737
2738int tipc_sk_rht_init(struct net *net)
2739{
2740        struct tipc_net *tn = net_generic(net, tipc_net_id);
2741
2742        return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
2743}
2744
2745void tipc_sk_rht_destroy(struct net *net)
2746{
2747        struct tipc_net *tn = net_generic(net, tipc_net_id);
2748
2749        /* Wait for socket readers to complete */
2750        synchronize_net();
2751
2752        rhashtable_destroy(&tn->sk_rht);
2753}
2754
2755static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2756{
2757        struct net *net = sock_net(&tsk->sk);
2758        struct tipc_group *grp = tsk->group;
2759        struct tipc_msg *hdr = &tsk->phdr;
2760        struct tipc_name_seq seq;
2761        int rc;
2762
2763        if (mreq->type < TIPC_RESERVED_TYPES)
2764                return -EACCES;
2765        if (mreq->scope > TIPC_NODE_SCOPE)
2766                return -EINVAL;
2767        if (grp)
2768                return -EACCES;
2769        grp = tipc_group_create(net, tsk->portid, mreq, &tsk->group_is_open);
2770        if (!grp)
2771                return -ENOMEM;
2772        tsk->group = grp;
2773        msg_set_lookup_scope(hdr, mreq->scope);
2774        msg_set_nametype(hdr, mreq->type);
2775        msg_set_dest_droppable(hdr, true);
2776        seq.type = mreq->type;
2777        seq.lower = mreq->instance;
2778        seq.upper = seq.lower;
2779        tipc_nametbl_build_group(net, grp, mreq->type, mreq->scope);
2780        rc = tipc_sk_publish(tsk, mreq->scope, &seq);
2781        if (rc) {
2782                tipc_group_delete(net, grp);
2783                tsk->group = NULL;
2784                return rc;
2785        }
2786        /* Eliminate any risk that a broadcast overtakes sent JOINs */
2787        tsk->mc_method.rcast = true;
2788        tsk->mc_method.mandatory = true;
2789        tipc_group_join(net, grp, &tsk->sk.sk_rcvbuf);
2790        return rc;
2791}
2792
2793static int tipc_sk_leave(struct tipc_sock *tsk)
2794{
2795        struct net *net = sock_net(&tsk->sk);
2796        struct tipc_group *grp = tsk->group;
2797        struct tipc_name_seq seq;
2798        int scope;
2799
2800        if (!grp)
2801                return -EINVAL;
2802        tipc_group_self(grp, &seq, &scope);
2803        tipc_group_delete(net, grp);
2804        tsk->group = NULL;
2805        tipc_sk_withdraw(tsk, scope, &seq);
2806        return 0;
2807}
2808
2809/**
2810 * tipc_setsockopt - set socket option
2811 * @sock: socket structure
2812 * @lvl: option level
2813 * @opt: option identifier
2814 * @ov: pointer to new option value
2815 * @ol: length of option value
2816 *
2817 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2818 * (to ease compatibility).
2819 *
2820 * Returns 0 on success, errno otherwise
2821 */
2822static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2823                           char __user *ov, unsigned int ol)
2824{
2825        struct sock *sk = sock->sk;
2826        struct tipc_sock *tsk = tipc_sk(sk);
2827        struct tipc_group_req mreq;
2828        u32 value = 0;
2829        int res = 0;
2830
2831        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2832                return 0;
2833        if (lvl != SOL_TIPC)
2834                return -ENOPROTOOPT;
2835
2836        switch (opt) {
2837        case TIPC_IMPORTANCE:
2838        case TIPC_SRC_DROPPABLE:
2839        case TIPC_DEST_DROPPABLE:
2840        case TIPC_CONN_TIMEOUT:
2841                if (ol < sizeof(value))
2842                        return -EINVAL;
2843                if (get_user(value, (u32 __user *)ov))
2844                        return -EFAULT;
2845                break;
2846        case TIPC_GROUP_JOIN:
2847                if (ol < sizeof(mreq))
2848                        return -EINVAL;
2849                if (copy_from_user(&mreq, ov, sizeof(mreq)))
2850                        return -EFAULT;
2851                break;
2852        default:
2853                if (ov || ol)
2854                        return -EINVAL;
2855        }
2856
2857        lock_sock(sk);
2858
2859        switch (opt) {
2860        case TIPC_IMPORTANCE:
2861                res = tsk_set_importance(tsk, value);
2862                break;
2863        case TIPC_SRC_DROPPABLE:
2864                if (sock->type != SOCK_STREAM)
2865                        tsk_set_unreliable(tsk, value);
2866                else
2867                        res = -ENOPROTOOPT;
2868                break;
2869        case TIPC_DEST_DROPPABLE:
2870                tsk_set_unreturnable(tsk, value);
2871                break;
2872        case TIPC_CONN_TIMEOUT:
2873                tipc_sk(sk)->conn_timeout = value;
2874                break;
2875        case TIPC_MCAST_BROADCAST:
2876                tsk->mc_method.rcast = false;
2877                tsk->mc_method.mandatory = true;
2878                break;
2879        case TIPC_MCAST_REPLICAST:
2880                tsk->mc_method.rcast = true;
2881                tsk->mc_method.mandatory = true;
2882                break;
2883        case TIPC_GROUP_JOIN:
2884                res = tipc_sk_join(tsk, &mreq);
2885                break;
2886        case TIPC_GROUP_LEAVE:
2887                res = tipc_sk_leave(tsk);
2888                break;
2889        default:
2890                res = -EINVAL;
2891        }
2892
2893        release_sock(sk);
2894
2895        return res;
2896}
2897
2898/**
2899 * tipc_getsockopt - get socket option
2900 * @sock: socket structure
2901 * @lvl: option level
2902 * @opt: option identifier
2903 * @ov: receptacle for option value
2904 * @ol: receptacle for length of option value
2905 *
2906 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2907 * (to ease compatibility).
2908 *
2909 * Returns 0 on success, errno otherwise
2910 */
2911static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2912                           char __user *ov, int __user *ol)
2913{
2914        struct sock *sk = sock->sk;
2915        struct tipc_sock *tsk = tipc_sk(sk);
2916        struct tipc_name_seq seq;
2917        int len, scope;
2918        u32 value;
2919        int res;
2920
2921        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2922                return put_user(0, ol);
2923        if (lvl != SOL_TIPC)
2924                return -ENOPROTOOPT;
2925        res = get_user(len, ol);
2926        if (res)
2927                return res;
2928
2929        lock_sock(sk);
2930
2931        switch (opt) {
2932        case TIPC_IMPORTANCE:
2933                value = tsk_importance(tsk);
2934                break;
2935        case TIPC_SRC_DROPPABLE:
2936                value = tsk_unreliable(tsk);
2937                break;
2938        case TIPC_DEST_DROPPABLE:
2939                value = tsk_unreturnable(tsk);
2940                break;
2941        case TIPC_CONN_TIMEOUT:
2942                value = tsk->conn_timeout;
2943                /* no need to set "res", since already 0 at this point */
2944                break;
2945        case TIPC_NODE_RECVQ_DEPTH:
2946                value = 0; /* was tipc_queue_size, now obsolete */
2947                break;
2948        case TIPC_SOCK_RECVQ_DEPTH:
2949                value = skb_queue_len(&sk->sk_receive_queue);
2950                break;
2951        case TIPC_GROUP_JOIN:
2952                seq.type = 0;
2953                if (tsk->group)
2954                        tipc_group_self(tsk->group, &seq, &scope);
2955                value = seq.type;
2956                break;
2957        default:
2958                res = -EINVAL;
2959        }
2960
2961        release_sock(sk);
2962
2963        if (res)
2964                return res;     /* "get" failed */
2965
2966        if (len < sizeof(value))
2967                return -EINVAL;
2968
2969        if (copy_to_user(ov, &value, sizeof(value)))
2970                return -EFAULT;
2971
2972        return put_user(sizeof(value), ol);
2973}
2974
2975static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2976{
2977        struct sock *sk = sock->sk;
2978        struct tipc_sioc_ln_req lnr;
2979        void __user *argp = (void __user *)arg;
2980
2981        switch (cmd) {
2982        case SIOCGETLINKNAME:
2983                if (copy_from_user(&lnr, argp, sizeof(lnr)))
2984                        return -EFAULT;
2985                if (!tipc_node_get_linkname(sock_net(sk),
2986                                            lnr.bearer_id & 0xffff, lnr.peer,
2987                                            lnr.linkname, TIPC_MAX_LINK_NAME)) {
2988                        if (copy_to_user(argp, &lnr, sizeof(lnr)))
2989                                return -EFAULT;
2990                        return 0;
2991                }
2992                return -EADDRNOTAVAIL;
2993        default:
2994                return -ENOIOCTLCMD;
2995        }
2996}
2997
2998static int tipc_socketpair(struct socket *sock1, struct socket *sock2)
2999{
3000        struct tipc_sock *tsk2 = tipc_sk(sock2->sk);
3001        struct tipc_sock *tsk1 = tipc_sk(sock1->sk);
3002        u32 onode = tipc_own_addr(sock_net(sock1->sk));
3003
3004        tsk1->peer.family = AF_TIPC;
3005        tsk1->peer.addrtype = TIPC_ADDR_ID;
3006        tsk1->peer.scope = TIPC_NODE_SCOPE;
3007        tsk1->peer.addr.id.ref = tsk2->portid;
3008        tsk1->peer.addr.id.node = onode;
3009        tsk2->peer.family = AF_TIPC;
3010        tsk2->peer.addrtype = TIPC_ADDR_ID;
3011        tsk2->peer.scope = TIPC_NODE_SCOPE;
3012        tsk2->peer.addr.id.ref = tsk1->portid;
3013        tsk2->peer.addr.id.node = onode;
3014
3015        tipc_sk_finish_conn(tsk1, tsk2->portid, onode);
3016        tipc_sk_finish_conn(tsk2, tsk1->portid, onode);
3017        return 0;
3018}
3019
3020/* Protocol switches for the various types of TIPC sockets */
3021
3022static const struct proto_ops msg_ops = {
3023        .owner          = THIS_MODULE,
3024        .family         = AF_TIPC,
3025        .release        = tipc_release,
3026        .bind           = tipc_bind,
3027        .connect        = tipc_connect,
3028        .socketpair     = tipc_socketpair,
3029        .accept         = sock_no_accept,
3030        .getname        = tipc_getname,
3031        .poll           = tipc_poll,
3032        .ioctl          = tipc_ioctl,
3033        .listen         = sock_no_listen,
3034        .shutdown       = tipc_shutdown,
3035        .setsockopt     = tipc_setsockopt,
3036        .getsockopt     = tipc_getsockopt,
3037        .sendmsg        = tipc_sendmsg,
3038        .recvmsg        = tipc_recvmsg,
3039        .mmap           = sock_no_mmap,
3040        .sendpage       = sock_no_sendpage
3041};
3042
3043static const struct proto_ops packet_ops = {
3044        .owner          = THIS_MODULE,
3045        .family         = AF_TIPC,
3046        .release        = tipc_release,
3047        .bind           = tipc_bind,
3048        .connect        = tipc_connect,
3049        .socketpair     = tipc_socketpair,
3050        .accept         = tipc_accept,
3051        .getname        = tipc_getname,
3052        .poll           = tipc_poll,
3053        .ioctl          = tipc_ioctl,
3054        .listen         = tipc_listen,
3055        .shutdown       = tipc_shutdown,
3056        .setsockopt     = tipc_setsockopt,
3057        .getsockopt     = tipc_getsockopt,
3058        .sendmsg        = tipc_send_packet,
3059        .recvmsg        = tipc_recvmsg,
3060        .mmap           = sock_no_mmap,
3061        .sendpage       = sock_no_sendpage
3062};
3063
3064static const struct proto_ops stream_ops = {
3065        .owner          = THIS_MODULE,
3066        .family         = AF_TIPC,
3067        .release        = tipc_release,
3068        .bind           = tipc_bind,
3069        .connect        = tipc_connect,
3070        .socketpair     = tipc_socketpair,
3071        .accept         = tipc_accept,
3072        .getname        = tipc_getname,
3073        .poll           = tipc_poll,
3074        .ioctl          = tipc_ioctl,
3075        .listen         = tipc_listen,
3076        .shutdown       = tipc_shutdown,
3077        .setsockopt     = tipc_setsockopt,
3078        .getsockopt     = tipc_getsockopt,
3079        .sendmsg        = tipc_sendstream,
3080        .recvmsg        = tipc_recvstream,
3081        .mmap           = sock_no_mmap,
3082        .sendpage       = sock_no_sendpage
3083};
3084
3085static const struct net_proto_family tipc_family_ops = {
3086        .owner          = THIS_MODULE,
3087        .family         = AF_TIPC,
3088        .create         = tipc_sk_create
3089};
3090
3091static struct proto tipc_proto = {
3092        .name           = "TIPC",
3093        .owner          = THIS_MODULE,
3094        .obj_size       = sizeof(struct tipc_sock),
3095        .sysctl_rmem    = sysctl_tipc_rmem
3096};
3097
3098/**
3099 * tipc_socket_init - initialize TIPC socket interface
3100 *
3101 * Returns 0 on success, errno otherwise
3102 */
3103int tipc_socket_init(void)
3104{
3105        int res;
3106
3107        res = proto_register(&tipc_proto, 1);
3108        if (res) {
3109                pr_err("Failed to register TIPC protocol type\n");
3110                goto out;
3111        }
3112
3113        res = sock_register(&tipc_family_ops);
3114        if (res) {
3115                pr_err("Failed to register TIPC socket type\n");
3116                proto_unregister(&tipc_proto);
3117                goto out;
3118        }
3119 out:
3120        return res;
3121}
3122
3123/**
3124 * tipc_socket_stop - stop TIPC socket interface
3125 */
3126void tipc_socket_stop(void)
3127{
3128        sock_unregister(tipc_family_ops.family);
3129        proto_unregister(&tipc_proto);
3130}
3131
3132/* Caller should hold socket lock for the passed tipc socket. */
3133static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
3134{
3135        u32 peer_node;
3136        u32 peer_port;
3137        struct nlattr *nest;
3138
3139        peer_node = tsk_peer_node(tsk);
3140        peer_port = tsk_peer_port(tsk);
3141
3142        nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
3143
3144        if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
3145                goto msg_full;
3146        if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
3147                goto msg_full;
3148
3149        if (tsk->conn_type != 0) {
3150                if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
3151                        goto msg_full;
3152                if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
3153                        goto msg_full;
3154                if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
3155                        goto msg_full;
3156        }
3157        nla_nest_end(skb, nest);
3158
3159        return 0;
3160
3161msg_full:
3162        nla_nest_cancel(skb, nest);
3163
3164        return -EMSGSIZE;
3165}
3166
3167static int __tipc_nl_add_sk_info(struct sk_buff *skb, struct tipc_sock
3168                          *tsk)
3169{
3170        struct net *net = sock_net(skb->sk);
3171        struct sock *sk = &tsk->sk;
3172
3173        if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid) ||
3174            nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tipc_own_addr(net)))
3175                return -EMSGSIZE;
3176
3177        if (tipc_sk_connected(sk)) {
3178                if (__tipc_nl_add_sk_con(skb, tsk))
3179                        return -EMSGSIZE;
3180        } else if (!list_empty(&tsk->publications)) {
3181                if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
3182                        return -EMSGSIZE;
3183        }
3184        return 0;
3185}
3186
3187/* Caller should hold socket lock for the passed tipc socket. */
3188static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
3189                            struct tipc_sock *tsk)
3190{
3191        struct nlattr *attrs;
3192        void *hdr;
3193
3194        hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
3195                          &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
3196        if (!hdr)
3197                goto msg_cancel;
3198
3199        attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
3200        if (!attrs)
3201                goto genlmsg_cancel;
3202
3203        if (__tipc_nl_add_sk_info(skb, tsk))
3204                goto attr_msg_cancel;
3205
3206        nla_nest_end(skb, attrs);
3207        genlmsg_end(skb, hdr);
3208
3209        return 0;
3210
3211attr_msg_cancel:
3212        nla_nest_cancel(skb, attrs);
3213genlmsg_cancel:
3214        genlmsg_cancel(skb, hdr);
3215msg_cancel:
3216        return -EMSGSIZE;
3217}
3218
3219int tipc_nl_sk_walk(struct sk_buff *skb, struct netlink_callback *cb,
3220                    int (*skb_handler)(struct sk_buff *skb,
3221                                       struct netlink_callback *cb,
3222                                       struct tipc_sock *tsk))
3223{
3224        struct net *net = sock_net(skb->sk);
3225        struct tipc_net *tn = tipc_net(net);
3226        const struct bucket_table *tbl;
3227        u32 prev_portid = cb->args[1];
3228        u32 tbl_id = cb->args[0];
3229        struct rhash_head *pos;
3230        struct tipc_sock *tsk;
3231        int err;
3232
3233        rcu_read_lock();
3234        tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
3235        for (; tbl_id < tbl->size; tbl_id++) {
3236                rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
3237                        spin_lock_bh(&tsk->sk.sk_lock.slock);
3238                        if (prev_portid && prev_portid != tsk->portid) {
3239                                spin_unlock_bh(&tsk->sk.sk_lock.slock);
3240                                continue;
3241                        }
3242
3243                        err = skb_handler(skb, cb, tsk);
3244                        if (err) {
3245                                prev_portid = tsk->portid;
3246                                spin_unlock_bh(&tsk->sk.sk_lock.slock);
3247                                goto out;
3248                        }
3249
3250                        prev_portid = 0;
3251                        spin_unlock_bh(&tsk->sk.sk_lock.slock);
3252                }
3253        }
3254out:
3255        rcu_read_unlock();
3256        cb->args[0] = tbl_id;
3257        cb->args[1] = prev_portid;
3258
3259        return skb->len;
3260}
3261EXPORT_SYMBOL(tipc_nl_sk_walk);
3262
3263int tipc_sk_fill_sock_diag(struct sk_buff *skb, struct netlink_callback *cb,
3264                           struct tipc_sock *tsk, u32 sk_filter_state,
3265                           u64 (*tipc_diag_gen_cookie)(struct sock *sk))
3266{
3267        struct sock *sk = &tsk->sk;
3268        struct nlattr *attrs;
3269        struct nlattr *stat;
3270
3271        /*filter response w.r.t sk_state*/
3272        if (!(sk_filter_state & (1 << sk->sk_state)))
3273                return 0;
3274
3275        attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
3276        if (!attrs)
3277                goto msg_cancel;
3278
3279        if (__tipc_nl_add_sk_info(skb, tsk))
3280                goto attr_msg_cancel;
3281
3282        if (nla_put_u32(skb, TIPC_NLA_SOCK_TYPE, (u32)sk->sk_type) ||
3283            nla_put_u32(skb, TIPC_NLA_SOCK_TIPC_STATE, (u32)sk->sk_state) ||
3284            nla_put_u32(skb, TIPC_NLA_SOCK_INO, sock_i_ino(sk)) ||
3285            nla_put_u32(skb, TIPC_NLA_SOCK_UID,
3286                        from_kuid_munged(sk_user_ns(NETLINK_CB(cb->skb).sk),
3287                                         sock_i_uid(sk))) ||
3288            nla_put_u64_64bit(skb, TIPC_NLA_SOCK_COOKIE,
3289                              tipc_diag_gen_cookie(sk),
3290                              TIPC_NLA_SOCK_PAD))
3291                goto attr_msg_cancel;
3292
3293        stat = nla_nest_start(skb, TIPC_NLA_SOCK_STAT);
3294        if (!stat)
3295                goto attr_msg_cancel;
3296
3297        if (nla_put_u32(skb, TIPC_NLA_SOCK_STAT_RCVQ,
3298                        skb_queue_len(&sk->sk_receive_queue)) ||
3299            nla_put_u32(skb, TIPC_NLA_SOCK_STAT_SENDQ,
3300                        skb_queue_len(&sk->sk_write_queue)) ||
3301            nla_put_u32(skb, TIPC_NLA_SOCK_STAT_DROP,
3302                        atomic_read(&sk->sk_drops)))
3303                goto stat_msg_cancel;
3304
3305        if (tsk->cong_link_cnt &&
3306            nla_put_flag(skb, TIPC_NLA_SOCK_STAT_LINK_CONG))
3307                goto stat_msg_cancel;
3308
3309        if (tsk_conn_cong(tsk) &&
3310            nla_put_flag(skb, TIPC_NLA_SOCK_STAT_CONN_CONG))
3311                goto stat_msg_cancel;
3312
3313        nla_nest_end(skb, stat);
3314        nla_nest_end(skb, attrs);
3315
3316        return 0;
3317
3318stat_msg_cancel:
3319        nla_nest_cancel(skb, stat);
3320attr_msg_cancel:
3321        nla_nest_cancel(skb, attrs);
3322msg_cancel:
3323        return -EMSGSIZE;
3324}
3325EXPORT_SYMBOL(tipc_sk_fill_sock_diag);
3326
3327int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
3328{
3329        return tipc_nl_sk_walk(skb, cb, __tipc_nl_add_sk);
3330}
3331
3332/* Caller should hold socket lock for the passed tipc socket. */
3333static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
3334                                 struct netlink_callback *cb,
3335                                 struct publication *publ)
3336{
3337        void *hdr;
3338        struct nlattr *attrs;
3339
3340        hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
3341                          &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
3342        if (!hdr)
3343                goto msg_cancel;
3344
3345        attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
3346        if (!attrs)
3347                goto genlmsg_cancel;
3348
3349        if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
3350                goto attr_msg_cancel;
3351        if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
3352                goto attr_msg_cancel;
3353        if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
3354                goto attr_msg_cancel;
3355        if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
3356                goto attr_msg_cancel;
3357
3358        nla_nest_end(skb, attrs);
3359        genlmsg_end(skb, hdr);
3360
3361        return 0;
3362
3363attr_msg_cancel:
3364        nla_nest_cancel(skb, attrs);
3365genlmsg_cancel:
3366        genlmsg_cancel(skb, hdr);
3367msg_cancel:
3368        return -EMSGSIZE;
3369}
3370
3371/* Caller should hold socket lock for the passed tipc socket. */
3372static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
3373                                  struct netlink_callback *cb,
3374                                  struct tipc_sock *tsk, u32 *last_publ)
3375{
3376        int err;
3377        struct publication *p;
3378
3379        if (*last_publ) {
3380                list_for_each_entry(p, &tsk->publications, binding_sock) {
3381                        if (p->key == *last_publ)
3382                                break;
3383                }
3384                if (p->key != *last_publ) {
3385                        /* We never set seq or call nl_dump_check_consistent()
3386                         * this means that setting prev_seq here will cause the
3387                         * consistence check to fail in the netlink callback
3388                         * handler. Resulting in the last NLMSG_DONE message
3389                         * having the NLM_F_DUMP_INTR flag set.
3390                         */
3391                        cb->prev_seq = 1;
3392                        *last_publ = 0;
3393                        return -EPIPE;
3394                }
3395        } else {
3396                p = list_first_entry(&tsk->publications, struct publication,
3397                                     binding_sock);
3398        }
3399
3400        list_for_each_entry_from(p, &tsk->publications, binding_sock) {
3401                err = __tipc_nl_add_sk_publ(skb, cb, p);
3402                if (err) {
3403                        *last_publ = p->key;
3404                        return err;
3405                }
3406        }
3407        *last_publ = 0;
3408
3409        return 0;
3410}
3411
3412int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
3413{
3414        int err;
3415        u32 tsk_portid = cb->args[0];
3416        u32 last_publ = cb->args[1];
3417        u32 done = cb->args[2];
3418        struct net *net = sock_net(skb->sk);
3419        struct tipc_sock *tsk;
3420
3421        if (!tsk_portid) {
3422                struct nlattr **attrs;
3423                struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
3424
3425                err = tipc_nlmsg_parse(cb->nlh, &attrs);
3426                if (err)
3427                        return err;
3428
3429                if (!attrs[TIPC_NLA_SOCK])
3430                        return -EINVAL;
3431
3432                err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
3433                                       attrs[TIPC_NLA_SOCK],
3434                                       tipc_nl_sock_policy, NULL);
3435                if (err)
3436                        return err;
3437
3438                if (!sock[TIPC_NLA_SOCK_REF])
3439                        return -EINVAL;
3440
3441                tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
3442        }
3443
3444        if (done)
3445                return 0;
3446
3447        tsk = tipc_sk_lookup(net, tsk_portid);
3448        if (!tsk)
3449                return -EINVAL;
3450
3451        lock_sock(&tsk->sk);
3452        err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
3453        if (!err)
3454                done = 1;
3455        release_sock(&tsk->sk);
3456        sock_put(&tsk->sk);
3457
3458        cb->args[0] = tsk_portid;
3459        cb->args[1] = last_publ;
3460        cb->args[2] = done;
3461
3462        return skb->len;
3463}
3464