linux/net/tipc/socket.c
<<
>>
Prefs
   1/*
   2 * net/tipc/socket.c: TIPC socket API
   3 *
   4 * Copyright (c) 2001-2007, 2012-2016, Ericsson AB
   5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include <linux/rhashtable.h>
  38#include "core.h"
  39#include "name_table.h"
  40#include "node.h"
  41#include "link.h"
  42#include "name_distr.h"
  43#include "socket.h"
  44#include "bcast.h"
  45#include "netlink.h"
  46
/* TIPC-specific values stored in sock->state alongside the generic
 * socket_state ones (presumably negative to stay clear of them -- confirm)
 */
#define SS_LISTENING            -1      /* socket is listening */
#define SS_READY                -2      /* socket is connectionless */

#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
#define CONN_PROBING_INTERVAL   msecs_to_jiffies(3600000)  /* [ms] => 1 h */
#define TIPC_FWD_MSG            1
/* TIPC_CONN_OK is what tipc_sk_proto_rcv() stores in tipc_sock::probing_state;
 * TIPC_CONN_PROBING is presumably the other value of that field -- confirm
 */
#define TIPC_CONN_OK            0
#define TIPC_CONN_PROBING       1
/* NOTE(review): presumably the bounds of the port number range -- confirm */
#define TIPC_MAX_PORT           0xffffffff
#define TIPC_MIN_PORT           1
  57
/**
 * struct tipc_sock - TIPC socket structure
 * @sk: socket - interacts with 'port' and with user via the socket API
 * @connected: non-zero if port is currently connected to a peer port
 * @conn_type: TIPC type used when connection was established
 * @conn_instance: TIPC instance used when connection was established
 * @published: non-zero if port has one or more associated names
 * @max_pkt: maximum packet size "hint" used when building messages sent by port
 * @portid: unique port identity in TIPC socket hash table
 * @phdr: preformatted message header used when sending messages
 * @sock_list: adjacent ports in TIPC's global list of ports
 * @publications: list of publications for port
 * @pub_count: total # of publications port has made during its lifetime
 * @probing_state: connection supervision state; set to TIPC_CONN_OK whenever
 *                 a validated connection protocol message arrives
 * @probing_intv: connection probing interval (presumably in jiffies - confirm)
 * @conn_timeout: the time we can wait for an unresponded setup request
 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
 * @link_cong: non-zero if owner must sleep because of link congestion
 * @snt_unacked: # messages sent by socket, and not yet acked by peer
 * @snd_win: send window; connection is congested while @snt_unacked exceeds it
 * @peer_caps: peer capability flags (e.g. TIPC_BLOCK_FLOWCTL)
 * @rcv_unacked: # messages read by user, but not yet acked back to peer
 * @rcv_win: receive window; initialized to the same value as @snd_win
 * @remote: 'connected' peer for dgram/rdm
 * @node: hash table node
 * @rcu: rcu struct for tipc_sock
 */
struct tipc_sock {
        struct sock sk;
        int connected;
        u32 conn_type;
        u32 conn_instance;
        int published;
        u32 max_pkt;
        u32 portid;
        struct tipc_msg phdr;
        struct list_head sock_list;
        struct list_head publications;
        u32 pub_count;
        u32 probing_state;
        unsigned long probing_intv;
        uint conn_timeout;
        atomic_t dupl_rcvcnt;
        bool link_cong;
        u16 snt_unacked;
        u16 snd_win;
        u16 peer_caps;
        u16 rcv_unacked;
        u16 rcv_win;
        struct sockaddr_tipc remote;
        struct rhash_head node;
        struct rcu_head rcu;
};
 108
/* Forward declarations of file-local functions that are referenced before
 * their definitions
 */
static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
static void tipc_data_ready(struct sock *sk);
static void tipc_write_space(struct sock *sk);
static void tipc_sock_destruct(struct sock *sk);
static int tipc_release(struct socket *sock);
static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
static void tipc_sk_timeout(unsigned long data);
static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
                           struct tipc_name_seq const *seq);
static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
                            struct tipc_name_seq const *seq);
static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
static int tipc_sk_insert(struct tipc_sock *tsk);
static void tipc_sk_remove(struct tipc_sock *tsk);
static int __tipc_send_stream(struct socket *sock, struct msghdr *m,
                              size_t dsz);
static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);

/* Tentative definitions of the per-type operation tables, the protocol
 * descriptor and the hash table parameters used by the code below
 */
static const struct proto_ops packet_ops;
static const struct proto_ops stream_ops;
static const struct proto_ops msg_ops;
static struct proto tipc_proto;
static const struct rhashtable_params tsk_rht_params;
 133
 134static u32 tsk_own_node(struct tipc_sock *tsk)
 135{
 136        return msg_prevnode(&tsk->phdr);
 137}
 138
 139static u32 tsk_peer_node(struct tipc_sock *tsk)
 140{
 141        return msg_destnode(&tsk->phdr);
 142}
 143
 144static u32 tsk_peer_port(struct tipc_sock *tsk)
 145{
 146        return msg_destport(&tsk->phdr);
 147}
 148
 149static  bool tsk_unreliable(struct tipc_sock *tsk)
 150{
 151        return msg_src_droppable(&tsk->phdr) != 0;
 152}
 153
 154static void tsk_set_unreliable(struct tipc_sock *tsk, bool unreliable)
 155{
 156        msg_set_src_droppable(&tsk->phdr, unreliable ? 1 : 0);
 157}
 158
 159static bool tsk_unreturnable(struct tipc_sock *tsk)
 160{
 161        return msg_dest_droppable(&tsk->phdr) != 0;
 162}
 163
 164static void tsk_set_unreturnable(struct tipc_sock *tsk, bool unreturnable)
 165{
 166        msg_set_dest_droppable(&tsk->phdr, unreturnable ? 1 : 0);
 167}
 168
 169static int tsk_importance(struct tipc_sock *tsk)
 170{
 171        return msg_importance(&tsk->phdr);
 172}
 173
 174static int tsk_set_importance(struct tipc_sock *tsk, int imp)
 175{
 176        if (imp > TIPC_CRITICAL_IMPORTANCE)
 177                return -EINVAL;
 178        msg_set_importance(&tsk->phdr, (u32)imp);
 179        return 0;
 180}
 181
 182static struct tipc_sock *tipc_sk(const struct sock *sk)
 183{
 184        return container_of(sk, struct tipc_sock, sk);
 185}
 186
 187static bool tsk_conn_cong(struct tipc_sock *tsk)
 188{
 189        return tsk->snt_unacked > tsk->snd_win;
 190}
 191
 192/* tsk_blocks(): translate a buffer size in bytes to number of
 193 * advertisable blocks, taking into account the ratio truesize(len)/len
 194 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
 195 */
 196static u16 tsk_adv_blocks(int len)
 197{
 198        return len / FLOWCTL_BLK_SZ / 4;
 199}
 200
 201/* tsk_inc(): increment counter for sent or received data
 202 * - If block based flow control is not supported by peer we
 203 *   fall back to message based ditto, incrementing the counter
 204 */
 205static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
 206{
 207        if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
 208                return ((msglen / FLOWCTL_BLK_SZ) + 1);
 209        return 1;
 210}
 211
 212/**
 213 * tsk_advance_rx_queue - discard first buffer in socket receive queue
 214 *
 215 * Caller must hold socket lock
 216 */
 217static void tsk_advance_rx_queue(struct sock *sk)
 218{
 219        kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 220}
 221
 222/* tipc_sk_respond() : send response message back to sender
 223 */
 224static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
 225{
 226        u32 selector;
 227        u32 dnode;
 228        u32 onode = tipc_own_addr(sock_net(sk));
 229
 230        if (!tipc_msg_reverse(onode, &skb, err))
 231                return;
 232
 233        dnode = msg_destnode(buf_msg(skb));
 234        selector = msg_origport(buf_msg(skb));
 235        tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
 236}
 237
 238/**
 239 * tsk_rej_rx_queue - reject all buffers in socket receive queue
 240 *
 241 * Caller must hold socket lock
 242 */
 243static void tsk_rej_rx_queue(struct sock *sk)
 244{
 245        struct sk_buff *skb;
 246
 247        while ((skb = __skb_dequeue(&sk->sk_receive_queue)))
 248                tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
 249}
 250
 251/* tsk_peer_msg - verify if message was sent by connected port's peer
 252 *
 253 * Handles cases where the node's network address has changed from
 254 * the default of <0.0.0> to its configured setting.
 255 */
 256static bool tsk_peer_msg(struct tipc_sock *tsk, struct tipc_msg *msg)
 257{
 258        struct tipc_net *tn = net_generic(sock_net(&tsk->sk), tipc_net_id);
 259        u32 peer_port = tsk_peer_port(tsk);
 260        u32 orig_node;
 261        u32 peer_node;
 262
 263        if (unlikely(!tsk->connected))
 264                return false;
 265
 266        if (unlikely(msg_origport(msg) != peer_port))
 267                return false;
 268
 269        orig_node = msg_orignode(msg);
 270        peer_node = tsk_peer_node(tsk);
 271
 272        if (likely(orig_node == peer_node))
 273                return true;
 274
 275        if (!orig_node && (peer_node == tn->own_addr))
 276                return true;
 277
 278        if (!peer_node && (orig_node == tn->own_addr))
 279                return true;
 280
 281        return false;
 282}
 283
 284/**
 285 * tipc_sk_create - create a TIPC socket
 286 * @net: network namespace (must be default network)
 287 * @sock: pre-allocated socket structure
 288 * @protocol: protocol indicator (must be 0)
 289 * @kern: caused by kernel or by userspace?
 290 *
 291 * This routine creates additional data structures used by the TIPC socket,
 292 * initializes them, and links them together.
 293 *
 294 * Returns 0 on success, errno otherwise
 295 */
 296static int tipc_sk_create(struct net *net, struct socket *sock,
 297                          int protocol, int kern)
 298{
 299        struct tipc_net *tn;
 300        const struct proto_ops *ops;
 301        socket_state state;
 302        struct sock *sk;
 303        struct tipc_sock *tsk;
 304        struct tipc_msg *msg;
 305
 306        /* Validate arguments */
 307        if (unlikely(protocol != 0))
 308                return -EPROTONOSUPPORT;
 309
 310        switch (sock->type) {
 311        case SOCK_STREAM:
 312                ops = &stream_ops;
 313                state = SS_UNCONNECTED;
 314                break;
 315        case SOCK_SEQPACKET:
 316                ops = &packet_ops;
 317                state = SS_UNCONNECTED;
 318                break;
 319        case SOCK_DGRAM:
 320        case SOCK_RDM:
 321                ops = &msg_ops;
 322                state = SS_READY;
 323                break;
 324        default:
 325                return -EPROTOTYPE;
 326        }
 327
 328        /* Allocate socket's protocol area */
 329        sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto, kern);
 330        if (sk == NULL)
 331                return -ENOMEM;
 332
 333        tsk = tipc_sk(sk);
 334        tsk->max_pkt = MAX_PKT_DEFAULT;
 335        INIT_LIST_HEAD(&tsk->publications);
 336        msg = &tsk->phdr;
 337        tn = net_generic(sock_net(sk), tipc_net_id);
 338        tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
 339                      NAMED_H_SIZE, 0);
 340
 341        /* Finish initializing socket data structures */
 342        sock->ops = ops;
 343        sock->state = state;
 344        sock_init_data(sock, sk);
 345        if (tipc_sk_insert(tsk)) {
 346                pr_warn("Socket create failed; port number exhausted\n");
 347                return -EINVAL;
 348        }
 349        msg_set_origport(msg, tsk->portid);
 350        setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
 351        sk->sk_backlog_rcv = tipc_backlog_rcv;
 352        sk->sk_rcvbuf = sysctl_tipc_rmem[1];
 353        sk->sk_data_ready = tipc_data_ready;
 354        sk->sk_write_space = tipc_write_space;
 355        sk->sk_destruct = tipc_sock_destruct;
 356        tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
 357        atomic_set(&tsk->dupl_rcvcnt, 0);
 358
 359        /* Start out with safe limits until we receive an advertised window */
 360        tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
 361        tsk->rcv_win = tsk->snd_win;
 362
 363        if (sock->state == SS_READY) {
 364                tsk_set_unreturnable(tsk, true);
 365                if (sock->type == SOCK_DGRAM)
 366                        tsk_set_unreliable(tsk, true);
 367        }
 368        return 0;
 369}
 370
 371static void tipc_sk_callback(struct rcu_head *head)
 372{
 373        struct tipc_sock *tsk = container_of(head, struct tipc_sock, rcu);
 374
 375        sock_put(&tsk->sk);
 376}
 377
/**
 * tipc_release - destroy a TIPC socket
 * @sock: socket to destroy
 *
 * This routine cleans up any messages that are still queued on the socket.
 * For DGRAM and RDM socket types, all queued messages are rejected.
 * For SEQPACKET and STREAM socket types, the first message is rejected
 * and any others are discarded.  (If the first message on a STREAM socket
 * is partially-read, it is discarded and the next one is rejected instead.)
 *
 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 * are returned or discarded according to the "destination droppable" setting
 * specified for the message by the sender.
 *
 * The final reference drop is deferred through call_rcu(), see
 * tipc_sk_callback().
 *
 * Returns 0 on success, errno otherwise
 */
static int tipc_release(struct socket *sock)
{
        struct sock *sk = sock->sk;
        struct net *net;
        struct tipc_sock *tsk;
        struct sk_buff *skb;
        u32 dnode;

        /*
         * Exit if socket isn't fully initialized (occurs when a failed accept()
         * releases a pre-allocated child socket that was never used)
         */
        if (sk == NULL)
                return 0;

        net = sock_net(sk);
        tsk = tipc_sk(sk);
        lock_sock(sk);

        /*
         * Reject all unreceived messages, except on an active connection
         * (which disconnects locally & sends a 'FIN+' to peer)
         */
        dnode = tsk_peer_node(tsk);
        while (sock->state != SS_DISCONNECTING) {
                skb = __skb_dequeue(&sk->sk_receive_queue);
                if (skb == NULL)
                        break;
                /* A non-NULL handle presumably marks a partially-read
                 * message (see header comment): drop it without a response
                 */
                if (TIPC_SKB_CB(skb)->handle != NULL)
                        kfree_skb(skb);
                else {
                        /* First rejected message on a connection also tears
                         * the connection down, which ends this loop
                         */
                        if ((sock->state == SS_CONNECTING) ||
                            (sock->state == SS_CONNECTED)) {
                                sock->state = SS_DISCONNECTING;
                                tsk->connected = 0;
                                tipc_node_remove_conn(net, dnode, tsk->portid);
                        }
                        tipc_sk_respond(sk, skb, TIPC_ERR_NO_PORT);
                }
        }

        /* Withdraw all names, stop the timer and remove the socket */
        tipc_sk_withdraw(tsk, 0, NULL);
        sk_stop_timer(sk, &sk->sk_timer);
        tipc_sk_remove(tsk);
        /* Still connected (nothing was rejected above): tell the peer
         * explicitly that this port is gone
         */
        if (tsk->connected) {
                skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
                                      TIPC_CONN_MSG, SHORT_H_SIZE, 0, dnode,
                                      tsk_own_node(tsk), tsk_peer_port(tsk),
                                      tsk->portid, TIPC_ERR_NO_PORT);
                if (skb)
                        tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
                tipc_node_remove_conn(net, dnode, tsk->portid);
        }

        /* Reject any messages that accumulated in backlog queue */
        sock->state = SS_DISCONNECTING;
        release_sock(sk);

        /* Defer the sock_put() in tipc_sk_callback() through RCU, then
         * detach the protocol area from the socket
         */
        call_rcu(&tsk->rcu, tipc_sk_callback);
        sock->sk = NULL;

        return 0;
}
 457
 458/**
 459 * tipc_bind - associate or disassocate TIPC name(s) with a socket
 460 * @sock: socket structure
 461 * @uaddr: socket address describing name(s) and desired operation
 462 * @uaddr_len: size of socket address data structure
 463 *
 464 * Name and name sequence binding is indicated using a positive scope value;
 465 * a negative scope value unbinds the specified name.  Specifying no name
 466 * (i.e. a socket address length of 0) unbinds all names from the socket.
 467 *
 468 * Returns 0 on success, errno otherwise
 469 *
 470 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 471 *       access any non-constant socket information.
 472 */
 473static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
 474                     int uaddr_len)
 475{
 476        struct sock *sk = sock->sk;
 477        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 478        struct tipc_sock *tsk = tipc_sk(sk);
 479        int res = -EINVAL;
 480
 481        lock_sock(sk);
 482        if (unlikely(!uaddr_len)) {
 483                res = tipc_sk_withdraw(tsk, 0, NULL);
 484                goto exit;
 485        }
 486
 487        if (uaddr_len < sizeof(struct sockaddr_tipc)) {
 488                res = -EINVAL;
 489                goto exit;
 490        }
 491        if (addr->family != AF_TIPC) {
 492                res = -EAFNOSUPPORT;
 493                goto exit;
 494        }
 495
 496        if (addr->addrtype == TIPC_ADDR_NAME)
 497                addr->addr.nameseq.upper = addr->addr.nameseq.lower;
 498        else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
 499                res = -EAFNOSUPPORT;
 500                goto exit;
 501        }
 502
 503        if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
 504            (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
 505            (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
 506                res = -EACCES;
 507                goto exit;
 508        }
 509
 510        res = (addr->scope > 0) ?
 511                tipc_sk_publish(tsk, addr->scope, &addr->addr.nameseq) :
 512                tipc_sk_withdraw(tsk, -addr->scope, &addr->addr.nameseq);
 513exit:
 514        release_sock(sk);
 515        return res;
 516}
 517
 518/**
 519 * tipc_getname - get port ID of socket or peer socket
 520 * @sock: socket structure
 521 * @uaddr: area for returned socket address
 522 * @uaddr_len: area for returned length of socket address
 523 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
 524 *
 525 * Returns 0 on success, errno otherwise
 526 *
 527 * NOTE: This routine doesn't need to take the socket lock since it only
 528 *       accesses socket information that is unchanging (or which changes in
 529 *       a completely predictable manner).
 530 */
 531static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
 532                        int *uaddr_len, int peer)
 533{
 534        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 535        struct tipc_sock *tsk = tipc_sk(sock->sk);
 536        struct tipc_net *tn = net_generic(sock_net(sock->sk), tipc_net_id);
 537
 538        memset(addr, 0, sizeof(*addr));
 539        if (peer) {
 540                if ((sock->state != SS_CONNECTED) &&
 541                        ((peer != 2) || (sock->state != SS_DISCONNECTING)))
 542                        return -ENOTCONN;
 543                addr->addr.id.ref = tsk_peer_port(tsk);
 544                addr->addr.id.node = tsk_peer_node(tsk);
 545        } else {
 546                addr->addr.id.ref = tsk->portid;
 547                addr->addr.id.node = tn->own_addr;
 548        }
 549
 550        *uaddr_len = sizeof(*addr);
 551        addr->addrtype = TIPC_ADDR_ID;
 552        addr->family = AF_TIPC;
 553        addr->scope = 0;
 554        addr->addr.name.domain = 0;
 555
 556        return 0;
 557}
 558
 559/**
 560 * tipc_poll - read and possibly block on pollmask
 561 * @file: file structure associated with the socket
 562 * @sock: socket for which to calculate the poll bits
 563 * @wait: ???
 564 *
 565 * Returns pollmask value
 566 *
 567 * COMMENTARY:
 568 * It appears that the usual socket locking mechanisms are not useful here
 569 * since the pollmask info is potentially out-of-date the moment this routine
 570 * exits.  TCP and other protocols seem to rely on higher level poll routines
 571 * to handle any preventable race conditions, so TIPC will do the same ...
 572 *
 573 * TIPC sets the returned events as follows:
 574 *
 575 * socket state         flags set
 576 * ------------         ---------
 577 * unconnected          no read flags
 578 *                      POLLOUT if port is not congested
 579 *
 580 * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
 581 *                      no write flags
 582 *
 583 * connected            POLLIN/POLLRDNORM if data in rx queue
 584 *                      POLLOUT if port is not congested
 585 *
 586 * disconnecting        POLLIN/POLLRDNORM/POLLHUP
 587 *                      no write flags
 588 *
 589 * listening            POLLIN if SYN in rx queue
 590 *                      no write flags
 591 *
 592 * ready                POLLIN/POLLRDNORM if data in rx queue
 593 * [connectionless]     POLLOUT (since port cannot be congested)
 594 *
 595 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 596 * imply that the operation will succeed, merely that it should be performed
 597 * and will not block.
 598 */
 599static unsigned int tipc_poll(struct file *file, struct socket *sock,
 600                              poll_table *wait)
 601{
 602        struct sock *sk = sock->sk;
 603        struct tipc_sock *tsk = tipc_sk(sk);
 604        u32 mask = 0;
 605
 606        sock_poll_wait(file, sk_sleep(sk), wait);
 607
 608        switch ((int)sock->state) {
 609        case SS_UNCONNECTED:
 610                if (!tsk->link_cong)
 611                        mask |= POLLOUT;
 612                break;
 613        case SS_READY:
 614        case SS_CONNECTED:
 615                if (!tsk->link_cong && !tsk_conn_cong(tsk))
 616                        mask |= POLLOUT;
 617                /* fall thru' */
 618        case SS_CONNECTING:
 619        case SS_LISTENING:
 620                if (!skb_queue_empty(&sk->sk_receive_queue))
 621                        mask |= (POLLIN | POLLRDNORM);
 622                break;
 623        case SS_DISCONNECTING:
 624                mask = (POLLIN | POLLRDNORM | POLLHUP);
 625                break;
 626        }
 627
 628        return mask;
 629}
 630
/**
 * tipc_sendmcast - send multicast message
 * @sock: socket structure
 * @seq: destination address
 * @msg: message to send
 * @dsz: total length of message data
 * @timeo: timeout to wait for wakeup
 *
 * Called from function tipc_sendmsg(), which has done all sanity checks
 *
 * On link congestion the caller sleeps in tipc_wait_for_sndmsg() and then
 * retries the same packet chain; on -EMSGSIZE the chain is rebuilt from the
 * start of the user data against a freshly read MTU.
 *
 * Returns the number of bytes sent on success, or errno
 */
static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
                          struct msghdr *msg, size_t dsz, long timeo)
{
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
        struct net *net = sock_net(sk);
        struct tipc_msg *mhdr = &tsk->phdr;
        struct sk_buff_head pktchain;
        /* Snapshot of the user data iterator, restored before a rebuild */
        struct iov_iter save = msg->msg_iter;
        uint mtu;
        int rc;

        /* Turn the preformatted header into a multicast header for @seq */
        msg_set_type(mhdr, TIPC_MCAST_MSG);
        msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
        msg_set_destport(mhdr, 0);
        msg_set_destnode(mhdr, 0);
        msg_set_nametype(mhdr, seq->type);
        msg_set_namelower(mhdr, seq->lower);
        msg_set_nameupper(mhdr, seq->upper);
        msg_set_hdr_sz(mhdr, MCAST_H_SIZE);

        skb_queue_head_init(&pktchain);

new_mtu:
        /* (Re)build the packet chain against the current broadcast MTU */
        mtu = tipc_bcast_get_mtu(net);
        rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain);
        if (unlikely(rc < 0))
                return rc;

        do {
                rc = tipc_bcast_xmit(net, &pktchain);
                if (likely(!rc))
                        return dsz;

                /* Congested link: flag it and sleep; a successful wait
                 * retries the transmission with the same chain
                 */
                if (rc == -ELINKCONG) {
                        tsk->link_cong = 1;
                        rc = tipc_wait_for_sndmsg(sock, &timeo);
                        if (!rc)
                                continue;
                }
                /* Every other outcome gives up on the current chain */
                __skb_queue_purge(&pktchain);
                if (rc == -EMSGSIZE) {
                        /* Rewind the user data and rebuild */
                        msg->msg_iter = save;
                        goto new_mtu;
                }
                break;
        } while (1);
        return rc;
}
 691
 692/**
 693 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
 694 * @arrvq: queue with arriving messages, to be cloned after destination lookup
 695 * @inputq: queue with cloned messages, delivered to socket after dest lookup
 696 *
 697 * Multi-threaded: parallel calls with reference to same queues may occur
 698 */
 699void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
 700                       struct sk_buff_head *inputq)
 701{
 702        struct tipc_msg *msg;
 703        struct tipc_plist dports;
 704        u32 portid;
 705        u32 scope = TIPC_CLUSTER_SCOPE;
 706        struct sk_buff_head tmpq;
 707        uint hsz;
 708        struct sk_buff *skb, *_skb;
 709
 710        __skb_queue_head_init(&tmpq);
 711        tipc_plist_init(&dports);
 712
 713        skb = tipc_skb_peek(arrvq, &inputq->lock);
 714        for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
 715                msg = buf_msg(skb);
 716                hsz = skb_headroom(skb) + msg_hdr_sz(msg);
 717
 718                if (in_own_node(net, msg_orignode(msg)))
 719                        scope = TIPC_NODE_SCOPE;
 720
 721                /* Create destination port list and message clones: */
 722                tipc_nametbl_mc_translate(net,
 723                                          msg_nametype(msg), msg_namelower(msg),
 724                                          msg_nameupper(msg), scope, &dports);
 725                portid = tipc_plist_pop(&dports);
 726                for (; portid; portid = tipc_plist_pop(&dports)) {
 727                        _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
 728                        if (_skb) {
 729                                msg_set_destport(buf_msg(_skb), portid);
 730                                __skb_queue_tail(&tmpq, _skb);
 731                                continue;
 732                        }
 733                        pr_warn("Failed to clone mcast rcv buffer\n");
 734                }
 735                /* Append to inputq if not already done by other thread */
 736                spin_lock_bh(&inputq->lock);
 737                if (skb_peek(arrvq) == skb) {
 738                        skb_queue_splice_tail_init(&tmpq, inputq);
 739                        kfree_skb(__skb_dequeue(arrvq));
 740                }
 741                spin_unlock_bh(&inputq->lock);
 742                __skb_queue_purge(&tmpq);
 743                kfree_skb(skb);
 744        }
 745        tipc_sk_rcv(net, inputq);
 746}
 747
 748/**
 749 * tipc_sk_proto_rcv - receive a connection mng protocol message
 750 * @tsk: receiving socket
 751 * @skb: pointer to message buffer.
 752 */
 753static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
 754                              struct sk_buff_head *xmitq)
 755{
 756        struct sock *sk = &tsk->sk;
 757        u32 onode = tsk_own_node(tsk);
 758        struct tipc_msg *hdr = buf_msg(skb);
 759        int mtyp = msg_type(hdr);
 760        bool conn_cong;
 761
 762        /* Ignore if connection cannot be validated: */
 763        if (!tsk_peer_msg(tsk, hdr))
 764                goto exit;
 765
 766        tsk->probing_state = TIPC_CONN_OK;
 767
 768        if (mtyp == CONN_PROBE) {
 769                msg_set_type(hdr, CONN_PROBE_REPLY);
 770                if (tipc_msg_reverse(onode, &skb, TIPC_OK))
 771                        __skb_queue_tail(xmitq, skb);
 772                return;
 773        } else if (mtyp == CONN_ACK) {
 774                conn_cong = tsk_conn_cong(tsk);
 775                tsk->snt_unacked -= msg_conn_ack(hdr);
 776                if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
 777                        tsk->snd_win = msg_adv_win(hdr);
 778                if (conn_cong)
 779                        sk->sk_write_space(sk);
 780        } else if (mtyp != CONN_PROBE_REPLY) {
 781                pr_warn("Received unknown CONN_PROTO msg\n");
 782        }
 783exit:
 784        kfree_skb(skb);
 785}
 786
/* tipc_wait_for_sndmsg - sleep until link congestion on the socket clears
 * @sock: socket structure
 * @timeo_p: remaining wait time, handed to sk_wait_event() on every round
 *
 * Returns 0 once tsk->link_cong is seen cleared.  Otherwise returns a
 * pending socket error, -EPIPE if the socket is disconnecting, -EAGAIN if
 * no wait time is left, or sock_intr_errno() if a signal is pending.
 */
static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
{
        struct sock *sk = sock->sk;
        struct tipc_sock *tsk = tipc_sk(sk);
        DEFINE_WAIT(wait);
        int done;

        do {
                /* Re-check every abort condition before each sleep */
                int err = sock_error(sk);
                if (err)
                        return err;
                if (sock->state == SS_DISCONNECTING)
                        return -EPIPE;
                if (!*timeo_p)
                        return -EAGAIN;
                if (signal_pending(current))
                        return sock_intr_errno(*timeo_p);

                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
                done = sk_wait_event(sk, timeo_p, !tsk->link_cong);
                finish_wait(sk_sleep(sk), &wait);
        } while (!done);
        return 0;
}
 811
 812/**
 813 * tipc_sendmsg - send message in connectionless manner
 814 * @sock: socket structure
 815 * @m: message to send
 816 * @dsz: amount of user data to be sent
 817 *
 818 * Message must have an destination specified explicitly.
 819 * Used for SOCK_RDM and SOCK_DGRAM messages,
 820 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 821 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 822 *
 823 * Returns the number of bytes sent on success, or errno otherwise
 824 */
 825static int tipc_sendmsg(struct socket *sock,
 826                        struct msghdr *m, size_t dsz)
 827{
 828        struct sock *sk = sock->sk;
 829        int ret;
 830
 831        lock_sock(sk);
 832        ret = __tipc_sendmsg(sock, m, dsz);
 833        release_sock(sk);
 834
 835        return ret;
 836}
 837
 838static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
 839{
 840        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 841        struct sock *sk = sock->sk;
 842        struct tipc_sock *tsk = tipc_sk(sk);
 843        struct net *net = sock_net(sk);
 844        struct tipc_msg *mhdr = &tsk->phdr;
 845        u32 dnode, dport;
 846        struct sk_buff_head pktchain;
 847        struct sk_buff *skb;
 848        struct tipc_name_seq *seq;
 849        struct iov_iter save;
 850        u32 mtu;
 851        long timeo;
 852        int rc;
 853
 854        if (dsz > TIPC_MAX_USER_MSG_SIZE)
 855                return -EMSGSIZE;
 856        if (unlikely(!dest)) {
 857                if (tsk->connected && sock->state == SS_READY)
 858                        dest = &tsk->remote;
 859                else
 860                        return -EDESTADDRREQ;
 861        } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
 862                   dest->family != AF_TIPC) {
 863                return -EINVAL;
 864        }
 865        if (unlikely(sock->state != SS_READY)) {
 866                if (sock->state == SS_LISTENING)
 867                        return -EPIPE;
 868                if (sock->state != SS_UNCONNECTED)
 869                        return -EISCONN;
 870                if (tsk->published)
 871                        return -EOPNOTSUPP;
 872                if (dest->addrtype == TIPC_ADDR_NAME) {
 873                        tsk->conn_type = dest->addr.name.name.type;
 874                        tsk->conn_instance = dest->addr.name.name.instance;
 875                }
 876        }
 877        seq = &dest->addr.nameseq;
 878        timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 879
 880        if (dest->addrtype == TIPC_ADDR_MCAST) {
 881                return tipc_sendmcast(sock, seq, m, dsz, timeo);
 882        } else if (dest->addrtype == TIPC_ADDR_NAME) {
 883                u32 type = dest->addr.name.name.type;
 884                u32 inst = dest->addr.name.name.instance;
 885                u32 domain = dest->addr.name.domain;
 886
 887                dnode = domain;
 888                msg_set_type(mhdr, TIPC_NAMED_MSG);
 889                msg_set_hdr_sz(mhdr, NAMED_H_SIZE);
 890                msg_set_nametype(mhdr, type);
 891                msg_set_nameinst(mhdr, inst);
 892                msg_set_lookup_scope(mhdr, tipc_addr_scope(domain));
 893                dport = tipc_nametbl_translate(net, type, inst, &dnode);
 894                msg_set_destnode(mhdr, dnode);
 895                msg_set_destport(mhdr, dport);
 896                if (unlikely(!dport && !dnode))
 897                        return -EHOSTUNREACH;
 898        } else if (dest->addrtype == TIPC_ADDR_ID) {
 899                dnode = dest->addr.id.node;
 900                msg_set_type(mhdr, TIPC_DIRECT_MSG);
 901                msg_set_lookup_scope(mhdr, 0);
 902                msg_set_destnode(mhdr, dnode);
 903                msg_set_destport(mhdr, dest->addr.id.ref);
 904                msg_set_hdr_sz(mhdr, BASIC_H_SIZE);
 905        }
 906
 907        skb_queue_head_init(&pktchain);
 908        save = m->msg_iter;
 909new_mtu:
 910        mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
 911        rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
 912        if (rc < 0)
 913                return rc;
 914
 915        do {
 916                skb = skb_peek(&pktchain);
 917                TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
 918                rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid);
 919                if (likely(!rc)) {
 920                        if (sock->state != SS_READY)
 921                                sock->state = SS_CONNECTING;
 922                        return dsz;
 923                }
 924                if (rc == -ELINKCONG) {
 925                        tsk->link_cong = 1;
 926                        rc = tipc_wait_for_sndmsg(sock, &timeo);
 927                        if (!rc)
 928                                continue;
 929                }
 930                __skb_queue_purge(&pktchain);
 931                if (rc == -EMSGSIZE) {
 932                        m->msg_iter = save;
 933                        goto new_mtu;
 934                }
 935                break;
 936        } while (1);
 937
 938        return rc;
 939}
 940
 941static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
 942{
 943        struct sock *sk = sock->sk;
 944        struct tipc_sock *tsk = tipc_sk(sk);
 945        DEFINE_WAIT(wait);
 946        int done;
 947
 948        do {
 949                int err = sock_error(sk);
 950                if (err)
 951                        return err;
 952                if (sock->state == SS_DISCONNECTING)
 953                        return -EPIPE;
 954                else if (sock->state != SS_CONNECTED)
 955                        return -ENOTCONN;
 956                if (!*timeo_p)
 957                        return -EAGAIN;
 958                if (signal_pending(current))
 959                        return sock_intr_errno(*timeo_p);
 960
 961                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
 962                done = sk_wait_event(sk, timeo_p,
 963                                     (!tsk->link_cong &&
 964                                      !tsk_conn_cong(tsk)) ||
 965                                     !tsk->connected);
 966                finish_wait(sk_sleep(sk), &wait);
 967        } while (!done);
 968        return 0;
 969}
 970
 971/**
 972 * tipc_send_stream - send stream-oriented data
 973 * @sock: socket structure
 974 * @m: data to send
 975 * @dsz: total length of data to be transmitted
 976 *
 977 * Used for SOCK_STREAM data.
 978 *
 979 * Returns the number of bytes sent on success (or partial success),
 980 * or errno if no data sent
 981 */
 982static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
 983{
 984        struct sock *sk = sock->sk;
 985        int ret;
 986
 987        lock_sock(sk);
 988        ret = __tipc_send_stream(sock, m, dsz);
 989        release_sock(sk);
 990
 991        return ret;
 992}
 993
 994static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
 995{
 996        struct sock *sk = sock->sk;
 997        struct net *net = sock_net(sk);
 998        struct tipc_sock *tsk = tipc_sk(sk);
 999        struct tipc_msg *mhdr = &tsk->phdr;
1000        struct sk_buff_head pktchain;
1001        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1002        u32 portid = tsk->portid;
1003        int rc = -EINVAL;
1004        long timeo;
1005        u32 dnode;
1006        uint mtu, send, sent = 0;
1007        struct iov_iter save;
1008        int hlen = MIN_H_SIZE;
1009
1010        /* Handle implied connection establishment */
1011        if (unlikely(dest)) {
1012                rc = __tipc_sendmsg(sock, m, dsz);
1013                hlen = msg_hdr_sz(mhdr);
1014                if (dsz && (dsz == rc))
1015                        tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
1016                return rc;
1017        }
1018        if (dsz > (uint)INT_MAX)
1019                return -EMSGSIZE;
1020
1021        if (unlikely(sock->state != SS_CONNECTED)) {
1022                if (sock->state == SS_DISCONNECTING)
1023                        return -EPIPE;
1024                else
1025                        return -ENOTCONN;
1026        }
1027
1028        timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1029        dnode = tsk_peer_node(tsk);
1030        skb_queue_head_init(&pktchain);
1031
1032next:
1033        save = m->msg_iter;
1034        mtu = tsk->max_pkt;
1035        send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1036        rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain);
1037        if (unlikely(rc < 0))
1038                return rc;
1039
1040        do {
1041                if (likely(!tsk_conn_cong(tsk))) {
1042                        rc = tipc_node_xmit(net, &pktchain, dnode, portid);
1043                        if (likely(!rc)) {
1044                                tsk->snt_unacked += tsk_inc(tsk, send + hlen);
1045                                sent += send;
1046                                if (sent == dsz)
1047                                        return dsz;
1048                                goto next;
1049                        }
1050                        if (rc == -EMSGSIZE) {
1051                                __skb_queue_purge(&pktchain);
1052                                tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1053                                                                 portid);
1054                                m->msg_iter = save;
1055                                goto next;
1056                        }
1057                        if (rc != -ELINKCONG)
1058                                break;
1059
1060                        tsk->link_cong = 1;
1061                }
1062                rc = tipc_wait_for_sndpkt(sock, &timeo);
1063        } while (!rc);
1064
1065        __skb_queue_purge(&pktchain);
1066        return sent ? sent : rc;
1067}
1068
1069/**
1070 * tipc_send_packet - send a connection-oriented message
1071 * @sock: socket structure
1072 * @m: message to send
1073 * @dsz: length of data to be transmitted
1074 *
1075 * Used for SOCK_SEQPACKET messages.
1076 *
1077 * Returns the number of bytes sent on success, or errno otherwise
1078 */
1079static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1080{
1081        if (dsz > TIPC_MAX_USER_MSG_SIZE)
1082                return -EMSGSIZE;
1083
1084        return tipc_send_stream(sock, m, dsz);
1085}
1086
1087/* tipc_sk_finish_conn - complete the setup of a connection
1088 */
1089static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1090                                u32 peer_node)
1091{
1092        struct sock *sk = &tsk->sk;
1093        struct net *net = sock_net(sk);
1094        struct tipc_msg *msg = &tsk->phdr;
1095
1096        msg_set_destnode(msg, peer_node);
1097        msg_set_destport(msg, peer_port);
1098        msg_set_type(msg, TIPC_CONN_MSG);
1099        msg_set_lookup_scope(msg, 0);
1100        msg_set_hdr_sz(msg, SHORT_H_SIZE);
1101
1102        tsk->probing_intv = CONN_PROBING_INTERVAL;
1103        tsk->probing_state = TIPC_CONN_OK;
1104        tsk->connected = 1;
1105        sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
1106        tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1107        tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
1108        tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
1109        if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
1110                return;
1111
1112        /* Fall back to message based flow control */
1113        tsk->rcv_win = FLOWCTL_MSG_WIN;
1114        tsk->snd_win = FLOWCTL_MSG_WIN;
1115}
1116
1117/**
1118 * set_orig_addr - capture sender's address for received message
1119 * @m: descriptor for message info
1120 * @msg: received message header
1121 *
1122 * Note: Address is not captured if not requested by receiver.
1123 */
1124static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
1125{
1126        DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
1127
1128        if (addr) {
1129                addr->family = AF_TIPC;
1130                addr->addrtype = TIPC_ADDR_ID;
1131                memset(&addr->addr, 0, sizeof(addr->addr));
1132                addr->addr.id.ref = msg_origport(msg);
1133                addr->addr.id.node = msg_orignode(msg);
1134                addr->addr.name.domain = 0;     /* could leave uninitialized */
1135                addr->scope = 0;                /* could leave uninitialized */
1136                m->msg_namelen = sizeof(struct sockaddr_tipc);
1137        }
1138}
1139
1140/**
1141 * tipc_sk_anc_data_recv - optionally capture ancillary data for received message
1142 * @m: descriptor for message info
1143 * @msg: received message header
1144 * @tsk: TIPC port associated with message
1145 *
1146 * Note: Ancillary data is not captured if not requested by receiver.
1147 *
1148 * Returns 0 if successful, otherwise errno
1149 */
1150static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1151                                 struct tipc_sock *tsk)
1152{
1153        u32 anc_data[3];
1154        u32 err;
1155        u32 dest_type;
1156        int has_name;
1157        int res;
1158
1159        if (likely(m->msg_controllen == 0))
1160                return 0;
1161
1162        /* Optionally capture errored message object(s) */
1163        err = msg ? msg_errcode(msg) : 0;
1164        if (unlikely(err)) {
1165                anc_data[0] = err;
1166                anc_data[1] = msg_data_sz(msg);
1167                res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
1168                if (res)
1169                        return res;
1170                if (anc_data[1]) {
1171                        res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
1172                                       msg_data(msg));
1173                        if (res)
1174                                return res;
1175                }
1176        }
1177
1178        /* Optionally capture message destination object */
1179        dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
1180        switch (dest_type) {
1181        case TIPC_NAMED_MSG:
1182                has_name = 1;
1183                anc_data[0] = msg_nametype(msg);
1184                anc_data[1] = msg_namelower(msg);
1185                anc_data[2] = msg_namelower(msg);
1186                break;
1187        case TIPC_MCAST_MSG:
1188                has_name = 1;
1189                anc_data[0] = msg_nametype(msg);
1190                anc_data[1] = msg_namelower(msg);
1191                anc_data[2] = msg_nameupper(msg);
1192                break;
1193        case TIPC_CONN_MSG:
1194                has_name = (tsk->conn_type != 0);
1195                anc_data[0] = tsk->conn_type;
1196                anc_data[1] = tsk->conn_instance;
1197                anc_data[2] = tsk->conn_instance;
1198                break;
1199        default:
1200                has_name = 0;
1201        }
1202        if (has_name) {
1203                res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
1204                if (res)
1205                        return res;
1206        }
1207
1208        return 0;
1209}
1210
1211static void tipc_sk_send_ack(struct tipc_sock *tsk)
1212{
1213        struct net *net = sock_net(&tsk->sk);
1214        struct sk_buff *skb = NULL;
1215        struct tipc_msg *msg;
1216        u32 peer_port = tsk_peer_port(tsk);
1217        u32 dnode = tsk_peer_node(tsk);
1218
1219        if (!tsk->connected)
1220                return;
1221        skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0,
1222                              dnode, tsk_own_node(tsk), peer_port,
1223                              tsk->portid, TIPC_OK);
1224        if (!skb)
1225                return;
1226        msg = buf_msg(skb);
1227        msg_set_conn_ack(msg, tsk->rcv_unacked);
1228        tsk->rcv_unacked = 0;
1229
1230        /* Adjust to and advertize the correct window limit */
1231        if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
1232                tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
1233                msg_set_adv_win(msg, tsk->rcv_win);
1234        }
1235        tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
1236}
1237
1238static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
1239{
1240        struct sock *sk = sock->sk;
1241        DEFINE_WAIT(wait);
1242        long timeo = *timeop;
1243        int err;
1244
1245        for (;;) {
1246                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1247                if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1248                        if (sock->state == SS_DISCONNECTING) {
1249                                err = -ENOTCONN;
1250                                break;
1251                        }
1252                        release_sock(sk);
1253                        timeo = schedule_timeout(timeo);
1254                        lock_sock(sk);
1255                }
1256                err = 0;
1257                if (!skb_queue_empty(&sk->sk_receive_queue))
1258                        break;
1259                err = -EAGAIN;
1260                if (!timeo)
1261                        break;
1262                err = sock_intr_errno(timeo);
1263                if (signal_pending(current))
1264                        break;
1265        }
1266        finish_wait(sk_sleep(sk), &wait);
1267        *timeop = timeo;
1268        return err;
1269}
1270
1271/**
1272 * tipc_recvmsg - receive packet-oriented message
1273 * @m: descriptor for message info
1274 * @buf_len: total size of user buffer area
1275 * @flags: receive flags
1276 *
1277 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1278 * If the complete message doesn't fit in user area, truncate it.
1279 *
1280 * Returns size of returned message data, errno otherwise
1281 */
1282static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
1283                        int flags)
1284{
1285        struct sock *sk = sock->sk;
1286        struct tipc_sock *tsk = tipc_sk(sk);
1287        struct sk_buff *buf;
1288        struct tipc_msg *msg;
1289        long timeo;
1290        unsigned int sz;
1291        u32 err;
1292        int res, hlen;
1293
1294        /* Catch invalid receive requests */
1295        if (unlikely(!buf_len))
1296                return -EINVAL;
1297
1298        lock_sock(sk);
1299
1300        if (unlikely(sock->state == SS_UNCONNECTED)) {
1301                res = -ENOTCONN;
1302                goto exit;
1303        }
1304
1305        timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1306restart:
1307
1308        /* Look for a message in receive queue; wait if necessary */
1309        res = tipc_wait_for_rcvmsg(sock, &timeo);
1310        if (res)
1311                goto exit;
1312
1313        /* Look at first message in receive queue */
1314        buf = skb_peek(&sk->sk_receive_queue);
1315        msg = buf_msg(buf);
1316        sz = msg_data_sz(msg);
1317        hlen = msg_hdr_sz(msg);
1318        err = msg_errcode(msg);
1319
1320        /* Discard an empty non-errored message & try again */
1321        if ((!sz) && (!err)) {
1322                tsk_advance_rx_queue(sk);
1323                goto restart;
1324        }
1325
1326        /* Capture sender's address (optional) */
1327        set_orig_addr(m, msg);
1328
1329        /* Capture ancillary data (optional) */
1330        res = tipc_sk_anc_data_recv(m, msg, tsk);
1331        if (res)
1332                goto exit;
1333
1334        /* Capture message data (if valid) & compute return value (always) */
1335        if (!err) {
1336                if (unlikely(buf_len < sz)) {
1337                        sz = buf_len;
1338                        m->msg_flags |= MSG_TRUNC;
1339                }
1340                res = skb_copy_datagram_msg(buf, hlen, m, sz);
1341                if (res)
1342                        goto exit;
1343                res = sz;
1344        } else {
1345                if ((sock->state == SS_READY) ||
1346                    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1347                        res = 0;
1348                else
1349                        res = -ECONNRESET;
1350        }
1351
1352        if (unlikely(flags & MSG_PEEK))
1353                goto exit;
1354
1355        if (likely(sock->state != SS_READY)) {
1356                tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1357                if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1358                        tipc_sk_send_ack(tsk);
1359        }
1360        tsk_advance_rx_queue(sk);
1361exit:
1362        release_sock(sk);
1363        return res;
1364}
1365
1366/**
1367 * tipc_recv_stream - receive stream-oriented data
1368 * @m: descriptor for message info
1369 * @buf_len: total size of user buffer area
1370 * @flags: receive flags
1371 *
1372 * Used for SOCK_STREAM messages only.  If not enough data is available
1373 * will optionally wait for more; never truncates data.
1374 *
1375 * Returns size of returned message data, errno otherwise
1376 */
1377static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
1378                            size_t buf_len, int flags)
1379{
1380        struct sock *sk = sock->sk;
1381        struct tipc_sock *tsk = tipc_sk(sk);
1382        struct sk_buff *buf;
1383        struct tipc_msg *msg;
1384        long timeo;
1385        unsigned int sz;
1386        int sz_to_copy, target, needed;
1387        int sz_copied = 0;
1388        u32 err;
1389        int res = 0, hlen;
1390
1391        /* Catch invalid receive attempts */
1392        if (unlikely(!buf_len))
1393                return -EINVAL;
1394
1395        lock_sock(sk);
1396
1397        if (unlikely(sock->state == SS_UNCONNECTED)) {
1398                res = -ENOTCONN;
1399                goto exit;
1400        }
1401
1402        target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1403        timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1404
1405restart:
1406        /* Look for a message in receive queue; wait if necessary */
1407        res = tipc_wait_for_rcvmsg(sock, &timeo);
1408        if (res)
1409                goto exit;
1410
1411        /* Look at first message in receive queue */
1412        buf = skb_peek(&sk->sk_receive_queue);
1413        msg = buf_msg(buf);
1414        sz = msg_data_sz(msg);
1415        hlen = msg_hdr_sz(msg);
1416        err = msg_errcode(msg);
1417
1418        /* Discard an empty non-errored message & try again */
1419        if ((!sz) && (!err)) {
1420                tsk_advance_rx_queue(sk);
1421                goto restart;
1422        }
1423
1424        /* Optionally capture sender's address & ancillary data of first msg */
1425        if (sz_copied == 0) {
1426                set_orig_addr(m, msg);
1427                res = tipc_sk_anc_data_recv(m, msg, tsk);
1428                if (res)
1429                        goto exit;
1430        }
1431
1432        /* Capture message data (if valid) & compute return value (always) */
1433        if (!err) {
1434                u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1435
1436                sz -= offset;
1437                needed = (buf_len - sz_copied);
1438                sz_to_copy = (sz <= needed) ? sz : needed;
1439
1440                res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
1441                if (res)
1442                        goto exit;
1443
1444                sz_copied += sz_to_copy;
1445
1446                if (sz_to_copy < sz) {
1447                        if (!(flags & MSG_PEEK))
1448                                TIPC_SKB_CB(buf)->handle =
1449                                (void *)(unsigned long)(offset + sz_to_copy);
1450                        goto exit;
1451                }
1452        } else {
1453                if (sz_copied != 0)
1454                        goto exit; /* can't add error msg to valid data */
1455
1456                if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1457                        res = 0;
1458                else
1459                        res = -ECONNRESET;
1460        }
1461
1462        if (unlikely(flags & MSG_PEEK))
1463                goto exit;
1464
1465        tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
1466        if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
1467                tipc_sk_send_ack(tsk);
1468        tsk_advance_rx_queue(sk);
1469
1470        /* Loop around if more data is required */
1471        if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1472            (!skb_queue_empty(&sk->sk_receive_queue) ||
1473            (sz_copied < target)) &&    /* and more is ready or required */
1474            (!err))                     /* and haven't reached a FIN */
1475                goto restart;
1476
1477exit:
1478        release_sock(sk);
1479        return sz_copied ? sz_copied : res;
1480}
1481
1482/**
1483 * tipc_write_space - wake up thread if port congestion is released
1484 * @sk: socket
1485 */
1486static void tipc_write_space(struct sock *sk)
1487{
1488        struct socket_wq *wq;
1489
1490        rcu_read_lock();
1491        wq = rcu_dereference(sk->sk_wq);
1492        if (skwq_has_sleeper(wq))
1493                wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1494                                                POLLWRNORM | POLLWRBAND);
1495        rcu_read_unlock();
1496}
1497
1498/**
1499 * tipc_data_ready - wake up threads to indicate messages have been received
1500 * @sk: socket
1501 * @len: the length of messages
1502 */
1503static void tipc_data_ready(struct sock *sk)
1504{
1505        struct socket_wq *wq;
1506
1507        rcu_read_lock();
1508        wq = rcu_dereference(sk->sk_wq);
1509        if (skwq_has_sleeper(wq))
1510                wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1511                                                POLLRDNORM | POLLRDBAND);
1512        rcu_read_unlock();
1513}
1514
1515static void tipc_sock_destruct(struct sock *sk)
1516{
1517        __skb_queue_purge(&sk->sk_receive_queue);
1518}
1519
/**
 * filter_connect - Handle all incoming messages for a connection-based socket
 * @tsk: TIPC socket
 * @skb: pointer to message buffer
 *
 * Decides, based on the socket state, whether the message may be queued.
 * May move the socket to SS_CONNECTED or SS_DISCONNECTING as a side effect.
 *
 * Returns true if everything ok, false otherwise
 */
static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
{
	struct sock *sk = &tsk->sk;
	struct net *net = sock_net(sk);
	struct socket *sock = sk->sk_socket;
	struct tipc_msg *hdr = buf_msg(skb);
	int conn_err = 0;

	/* Multicast messages are never accepted here */
	if (unlikely(msg_mcast(hdr)))
		return false;

	switch ((int)sock->state) {
	case SS_CONNECTED:
		/* Accept only connection-based messages sent by peer */
		if (unlikely(!tsk_peer_msg(tsk, hdr)))
			return false;

		if (likely(!msg_errcode(hdr)))
			return true;

		/* Errored message from peer: connection is going down.
		 * Let timer expire on its own.
		 */
		sock->state = SS_DISCONNECTING;
		tsk->connected = 0;
		tipc_node_remove_conn(net, tsk_peer_node(tsk), tsk->portid);
		return true;

	case SS_CONNECTING:
		/* Accept only ACK or NACK message */
		if (unlikely(!msg_connected(hdr)))
			return false;

		/* NACK, or something that is not a data message */
		if (unlikely(msg_errcode(hdr)))
			conn_err = ECONNREFUSED;
		else if (unlikely(!msg_isdata(hdr)))
			conn_err = EINVAL;

		if (unlikely(conn_err)) {
			sock->state = SS_DISCONNECTING;
			sk->sk_err = conn_err;
			return true;
		}

		tipc_sk_finish_conn(tsk, msg_origport(hdr), msg_orignode(hdr));
		msg_set_importance(&tsk->phdr, msg_importance(hdr));
		sock->state = SS_CONNECTED;

		/* If 'ACK+' message, add to socket receive queue */
		if (msg_data_sz(hdr))
			return true;

		/* If empty 'ACK-' message, wake up sleeping connect() */
		if (waitqueue_active(sk_sleep(sk)))
			wake_up_interruptible(sk_sleep(sk));

		/* 'ACK-' message is neither accepted nor rejected: */
		msg_set_dest_droppable(hdr, 1);
		return false;

	case SS_LISTENING:
	case SS_UNCONNECTED:
		/* Accept only SYN message */
		return !msg_connected(hdr) && !(msg_errcode(hdr));

	case SS_DISCONNECTING:
		return false;

	default:
		pr_err("Unknown socket state %u\n", sock->state);
		break;
	}
	return false;
}
1601
1602/**
1603 * rcvbuf_limit - get proper overload limit of socket receive queue
1604 * @sk: socket
1605 * @skb: message
1606 *
1607 * For connection oriented messages, irrespective of importance,
1608 * default queue limit is 2 MB.
1609 *
1610 * For connectionless messages, queue limits are based on message
1611 * importance as follows:
1612 *
1613 * TIPC_LOW_IMPORTANCE       (2 MB)
1614 * TIPC_MEDIUM_IMPORTANCE    (4 MB)
1615 * TIPC_HIGH_IMPORTANCE      (8 MB)
1616 * TIPC_CRITICAL_IMPORTANCE  (16 MB)
1617 *
1618 * Returns overload limit according to corresponding message importance
1619 */
1620static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1621{
1622        struct tipc_sock *tsk = tipc_sk(sk);
1623        struct tipc_msg *hdr = buf_msg(skb);
1624
1625        if (unlikely(!msg_connected(hdr)))
1626                return sk->sk_rcvbuf << msg_importance(hdr);
1627
1628        if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
1629                return sk->sk_rcvbuf;
1630
1631        return FLOWCTL_MSG_LIM;
1632}
1633
1634/**
1635 * filter_rcv - validate incoming message
1636 * @sk: socket
1637 * @skb: pointer to message.
1638 *
1639 * Enqueues message on receive queue if acceptable; optionally handles
1640 * disconnect indication for a connected socket.
1641 *
1642 * Called with socket lock already taken
1643 *
1644 * Returns true if message was added to socket receive queue, otherwise false
1645 */
1646static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1647                       struct sk_buff_head *xmitq)
1648{
1649        struct socket *sock = sk->sk_socket;
1650        struct tipc_sock *tsk = tipc_sk(sk);
1651        struct tipc_msg *hdr = buf_msg(skb);
1652        unsigned int limit = rcvbuf_limit(sk, skb);
1653        int err = TIPC_OK;
1654        int usr = msg_user(hdr);
1655
1656        if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1657                tipc_sk_proto_rcv(tsk, skb, xmitq);
1658                return false;
1659        }
1660
1661        if (unlikely(usr == SOCK_WAKEUP)) {
1662                kfree_skb(skb);
1663                tsk->link_cong = 0;
1664                sk->sk_write_space(sk);
1665                return false;
1666        }
1667
1668        /* Drop if illegal message type */
1669        if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) {
1670                kfree_skb(skb);
1671                return false;
1672        }
1673
1674        /* Reject if wrong message type for current socket state */
1675        if (unlikely(sock->state == SS_READY)) {
1676                if (msg_connected(hdr)) {
1677                        err = TIPC_ERR_NO_PORT;
1678                        goto reject;
1679                }
1680        } else if (unlikely(!filter_connect(tsk, skb))) {
1681                err = TIPC_ERR_NO_PORT;
1682                goto reject;
1683        }
1684
1685        /* Reject message if there isn't room to queue it */
1686        if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) {
1687                err = TIPC_ERR_OVERLOAD;
1688                goto reject;
1689        }
1690
1691        /* Enqueue message */
1692        TIPC_SKB_CB(skb)->handle = NULL;
1693        __skb_queue_tail(&sk->sk_receive_queue, skb);
1694        skb_set_owner_r(skb, sk);
1695
1696        sk->sk_data_ready(sk);
1697        return true;
1698
1699reject:
1700        if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
1701                __skb_queue_tail(xmitq, skb);
1702        return false;
1703}
1704
1705/**
1706 * tipc_backlog_rcv - handle incoming message from backlog queue
1707 * @sk: socket
1708 * @skb: message
1709 *
1710 * Caller must hold socket lock
1711 *
1712 * Returns 0
1713 */
1714static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1715{
1716        unsigned int truesize = skb->truesize;
1717        struct sk_buff_head xmitq;
1718        u32 dnode, selector;
1719
1720        __skb_queue_head_init(&xmitq);
1721
1722        if (likely(filter_rcv(sk, skb, &xmitq))) {
1723                atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt);
1724                return 0;
1725        }
1726
1727        if (skb_queue_empty(&xmitq))
1728                return 0;
1729
1730        /* Send response/rejected message */
1731        skb = __skb_dequeue(&xmitq);
1732        dnode = msg_destnode(buf_msg(skb));
1733        selector = msg_origport(buf_msg(skb));
1734        tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
1735        return 0;
1736}
1737
1738/**
1739 * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1740 *                   inputq and try adding them to socket or backlog queue
1741 * @inputq: list of incoming buffers with potentially different destinations
1742 * @sk: socket where the buffers should be enqueued
1743 * @dport: port number for the socket
1744 *
1745 * Caller must hold socket lock
1746 */
1747static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1748                            u32 dport, struct sk_buff_head *xmitq)
1749{
1750        unsigned long time_limit = jiffies + 2;
1751        struct sk_buff *skb;
1752        unsigned int lim;
1753        atomic_t *dcnt;
1754        u32 onode;
1755
1756        while (skb_queue_len(inputq)) {
1757                if (unlikely(time_after_eq(jiffies, time_limit)))
1758                        return;
1759
1760                skb = tipc_skb_dequeue(inputq, dport);
1761                if (unlikely(!skb))
1762                        return;
1763
1764                /* Add message directly to receive queue if possible */
1765                if (!sock_owned_by_user(sk)) {
1766                        filter_rcv(sk, skb, xmitq);
1767                        continue;
1768                }
1769
1770                /* Try backlog, compensating for double-counted bytes */
1771                dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1772                if (!sk->sk_backlog.len)
1773                        atomic_set(dcnt, 0);
1774                lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1775                if (likely(!sk_add_backlog(sk, skb, lim)))
1776                        continue;
1777
1778                /* Overload => reject message back to sender */
1779                onode = tipc_own_addr(sock_net(sk));
1780                if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
1781                        __skb_queue_tail(xmitq, skb);
1782                break;
1783        }
1784}
1785
1786/**
1787 * tipc_sk_rcv - handle a chain of incoming buffers
1788 * @inputq: buffer list containing the buffers
1789 * Consumes all buffers in list until inputq is empty
1790 * Note: may be called in multiple threads referring to the same queue
1791 */
1792void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1793{
1794        struct sk_buff_head xmitq;
1795        u32 dnode, dport = 0;
1796        int err;
1797        struct tipc_sock *tsk;
1798        struct sock *sk;
1799        struct sk_buff *skb;
1800
1801        __skb_queue_head_init(&xmitq);
1802        while (skb_queue_len(inputq)) {
1803                dport = tipc_skb_peek_port(inputq, dport);
1804                tsk = tipc_sk_lookup(net, dport);
1805
1806                if (likely(tsk)) {
1807                        sk = &tsk->sk;
1808                        if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1809                                tipc_sk_enqueue(inputq, sk, dport, &xmitq);
1810                                spin_unlock_bh(&sk->sk_lock.slock);
1811                        }
1812                        /* Send pending response/rejected messages, if any */
1813                        while ((skb = __skb_dequeue(&xmitq))) {
1814                                dnode = msg_destnode(buf_msg(skb));
1815                                tipc_node_xmit_skb(net, skb, dnode, dport);
1816                        }
1817                        sock_put(sk);
1818                        continue;
1819                }
1820
1821                /* No destination socket => dequeue skb if still there */
1822                skb = tipc_skb_dequeue(inputq, dport);
1823                if (!skb)
1824                        return;
1825
1826                /* Try secondary lookup if unresolved named message */
1827                err = TIPC_ERR_NO_PORT;
1828                if (tipc_msg_lookup_dest(net, skb, &err))
1829                        goto xmit;
1830
1831                /* Prepare for message rejection */
1832                if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
1833                        continue;
1834xmit:
1835                dnode = msg_destnode(buf_msg(skb));
1836                tipc_node_xmit_skb(net, skb, dnode, dport);
1837        }
1838}
1839
1840static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1841{
1842        struct sock *sk = sock->sk;
1843        DEFINE_WAIT(wait);
1844        int done;
1845
1846        do {
1847                int err = sock_error(sk);
1848                if (err)
1849                        return err;
1850                if (!*timeo_p)
1851                        return -ETIMEDOUT;
1852                if (signal_pending(current))
1853                        return sock_intr_errno(*timeo_p);
1854
1855                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1856                done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1857                finish_wait(sk_sleep(sk), &wait);
1858        } while (!done);
1859        return 0;
1860}
1861
1862/**
1863 * tipc_connect - establish a connection to another TIPC port
1864 * @sock: socket structure
1865 * @dest: socket address for destination port
1866 * @destlen: size of socket address data structure
1867 * @flags: file-related flags associated with socket
1868 *
1869 * Returns 0 on success, errno otherwise
1870 */
1871static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1872                        int destlen, int flags)
1873{
1874        struct sock *sk = sock->sk;
1875        struct tipc_sock *tsk = tipc_sk(sk);
1876        struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1877        struct msghdr m = {NULL,};
1878        long timeout = (flags & O_NONBLOCK) ? 0 : tsk->conn_timeout;
1879        socket_state previous;
1880        int res = 0;
1881
1882        lock_sock(sk);
1883
1884        /* DGRAM/RDM connect(), just save the destaddr */
1885        if (sock->state == SS_READY) {
1886                if (dst->family == AF_UNSPEC) {
1887                        memset(&tsk->remote, 0, sizeof(struct sockaddr_tipc));
1888                        tsk->connected = 0;
1889                } else if (destlen != sizeof(struct sockaddr_tipc)) {
1890                        res = -EINVAL;
1891                } else {
1892                        memcpy(&tsk->remote, dest, destlen);
1893                        tsk->connected = 1;
1894                }
1895                goto exit;
1896        }
1897
1898        /*
1899         * Reject connection attempt using multicast address
1900         *
1901         * Note: send_msg() validates the rest of the address fields,
1902         *       so there's no need to do it here
1903         */
1904        if (dst->addrtype == TIPC_ADDR_MCAST) {
1905                res = -EINVAL;
1906                goto exit;
1907        }
1908
1909        previous = sock->state;
1910        switch (sock->state) {
1911        case SS_UNCONNECTED:
1912                /* Send a 'SYN-' to destination */
1913                m.msg_name = dest;
1914                m.msg_namelen = destlen;
1915
1916                /* If connect is in non-blocking case, set MSG_DONTWAIT to
1917                 * indicate send_msg() is never blocked.
1918                 */
1919                if (!timeout)
1920                        m.msg_flags = MSG_DONTWAIT;
1921
1922                res = __tipc_sendmsg(sock, &m, 0);
1923                if ((res < 0) && (res != -EWOULDBLOCK))
1924                        goto exit;
1925
1926                /* Just entered SS_CONNECTING state; the only
1927                 * difference is that return value in non-blocking
1928                 * case is EINPROGRESS, rather than EALREADY.
1929                 */
1930                res = -EINPROGRESS;
1931        case SS_CONNECTING:
1932                if (previous == SS_CONNECTING)
1933                        res = -EALREADY;
1934                if (!timeout)
1935                        goto exit;
1936                timeout = msecs_to_jiffies(timeout);
1937                /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1938                res = tipc_wait_for_connect(sock, &timeout);
1939                break;
1940        case SS_CONNECTED:
1941                res = -EISCONN;
1942                break;
1943        default:
1944                res = -EINVAL;
1945                break;
1946        }
1947exit:
1948        release_sock(sk);
1949        return res;
1950}
1951
1952/**
1953 * tipc_listen - allow socket to listen for incoming connections
1954 * @sock: socket structure
1955 * @len: (unused)
1956 *
1957 * Returns 0 on success, errno otherwise
1958 */
1959static int tipc_listen(struct socket *sock, int len)
1960{
1961        struct sock *sk = sock->sk;
1962        int res;
1963
1964        lock_sock(sk);
1965
1966        if (sock->state != SS_UNCONNECTED)
1967                res = -EINVAL;
1968        else {
1969                sock->state = SS_LISTENING;
1970                res = 0;
1971        }
1972
1973        release_sock(sk);
1974        return res;
1975}
1976
1977static int tipc_wait_for_accept(struct socket *sock, long timeo)
1978{
1979        struct sock *sk = sock->sk;
1980        DEFINE_WAIT(wait);
1981        int err;
1982
1983        /* True wake-one mechanism for incoming connections: only
1984         * one process gets woken up, not the 'whole herd'.
1985         * Since we do not 'race & poll' for established sockets
1986         * anymore, the common case will execute the loop only once.
1987        */
1988        for (;;) {
1989                prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1990                                          TASK_INTERRUPTIBLE);
1991                if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1992                        release_sock(sk);
1993                        timeo = schedule_timeout(timeo);
1994                        lock_sock(sk);
1995                }
1996                err = 0;
1997                if (!skb_queue_empty(&sk->sk_receive_queue))
1998                        break;
1999                err = -EINVAL;
2000                if (sock->state != SS_LISTENING)
2001                        break;
2002                err = -EAGAIN;
2003                if (!timeo)
2004                        break;
2005                err = sock_intr_errno(timeo);
2006                if (signal_pending(current))
2007                        break;
2008        }
2009        finish_wait(sk_sleep(sk), &wait);
2010        return err;
2011}
2012
2013/**
2014 * tipc_accept - wait for connection request
2015 * @sock: listening socket
2016 * @newsock: new socket that is to be connected
2017 * @flags: file-related flags associated with socket
2018 *
2019 * Returns 0 on success, errno otherwise
2020 */
2021static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
2022{
2023        struct sock *new_sk, *sk = sock->sk;
2024        struct sk_buff *buf;
2025        struct tipc_sock *new_tsock;
2026        struct tipc_msg *msg;
2027        long timeo;
2028        int res;
2029
2030        lock_sock(sk);
2031
2032        if (sock->state != SS_LISTENING) {
2033                res = -EINVAL;
2034                goto exit;
2035        }
2036        timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
2037        res = tipc_wait_for_accept(sock, timeo);
2038        if (res)
2039                goto exit;
2040
2041        buf = skb_peek(&sk->sk_receive_queue);
2042
2043        res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
2044        if (res)
2045                goto exit;
2046        security_sk_clone(sock->sk, new_sock->sk);
2047
2048        new_sk = new_sock->sk;
2049        new_tsock = tipc_sk(new_sk);
2050        msg = buf_msg(buf);
2051
2052        /* we lock on new_sk; but lockdep sees the lock on sk */
2053        lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
2054
2055        /*
2056         * Reject any stray messages received by new socket
2057         * before the socket lock was taken (very, very unlikely)
2058         */
2059        tsk_rej_rx_queue(new_sk);
2060
2061        /* Connect new socket to it's peer */
2062        tipc_sk_finish_conn(new_tsock, msg_origport(msg), msg_orignode(msg));
2063        new_sock->state = SS_CONNECTED;
2064
2065        tsk_set_importance(new_tsock, msg_importance(msg));
2066        if (msg_named(msg)) {
2067                new_tsock->conn_type = msg_nametype(msg);
2068                new_tsock->conn_instance = msg_nameinst(msg);
2069        }
2070
2071        /*
2072         * Respond to 'SYN-' by discarding it & returning 'ACK'-.
2073         * Respond to 'SYN+' by queuing it on new socket.
2074         */
2075        if (!msg_data_sz(msg)) {
2076                struct msghdr m = {NULL,};
2077
2078                tsk_advance_rx_queue(sk);
2079                __tipc_send_stream(new_sock, &m, 0);
2080        } else {
2081                __skb_dequeue(&sk->sk_receive_queue);
2082                __skb_queue_head(&new_sk->sk_receive_queue, buf);
2083                skb_set_owner_r(buf, new_sk);
2084        }
2085        release_sock(new_sk);
2086exit:
2087        release_sock(sk);
2088        return res;
2089}
2090
2091/**
2092 * tipc_shutdown - shutdown socket connection
2093 * @sock: socket structure
2094 * @how: direction to close (must be SHUT_RDWR)
2095 *
2096 * Terminates connection (if necessary), then purges socket's receive queue.
2097 *
2098 * Returns 0 on success, errno otherwise
2099 */
2100static int tipc_shutdown(struct socket *sock, int how)
2101{
2102        struct sock *sk = sock->sk;
2103        struct net *net = sock_net(sk);
2104        struct tipc_sock *tsk = tipc_sk(sk);
2105        struct sk_buff *skb;
2106        u32 dnode = tsk_peer_node(tsk);
2107        u32 dport = tsk_peer_port(tsk);
2108        u32 onode = tipc_own_addr(net);
2109        u32 oport = tsk->portid;
2110        int res;
2111
2112        if (how != SHUT_RDWR)
2113                return -EINVAL;
2114
2115        lock_sock(sk);
2116
2117        switch (sock->state) {
2118        case SS_CONNECTING:
2119        case SS_CONNECTED:
2120
2121restart:
2122                dnode = tsk_peer_node(tsk);
2123
2124                /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
2125                skb = __skb_dequeue(&sk->sk_receive_queue);
2126                if (skb) {
2127                        if (TIPC_SKB_CB(skb)->handle != NULL) {
2128                                kfree_skb(skb);
2129                                goto restart;
2130                        }
2131                        tipc_sk_respond(sk, skb, TIPC_CONN_SHUTDOWN);
2132                } else {
2133                        skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
2134                                              TIPC_CONN_MSG, SHORT_H_SIZE,
2135                                              0, dnode, onode, dport, oport,
2136                                              TIPC_CONN_SHUTDOWN);
2137                        if (skb)
2138                                tipc_node_xmit_skb(net, skb, dnode, tsk->portid);
2139                }
2140                tsk->connected = 0;
2141                sock->state = SS_DISCONNECTING;
2142                tipc_node_remove_conn(net, dnode, tsk->portid);
2143                /* fall through */
2144
2145        case SS_DISCONNECTING:
2146
2147                /* Discard any unreceived messages */
2148                __skb_queue_purge(&sk->sk_receive_queue);
2149
2150                /* Wake up anyone sleeping in poll */
2151                sk->sk_state_change(sk);
2152                res = 0;
2153                break;
2154
2155        default:
2156                res = -ENOTCONN;
2157        }
2158
2159        release_sock(sk);
2160        return res;
2161}
2162
2163static void tipc_sk_timeout(unsigned long data)
2164{
2165        struct tipc_sock *tsk = (struct tipc_sock *)data;
2166        struct sock *sk = &tsk->sk;
2167        struct sk_buff *skb = NULL;
2168        u32 peer_port, peer_node;
2169        u32 own_node = tsk_own_node(tsk);
2170
2171        bh_lock_sock(sk);
2172        if (!tsk->connected) {
2173                bh_unlock_sock(sk);
2174                goto exit;
2175        }
2176        peer_port = tsk_peer_port(tsk);
2177        peer_node = tsk_peer_node(tsk);
2178
2179        if (tsk->probing_state == TIPC_CONN_PROBING) {
2180                if (!sock_owned_by_user(sk)) {
2181                        sk->sk_socket->state = SS_DISCONNECTING;
2182                        tsk->connected = 0;
2183                        tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk),
2184                                              tsk_peer_port(tsk));
2185                        sk->sk_state_change(sk);
2186                } else {
2187                        /* Try again later */
2188                        sk_reset_timer(sk, &sk->sk_timer, (HZ / 20));
2189                }
2190
2191        } else {
2192                skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE,
2193                                      INT_H_SIZE, 0, peer_node, own_node,
2194                                      peer_port, tsk->portid, TIPC_OK);
2195                tsk->probing_state = TIPC_CONN_PROBING;
2196                sk_reset_timer(sk, &sk->sk_timer, jiffies + tsk->probing_intv);
2197        }
2198        bh_unlock_sock(sk);
2199        if (skb)
2200                tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid);
2201exit:
2202        sock_put(sk);
2203}
2204
2205static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
2206                           struct tipc_name_seq const *seq)
2207{
2208        struct net *net = sock_net(&tsk->sk);
2209        struct publication *publ;
2210        u32 key;
2211
2212        if (tsk->connected)
2213                return -EINVAL;
2214        key = tsk->portid + tsk->pub_count + 1;
2215        if (key == tsk->portid)
2216                return -EADDRINUSE;
2217
2218        publ = tipc_nametbl_publish(net, seq->type, seq->lower, seq->upper,
2219                                    scope, tsk->portid, key);
2220        if (unlikely(!publ))
2221                return -EINVAL;
2222
2223        list_add(&publ->pport_list, &tsk->publications);
2224        tsk->pub_count++;
2225        tsk->published = 1;
2226        return 0;
2227}
2228
2229static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2230                            struct tipc_name_seq const *seq)
2231{
2232        struct net *net = sock_net(&tsk->sk);
2233        struct publication *publ;
2234        struct publication *safe;
2235        int rc = -EINVAL;
2236
2237        list_for_each_entry_safe(publ, safe, &tsk->publications, pport_list) {
2238                if (seq) {
2239                        if (publ->scope != scope)
2240                                continue;
2241                        if (publ->type != seq->type)
2242                                continue;
2243                        if (publ->lower != seq->lower)
2244                                continue;
2245                        if (publ->upper != seq->upper)
2246                                break;
2247                        tipc_nametbl_withdraw(net, publ->type, publ->lower,
2248                                              publ->ref, publ->key);
2249                        rc = 0;
2250                        break;
2251                }
2252                tipc_nametbl_withdraw(net, publ->type, publ->lower,
2253                                      publ->ref, publ->key);
2254                rc = 0;
2255        }
2256        if (list_empty(&tsk->publications))
2257                tsk->published = 0;
2258        return rc;
2259}
2260
2261/* tipc_sk_reinit: set non-zero address in all existing sockets
2262 *                 when we go from standalone to network mode.
2263 */
2264void tipc_sk_reinit(struct net *net)
2265{
2266        struct tipc_net *tn = net_generic(net, tipc_net_id);
2267        const struct bucket_table *tbl;
2268        struct rhash_head *pos;
2269        struct tipc_sock *tsk;
2270        struct tipc_msg *msg;
2271        int i;
2272
2273        rcu_read_lock();
2274        tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2275        for (i = 0; i < tbl->size; i++) {
2276                rht_for_each_entry_rcu(tsk, pos, tbl, i, node) {
2277                        spin_lock_bh(&tsk->sk.sk_lock.slock);
2278                        msg = &tsk->phdr;
2279                        msg_set_prevnode(msg, tn->own_addr);
2280                        msg_set_orignode(msg, tn->own_addr);
2281                        spin_unlock_bh(&tsk->sk.sk_lock.slock);
2282                }
2283        }
2284        rcu_read_unlock();
2285}
2286
2287static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
2288{
2289        struct tipc_net *tn = net_generic(net, tipc_net_id);
2290        struct tipc_sock *tsk;
2291
2292        rcu_read_lock();
2293        tsk = rhashtable_lookup_fast(&tn->sk_rht, &portid, tsk_rht_params);
2294        if (tsk)
2295                sock_hold(&tsk->sk);
2296        rcu_read_unlock();
2297
2298        return tsk;
2299}
2300
2301static int tipc_sk_insert(struct tipc_sock *tsk)
2302{
2303        struct sock *sk = &tsk->sk;
2304        struct net *net = sock_net(sk);
2305        struct tipc_net *tn = net_generic(net, tipc_net_id);
2306        u32 remaining = (TIPC_MAX_PORT - TIPC_MIN_PORT) + 1;
2307        u32 portid = prandom_u32() % remaining + TIPC_MIN_PORT;
2308
2309        while (remaining--) {
2310                portid++;
2311                if ((portid < TIPC_MIN_PORT) || (portid > TIPC_MAX_PORT))
2312                        portid = TIPC_MIN_PORT;
2313                tsk->portid = portid;
2314                sock_hold(&tsk->sk);
2315                if (!rhashtable_lookup_insert_fast(&tn->sk_rht, &tsk->node,
2316                                                   tsk_rht_params))
2317                        return 0;
2318                sock_put(&tsk->sk);
2319        }
2320
2321        return -1;
2322}
2323
2324static void tipc_sk_remove(struct tipc_sock *tsk)
2325{
2326        struct sock *sk = &tsk->sk;
2327        struct tipc_net *tn = net_generic(sock_net(sk), tipc_net_id);
2328
2329        if (!rhashtable_remove_fast(&tn->sk_rht, &tsk->node, tsk_rht_params)) {
2330                WARN_ON(atomic_read(&sk->sk_refcnt) == 1);
2331                __sock_put(sk);
2332        }
2333}
2334
2335static const struct rhashtable_params tsk_rht_params = {
2336        .nelem_hint = 192,
2337        .head_offset = offsetof(struct tipc_sock, node),
2338        .key_offset = offsetof(struct tipc_sock, portid),
2339        .key_len = sizeof(u32), /* portid */
2340        .max_size = 1048576,
2341        .min_size = 256,
2342        .automatic_shrinking = true,
2343};
2344
2345int tipc_sk_rht_init(struct net *net)
2346{
2347        struct tipc_net *tn = net_generic(net, tipc_net_id);
2348
2349        return rhashtable_init(&tn->sk_rht, &tsk_rht_params);
2350}
2351
2352void tipc_sk_rht_destroy(struct net *net)
2353{
2354        struct tipc_net *tn = net_generic(net, tipc_net_id);
2355
2356        /* Wait for socket readers to complete */
2357        synchronize_net();
2358
2359        rhashtable_destroy(&tn->sk_rht);
2360}
2361
2362/**
2363 * tipc_setsockopt - set socket option
2364 * @sock: socket structure
2365 * @lvl: option level
2366 * @opt: option identifier
2367 * @ov: pointer to new option value
2368 * @ol: length of option value
2369 *
2370 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
2371 * (to ease compatibility).
2372 *
2373 * Returns 0 on success, errno otherwise
2374 */
2375static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2376                           char __user *ov, unsigned int ol)
2377{
2378        struct sock *sk = sock->sk;
2379        struct tipc_sock *tsk = tipc_sk(sk);
2380        u32 value;
2381        int res;
2382
2383        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2384                return 0;
2385        if (lvl != SOL_TIPC)
2386                return -ENOPROTOOPT;
2387        if (ol < sizeof(value))
2388                return -EINVAL;
2389        res = get_user(value, (u32 __user *)ov);
2390        if (res)
2391                return res;
2392
2393        lock_sock(sk);
2394
2395        switch (opt) {
2396        case TIPC_IMPORTANCE:
2397                res = tsk_set_importance(tsk, value);
2398                break;
2399        case TIPC_SRC_DROPPABLE:
2400                if (sock->type != SOCK_STREAM)
2401                        tsk_set_unreliable(tsk, value);
2402                else
2403                        res = -ENOPROTOOPT;
2404                break;
2405        case TIPC_DEST_DROPPABLE:
2406                tsk_set_unreturnable(tsk, value);
2407                break;
2408        case TIPC_CONN_TIMEOUT:
2409                tipc_sk(sk)->conn_timeout = value;
2410                /* no need to set "res", since already 0 at this point */
2411                break;
2412        default:
2413                res = -EINVAL;
2414        }
2415
2416        release_sock(sk);
2417
2418        return res;
2419}
2420
2421/**
2422 * tipc_getsockopt - get socket option
2423 * @sock: socket structure
2424 * @lvl: option level
2425 * @opt: option identifier
2426 * @ov: receptacle for option value
2427 * @ol: receptacle for length of option value
2428 *
2429 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
2430 * (to ease compatibility).
2431 *
2432 * Returns 0 on success, errno otherwise
2433 */
2434static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2435                           char __user *ov, int __user *ol)
2436{
2437        struct sock *sk = sock->sk;
2438        struct tipc_sock *tsk = tipc_sk(sk);
2439        int len;
2440        u32 value;
2441        int res;
2442
2443        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2444                return put_user(0, ol);
2445        if (lvl != SOL_TIPC)
2446                return -ENOPROTOOPT;
2447        res = get_user(len, ol);
2448        if (res)
2449                return res;
2450
2451        lock_sock(sk);
2452
2453        switch (opt) {
2454        case TIPC_IMPORTANCE:
2455                value = tsk_importance(tsk);
2456                break;
2457        case TIPC_SRC_DROPPABLE:
2458                value = tsk_unreliable(tsk);
2459                break;
2460        case TIPC_DEST_DROPPABLE:
2461                value = tsk_unreturnable(tsk);
2462                break;
2463        case TIPC_CONN_TIMEOUT:
2464                value = tsk->conn_timeout;
2465                /* no need to set "res", since already 0 at this point */
2466                break;
2467        case TIPC_NODE_RECVQ_DEPTH:
2468                value = 0; /* was tipc_queue_size, now obsolete */
2469                break;
2470        case TIPC_SOCK_RECVQ_DEPTH:
2471                value = skb_queue_len(&sk->sk_receive_queue);
2472                break;
2473        default:
2474                res = -EINVAL;
2475        }
2476
2477        release_sock(sk);
2478
2479        if (res)
2480                return res;     /* "get" failed */
2481
2482        if (len < sizeof(value))
2483                return -EINVAL;
2484
2485        if (copy_to_user(ov, &value, sizeof(value)))
2486                return -EFAULT;
2487
2488        return put_user(sizeof(value), ol);
2489}
2490
2491static int tipc_ioctl(struct socket *sock, unsigned int cmd, unsigned long arg)
2492{
2493        struct sock *sk = sock->sk;
2494        struct tipc_sioc_ln_req lnr;
2495        void __user *argp = (void __user *)arg;
2496
2497        switch (cmd) {
2498        case SIOCGETLINKNAME:
2499                if (copy_from_user(&lnr, argp, sizeof(lnr)))
2500                        return -EFAULT;
2501                if (!tipc_node_get_linkname(sock_net(sk),
2502                                            lnr.bearer_id & 0xffff, lnr.peer,
2503                                            lnr.linkname, TIPC_MAX_LINK_NAME)) {
2504                        if (copy_to_user(argp, &lnr, sizeof(lnr)))
2505                                return -EFAULT;
2506                        return 0;
2507                }
2508                return -EADDRNOTAVAIL;
2509        default:
2510                return -ENOIOCTLCMD;
2511        }
2512}
2513
2514/* Protocol switches for the various types of TIPC sockets */
2515
2516static const struct proto_ops msg_ops = {
2517        .owner          = THIS_MODULE,
2518        .family         = AF_TIPC,
2519        .release        = tipc_release,
2520        .bind           = tipc_bind,
2521        .connect        = tipc_connect,
2522        .socketpair     = sock_no_socketpair,
2523        .accept         = sock_no_accept,
2524        .getname        = tipc_getname,
2525        .poll           = tipc_poll,
2526        .ioctl          = tipc_ioctl,
2527        .listen         = sock_no_listen,
2528        .shutdown       = tipc_shutdown,
2529        .setsockopt     = tipc_setsockopt,
2530        .getsockopt     = tipc_getsockopt,
2531        .sendmsg        = tipc_sendmsg,
2532        .recvmsg        = tipc_recvmsg,
2533        .mmap           = sock_no_mmap,
2534        .sendpage       = sock_no_sendpage
2535};
2536
2537static const struct proto_ops packet_ops = {
2538        .owner          = THIS_MODULE,
2539        .family         = AF_TIPC,
2540        .release        = tipc_release,
2541        .bind           = tipc_bind,
2542        .connect        = tipc_connect,
2543        .socketpair     = sock_no_socketpair,
2544        .accept         = tipc_accept,
2545        .getname        = tipc_getname,
2546        .poll           = tipc_poll,
2547        .ioctl          = tipc_ioctl,
2548        .listen         = tipc_listen,
2549        .shutdown       = tipc_shutdown,
2550        .setsockopt     = tipc_setsockopt,
2551        .getsockopt     = tipc_getsockopt,
2552        .sendmsg        = tipc_send_packet,
2553        .recvmsg        = tipc_recvmsg,
2554        .mmap           = sock_no_mmap,
2555        .sendpage       = sock_no_sendpage
2556};
2557
2558static const struct proto_ops stream_ops = {
2559        .owner          = THIS_MODULE,
2560        .family         = AF_TIPC,
2561        .release        = tipc_release,
2562        .bind           = tipc_bind,
2563        .connect        = tipc_connect,
2564        .socketpair     = sock_no_socketpair,
2565        .accept         = tipc_accept,
2566        .getname        = tipc_getname,
2567        .poll           = tipc_poll,
2568        .ioctl          = tipc_ioctl,
2569        .listen         = tipc_listen,
2570        .shutdown       = tipc_shutdown,
2571        .setsockopt     = tipc_setsockopt,
2572        .getsockopt     = tipc_getsockopt,
2573        .sendmsg        = tipc_send_stream,
2574        .recvmsg        = tipc_recv_stream,
2575        .mmap           = sock_no_mmap,
2576        .sendpage       = sock_no_sendpage
2577};
2578
2579static const struct net_proto_family tipc_family_ops = {
2580        .owner          = THIS_MODULE,
2581        .family         = AF_TIPC,
2582        .create         = tipc_sk_create
2583};
2584
2585static struct proto tipc_proto = {
2586        .name           = "TIPC",
2587        .owner          = THIS_MODULE,
2588        .obj_size       = sizeof(struct tipc_sock),
2589        .sysctl_rmem    = sysctl_tipc_rmem
2590};
2591
2592/**
2593 * tipc_socket_init - initialize TIPC socket interface
2594 *
2595 * Returns 0 on success, errno otherwise
2596 */
2597int tipc_socket_init(void)
2598{
2599        int res;
2600
2601        res = proto_register(&tipc_proto, 1);
2602        if (res) {
2603                pr_err("Failed to register TIPC protocol type\n");
2604                goto out;
2605        }
2606
2607        res = sock_register(&tipc_family_ops);
2608        if (res) {
2609                pr_err("Failed to register TIPC socket type\n");
2610                proto_unregister(&tipc_proto);
2611                goto out;
2612        }
2613 out:
2614        return res;
2615}
2616
2617/**
2618 * tipc_socket_stop - stop TIPC socket interface
2619 */
2620void tipc_socket_stop(void)
2621{
2622        sock_unregister(tipc_family_ops.family);
2623        proto_unregister(&tipc_proto);
2624}
2625
2626/* Caller should hold socket lock for the passed tipc socket. */
2627static int __tipc_nl_add_sk_con(struct sk_buff *skb, struct tipc_sock *tsk)
2628{
2629        u32 peer_node;
2630        u32 peer_port;
2631        struct nlattr *nest;
2632
2633        peer_node = tsk_peer_node(tsk);
2634        peer_port = tsk_peer_port(tsk);
2635
2636        nest = nla_nest_start(skb, TIPC_NLA_SOCK_CON);
2637
2638        if (nla_put_u32(skb, TIPC_NLA_CON_NODE, peer_node))
2639                goto msg_full;
2640        if (nla_put_u32(skb, TIPC_NLA_CON_SOCK, peer_port))
2641                goto msg_full;
2642
2643        if (tsk->conn_type != 0) {
2644                if (nla_put_flag(skb, TIPC_NLA_CON_FLAG))
2645                        goto msg_full;
2646                if (nla_put_u32(skb, TIPC_NLA_CON_TYPE, tsk->conn_type))
2647                        goto msg_full;
2648                if (nla_put_u32(skb, TIPC_NLA_CON_INST, tsk->conn_instance))
2649                        goto msg_full;
2650        }
2651        nla_nest_end(skb, nest);
2652
2653        return 0;
2654
2655msg_full:
2656        nla_nest_cancel(skb, nest);
2657
2658        return -EMSGSIZE;
2659}
2660
2661/* Caller should hold socket lock for the passed tipc socket. */
2662static int __tipc_nl_add_sk(struct sk_buff *skb, struct netlink_callback *cb,
2663                            struct tipc_sock *tsk)
2664{
2665        int err;
2666        void *hdr;
2667        struct nlattr *attrs;
2668        struct net *net = sock_net(skb->sk);
2669        struct tipc_net *tn = net_generic(net, tipc_net_id);
2670
2671        hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2672                          &tipc_genl_family, NLM_F_MULTI, TIPC_NL_SOCK_GET);
2673        if (!hdr)
2674                goto msg_cancel;
2675
2676        attrs = nla_nest_start(skb, TIPC_NLA_SOCK);
2677        if (!attrs)
2678                goto genlmsg_cancel;
2679        if (nla_put_u32(skb, TIPC_NLA_SOCK_REF, tsk->portid))
2680                goto attr_msg_cancel;
2681        if (nla_put_u32(skb, TIPC_NLA_SOCK_ADDR, tn->own_addr))
2682                goto attr_msg_cancel;
2683
2684        if (tsk->connected) {
2685                err = __tipc_nl_add_sk_con(skb, tsk);
2686                if (err)
2687                        goto attr_msg_cancel;
2688        } else if (!list_empty(&tsk->publications)) {
2689                if (nla_put_flag(skb, TIPC_NLA_SOCK_HAS_PUBL))
2690                        goto attr_msg_cancel;
2691        }
2692        nla_nest_end(skb, attrs);
2693        genlmsg_end(skb, hdr);
2694
2695        return 0;
2696
2697attr_msg_cancel:
2698        nla_nest_cancel(skb, attrs);
2699genlmsg_cancel:
2700        genlmsg_cancel(skb, hdr);
2701msg_cancel:
2702        return -EMSGSIZE;
2703}
2704
2705int tipc_nl_sk_dump(struct sk_buff *skb, struct netlink_callback *cb)
2706{
2707        int err;
2708        struct tipc_sock *tsk;
2709        const struct bucket_table *tbl;
2710        struct rhash_head *pos;
2711        struct net *net = sock_net(skb->sk);
2712        struct tipc_net *tn = net_generic(net, tipc_net_id);
2713        u32 tbl_id = cb->args[0];
2714        u32 prev_portid = cb->args[1];
2715
2716        rcu_read_lock();
2717        tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht);
2718        for (; tbl_id < tbl->size; tbl_id++) {
2719                rht_for_each_entry_rcu(tsk, pos, tbl, tbl_id, node) {
2720                        spin_lock_bh(&tsk->sk.sk_lock.slock);
2721                        if (prev_portid && prev_portid != tsk->portid) {
2722                                spin_unlock_bh(&tsk->sk.sk_lock.slock);
2723                                continue;
2724                        }
2725
2726                        err = __tipc_nl_add_sk(skb, cb, tsk);
2727                        if (err) {
2728                                prev_portid = tsk->portid;
2729                                spin_unlock_bh(&tsk->sk.sk_lock.slock);
2730                                goto out;
2731                        }
2732                        prev_portid = 0;
2733                        spin_unlock_bh(&tsk->sk.sk_lock.slock);
2734                }
2735        }
2736out:
2737        rcu_read_unlock();
2738        cb->args[0] = tbl_id;
2739        cb->args[1] = prev_portid;
2740
2741        return skb->len;
2742}
2743
2744/* Caller should hold socket lock for the passed tipc socket. */
2745static int __tipc_nl_add_sk_publ(struct sk_buff *skb,
2746                                 struct netlink_callback *cb,
2747                                 struct publication *publ)
2748{
2749        void *hdr;
2750        struct nlattr *attrs;
2751
2752        hdr = genlmsg_put(skb, NETLINK_CB(cb->skb).portid, cb->nlh->nlmsg_seq,
2753                          &tipc_genl_family, NLM_F_MULTI, TIPC_NL_PUBL_GET);
2754        if (!hdr)
2755                goto msg_cancel;
2756
2757        attrs = nla_nest_start(skb, TIPC_NLA_PUBL);
2758        if (!attrs)
2759                goto genlmsg_cancel;
2760
2761        if (nla_put_u32(skb, TIPC_NLA_PUBL_KEY, publ->key))
2762                goto attr_msg_cancel;
2763        if (nla_put_u32(skb, TIPC_NLA_PUBL_TYPE, publ->type))
2764                goto attr_msg_cancel;
2765        if (nla_put_u32(skb, TIPC_NLA_PUBL_LOWER, publ->lower))
2766                goto attr_msg_cancel;
2767        if (nla_put_u32(skb, TIPC_NLA_PUBL_UPPER, publ->upper))
2768                goto attr_msg_cancel;
2769
2770        nla_nest_end(skb, attrs);
2771        genlmsg_end(skb, hdr);
2772
2773        return 0;
2774
2775attr_msg_cancel:
2776        nla_nest_cancel(skb, attrs);
2777genlmsg_cancel:
2778        genlmsg_cancel(skb, hdr);
2779msg_cancel:
2780        return -EMSGSIZE;
2781}
2782
2783/* Caller should hold socket lock for the passed tipc socket. */
2784static int __tipc_nl_list_sk_publ(struct sk_buff *skb,
2785                                  struct netlink_callback *cb,
2786                                  struct tipc_sock *tsk, u32 *last_publ)
2787{
2788        int err;
2789        struct publication *p;
2790
2791        if (*last_publ) {
2792                list_for_each_entry(p, &tsk->publications, pport_list) {
2793                        if (p->key == *last_publ)
2794                                break;
2795                }
2796                if (p->key != *last_publ) {
2797                        /* We never set seq or call nl_dump_check_consistent()
2798                         * this means that setting prev_seq here will cause the
2799                         * consistence check to fail in the netlink callback
2800                         * handler. Resulting in the last NLMSG_DONE message
2801                         * having the NLM_F_DUMP_INTR flag set.
2802                         */
2803                        cb->prev_seq = 1;
2804                        *last_publ = 0;
2805                        return -EPIPE;
2806                }
2807        } else {
2808                p = list_first_entry(&tsk->publications, struct publication,
2809                                     pport_list);
2810        }
2811
2812        list_for_each_entry_from(p, &tsk->publications, pport_list) {
2813                err = __tipc_nl_add_sk_publ(skb, cb, p);
2814                if (err) {
2815                        *last_publ = p->key;
2816                        return err;
2817                }
2818        }
2819        *last_publ = 0;
2820
2821        return 0;
2822}
2823
2824int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
2825{
2826        int err;
2827        u32 tsk_portid = cb->args[0];
2828        u32 last_publ = cb->args[1];
2829        u32 done = cb->args[2];
2830        struct net *net = sock_net(skb->sk);
2831        struct tipc_sock *tsk;
2832
2833        if (!tsk_portid) {
2834                struct nlattr **attrs;
2835                struct nlattr *sock[TIPC_NLA_SOCK_MAX + 1];
2836
2837                err = tipc_nlmsg_parse(cb->nlh, &attrs);
2838                if (err)
2839                        return err;
2840
2841                if (!attrs[TIPC_NLA_SOCK])
2842                        return -EINVAL;
2843
2844                err = nla_parse_nested(sock, TIPC_NLA_SOCK_MAX,
2845                                       attrs[TIPC_NLA_SOCK],
2846                                       tipc_nl_sock_policy);
2847                if (err)
2848                        return err;
2849
2850                if (!sock[TIPC_NLA_SOCK_REF])
2851                        return -EINVAL;
2852
2853                tsk_portid = nla_get_u32(sock[TIPC_NLA_SOCK_REF]);
2854        }
2855
2856        if (done)
2857                return 0;
2858
2859        tsk = tipc_sk_lookup(net, tsk_portid);
2860        if (!tsk)
2861                return -EINVAL;
2862
2863        lock_sock(&tsk->sk);
2864        err = __tipc_nl_list_sk_publ(skb, cb, tsk, &last_publ);
2865        if (!err)
2866                done = 1;
2867        release_sock(&tsk->sk);
2868        sock_put(&tsk->sk);
2869
2870        cb->args[0] = tsk_portid;
2871        cb->args[1] = last_publ;
2872        cb->args[2] = done;
2873
2874        return skb->len;
2875}
2876