linux/net/tipc/socket.c
<<
>>
Prefs
   1/*
   2 * net/tipc/socket.c: TIPC socket API
   3 *
   4 * Copyright (c) 2001-2007, 2012 Ericsson AB
   5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "port.h"
  39
  40#include <linux/export.h>
  41#include <net/sock.h>
  42
  43#define SS_LISTENING    -1      /* socket is listening */
  44#define SS_READY        -2      /* socket is connectionless */
  45
  46#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
  47
  48struct tipc_sock {
  49        struct sock sk;
  50        struct tipc_port *p;
  51        struct tipc_portid peer_name;
  52        unsigned int conn_timeout;
  53};
  54
  55#define tipc_sk(sk) ((struct tipc_sock *)(sk))
  56#define tipc_sk_port(sk) (tipc_sk(sk)->p)
  57
  58#define tipc_rx_ready(sock) (!skb_queue_empty(&sock->sk->sk_receive_queue) || \
  59                        (sock->state == SS_DISCONNECTING))
  60
  61static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
  62static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
  63static void wakeupdispatch(struct tipc_port *tport);
  64static void tipc_data_ready(struct sock *sk, int len);
  65static void tipc_write_space(struct sock *sk);
  66static int release(struct socket *sock);
  67static int accept(struct socket *sock, struct socket *new_sock, int flags);
  68
  69static const struct proto_ops packet_ops;
  70static const struct proto_ops stream_ops;
  71static const struct proto_ops msg_ops;
  72
  73static struct proto tipc_proto;
  74static struct proto tipc_proto_kern;
  75
  76static int sockets_enabled;
  77
  78/*
  79 * Revised TIPC socket locking policy:
  80 *
  81 * Most socket operations take the standard socket lock when they start
  82 * and hold it until they finish (or until they need to sleep).  Acquiring
  83 * this lock grants the owner exclusive access to the fields of the socket
  84 * data structures, with the exception of the backlog queue.  A few socket
  85 * operations can be done without taking the socket lock because they only
  86 * read socket information that never changes during the life of the socket.
  87 *
  88 * Socket operations may acquire the lock for the associated TIPC port if they
  89 * need to perform an operation on the port.  If any routine needs to acquire
  90 * both the socket lock and the port lock it must take the socket lock first
  91 * to avoid the risk of deadlock.
  92 *
  93 * The dispatcher handling incoming messages cannot grab the socket lock in
 * the standard fashion, since it is invoked at the BH level and cannot block.
  95 * Instead, it checks to see if the socket lock is currently owned by someone,
  96 * and either handles the message itself or adds it to the socket's backlog
  97 * queue; in the latter case the queued message is processed once the process
  98 * owning the socket lock releases it.
  99 *
 100 * NOTE: Releasing the socket lock while an operation is sleeping overcomes
 101 * the problem of a blocked socket operation preventing any other operations
 102 * from occurring.  However, applications must be careful if they have
 103 * multiple threads trying to send (or receive) on the same socket, as these
 104 * operations might interfere with each other.  For example, doing a connect
 105 * and a receive at the same time might allow the receive to consume the
 106 * ACK message meant for the connect.  While additional work could be done
 107 * to try and overcome this, it doesn't seem to be worthwhile at the present.
 108 *
 109 * NOTE: Releasing the socket lock while an operation is sleeping also ensures
 110 * that another operation that must be performed in a non-blocking manner is
 111 * not delayed for very long because the lock has already been taken.
 112 *
 113 * NOTE: This code assumes that certain fields of a port/socket pair are
 114 * constant over its lifetime; such fields can be examined without taking
 115 * the socket lock and/or port lock, and do not need to be re-read even
 116 * after resuming processing after waiting.  These fields include:
 117 *   - socket type
 118 *   - pointer to socket sk structure (aka tipc_sock structure)
 119 *   - pointer to port structure
 120 *   - port reference
 121 */
 122
 123/**
 124 * advance_rx_queue - discard first buffer in socket receive queue
 125 *
 126 * Caller must hold socket lock
 127 */
 128static void advance_rx_queue(struct sock *sk)
 129{
 130        kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 131}
 132
 133/**
 134 * reject_rx_queue - reject all buffers in socket receive queue
 135 *
 136 * Caller must hold socket lock
 137 */
 138static void reject_rx_queue(struct sock *sk)
 139{
 140        struct sk_buff *buf;
 141
 142        while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
 143                tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 144}
 145
 146/**
 147 * tipc_sk_create - create a TIPC socket
 148 * @net: network namespace (must be default network)
 149 * @sock: pre-allocated socket structure
 150 * @protocol: protocol indicator (must be 0)
 151 * @kern: caused by kernel or by userspace?
 152 *
 153 * This routine creates additional data structures used by the TIPC socket,
 154 * initializes them, and links them together.
 155 *
 156 * Returns 0 on success, errno otherwise
 157 */
 158static int tipc_sk_create(struct net *net, struct socket *sock, int protocol,
 159                          int kern)
 160{
 161        const struct proto_ops *ops;
 162        socket_state state;
 163        struct sock *sk;
 164        struct tipc_port *tp_ptr;
 165
 166        /* Validate arguments */
 167        if (unlikely(protocol != 0))
 168                return -EPROTONOSUPPORT;
 169
 170        switch (sock->type) {
 171        case SOCK_STREAM:
 172                ops = &stream_ops;
 173                state = SS_UNCONNECTED;
 174                break;
 175        case SOCK_SEQPACKET:
 176                ops = &packet_ops;
 177                state = SS_UNCONNECTED;
 178                break;
 179        case SOCK_DGRAM:
 180        case SOCK_RDM:
 181                ops = &msg_ops;
 182                state = SS_READY;
 183                break;
 184        default:
 185                return -EPROTOTYPE;
 186        }
 187
 188        /* Allocate socket's protocol area */
 189        if (!kern)
 190                sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
 191        else
 192                sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
 193
 194        if (sk == NULL)
 195                return -ENOMEM;
 196
 197        /* Allocate TIPC port for socket to use */
 198        tp_ptr = tipc_createport(sk, &dispatch, &wakeupdispatch,
 199                                 TIPC_LOW_IMPORTANCE);
 200        if (unlikely(!tp_ptr)) {
 201                sk_free(sk);
 202                return -ENOMEM;
 203        }
 204
 205        /* Finish initializing socket data structures */
 206        sock->ops = ops;
 207        sock->state = state;
 208
 209        sock_init_data(sock, sk);
 210        sk->sk_backlog_rcv = backlog_rcv;
 211        sk->sk_rcvbuf = sysctl_tipc_rmem[1];
 212        sk->sk_data_ready = tipc_data_ready;
 213        sk->sk_write_space = tipc_write_space;
 214        tipc_sk(sk)->p = tp_ptr;
 215        tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
 216
 217        spin_unlock_bh(tp_ptr->lock);
 218
 219        if (sock->state == SS_READY) {
 220                tipc_set_portunreturnable(tp_ptr->ref, 1);
 221                if (sock->type == SOCK_DGRAM)
 222                        tipc_set_portunreliable(tp_ptr->ref, 1);
 223        }
 224
 225        return 0;
 226}
 227
 228/**
 229 * tipc_sock_create_local - create TIPC socket from inside TIPC module
 230 * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
 231 *
 232 * We cannot use sock_creat_kern here because it bumps module user count.
 233 * Since socket owner and creator is the same module we must make sure
 234 * that module count remains zero for module local sockets, otherwise
 235 * we cannot do rmmod.
 236 *
 237 * Returns 0 on success, errno otherwise
 238 */
 239int tipc_sock_create_local(int type, struct socket **res)
 240{
 241        int rc;
 242        struct sock *sk;
 243
 244        rc = sock_create_lite(AF_TIPC, type, 0, res);
 245        if (rc < 0) {
 246                pr_err("Failed to create kernel socket\n");
 247                return rc;
 248        }
 249        tipc_sk_create(&init_net, *res, 0, 1);
 250
 251        sk = (*res)->sk;
 252
 253        return 0;
 254}
 255
 256/**
 257 * tipc_sock_release_local - release socket created by tipc_sock_create_local
 258 * @sock: the socket to be released.
 259 *
 260 * Module reference count is not incremented when such sockets are created,
 261 * so we must keep it from being decremented when they are released.
 262 */
 263void tipc_sock_release_local(struct socket *sock)
 264{
 265        release(sock);
 266        sock->ops = NULL;
 267        sock_release(sock);
 268}
 269
 270/**
 271 * tipc_sock_accept_local - accept a connection on a socket created
 272 * with tipc_sock_create_local. Use this function to avoid that
 273 * module reference count is inadvertently incremented.
 274 *
 275 * @sock:    the accepting socket
 276 * @newsock: reference to the new socket to be created
 277 * @flags:   socket flags
 278 */
 279
 280int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
 281                           int flags)
 282{
 283        struct sock *sk = sock->sk;
 284        int ret;
 285
 286        ret = sock_create_lite(sk->sk_family, sk->sk_type,
 287                               sk->sk_protocol, newsock);
 288        if (ret < 0)
 289                return ret;
 290
 291        ret = accept(sock, *newsock, flags);
 292        if (ret < 0) {
 293                sock_release(*newsock);
 294                return ret;
 295        }
 296        (*newsock)->ops = sock->ops;
 297        return ret;
 298}
 299
 300/**
 301 * release - destroy a TIPC socket
 302 * @sock: socket to destroy
 303 *
 304 * This routine cleans up any messages that are still queued on the socket.
 305 * For DGRAM and RDM socket types, all queued messages are rejected.
 306 * For SEQPACKET and STREAM socket types, the first message is rejected
 307 * and any others are discarded.  (If the first message on a STREAM socket
 308 * is partially-read, it is discarded and the next one is rejected instead.)
 309 *
 310 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 311 * are returned or discarded according to the "destination droppable" setting
 312 * specified for the message by the sender.
 313 *
 314 * Returns 0 on success, errno otherwise
 315 */
 316static int release(struct socket *sock)
 317{
 318        struct sock *sk = sock->sk;
 319        struct tipc_port *tport;
 320        struct sk_buff *buf;
 321        int res;
 322
 323        /*
 324         * Exit if socket isn't fully initialized (occurs when a failed accept()
 325         * releases a pre-allocated child socket that was never used)
 326         */
 327        if (sk == NULL)
 328                return 0;
 329
 330        tport = tipc_sk_port(sk);
 331        lock_sock(sk);
 332
 333        /*
 334         * Reject all unreceived messages, except on an active connection
 335         * (which disconnects locally & sends a 'FIN+' to peer)
 336         */
 337        while (sock->state != SS_DISCONNECTING) {
 338                buf = __skb_dequeue(&sk->sk_receive_queue);
 339                if (buf == NULL)
 340                        break;
 341                if (TIPC_SKB_CB(buf)->handle != 0)
 342                        kfree_skb(buf);
 343                else {
 344                        if ((sock->state == SS_CONNECTING) ||
 345                            (sock->state == SS_CONNECTED)) {
 346                                sock->state = SS_DISCONNECTING;
 347                                tipc_disconnect(tport->ref);
 348                        }
 349                        tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 350                }
 351        }
 352
 353        /*
 354         * Delete TIPC port; this ensures no more messages are queued
 355         * (also disconnects an active connection & sends a 'FIN-' to peer)
 356         */
 357        res = tipc_deleteport(tport->ref);
 358
 359        /* Discard any remaining (connection-based) messages in receive queue */
 360        __skb_queue_purge(&sk->sk_receive_queue);
 361
 362        /* Reject any messages that accumulated in backlog queue */
 363        sock->state = SS_DISCONNECTING;
 364        release_sock(sk);
 365
 366        sock_put(sk);
 367        sock->sk = NULL;
 368
 369        return res;
 370}
 371
 372/**
 373 * bind - associate or disassocate TIPC name(s) with a socket
 374 * @sock: socket structure
 375 * @uaddr: socket address describing name(s) and desired operation
 376 * @uaddr_len: size of socket address data structure
 377 *
 378 * Name and name sequence binding is indicated using a positive scope value;
 379 * a negative scope value unbinds the specified name.  Specifying no name
 380 * (i.e. a socket address length of 0) unbinds all names from the socket.
 381 *
 382 * Returns 0 on success, errno otherwise
 383 *
 384 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 385 *       access any non-constant socket information.
 386 */
 387static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
 388{
 389        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 390        u32 portref = tipc_sk_port(sock->sk)->ref;
 391
 392        if (unlikely(!uaddr_len))
 393                return tipc_withdraw(portref, 0, NULL);
 394
 395        if (uaddr_len < sizeof(struct sockaddr_tipc))
 396                return -EINVAL;
 397        if (addr->family != AF_TIPC)
 398                return -EAFNOSUPPORT;
 399
 400        if (addr->addrtype == TIPC_ADDR_NAME)
 401                addr->addr.nameseq.upper = addr->addr.nameseq.lower;
 402        else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
 403                return -EAFNOSUPPORT;
 404
 405        if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
 406            (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
 407            (addr->addr.nameseq.type != TIPC_CFG_SRV))
 408                return -EACCES;
 409
 410        return (addr->scope > 0) ?
 411                tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
 412                tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
 413}
 414
 415/**
 416 * get_name - get port ID of socket or peer socket
 417 * @sock: socket structure
 418 * @uaddr: area for returned socket address
 419 * @uaddr_len: area for returned length of socket address
 420 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
 421 *
 422 * Returns 0 on success, errno otherwise
 423 *
 424 * NOTE: This routine doesn't need to take the socket lock since it only
 425 *       accesses socket information that is unchanging (or which changes in
 426 *       a completely predictable manner).
 427 */
 428static int get_name(struct socket *sock, struct sockaddr *uaddr,
 429                    int *uaddr_len, int peer)
 430{
 431        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 432        struct tipc_sock *tsock = tipc_sk(sock->sk);
 433
 434        memset(addr, 0, sizeof(*addr));
 435        if (peer) {
 436                if ((sock->state != SS_CONNECTED) &&
 437                        ((peer != 2) || (sock->state != SS_DISCONNECTING)))
 438                        return -ENOTCONN;
 439                addr->addr.id.ref = tsock->peer_name.ref;
 440                addr->addr.id.node = tsock->peer_name.node;
 441        } else {
 442                addr->addr.id.ref = tsock->p->ref;
 443                addr->addr.id.node = tipc_own_addr;
 444        }
 445
 446        *uaddr_len = sizeof(*addr);
 447        addr->addrtype = TIPC_ADDR_ID;
 448        addr->family = AF_TIPC;
 449        addr->scope = 0;
 450        addr->addr.name.domain = 0;
 451
 452        return 0;
 453}
 454
 455/**
 456 * poll - read and possibly block on pollmask
 457 * @file: file structure associated with the socket
 458 * @sock: socket for which to calculate the poll bits
 459 * @wait: ???
 460 *
 461 * Returns pollmask value
 462 *
 463 * COMMENTARY:
 464 * It appears that the usual socket locking mechanisms are not useful here
 465 * since the pollmask info is potentially out-of-date the moment this routine
 466 * exits.  TCP and other protocols seem to rely on higher level poll routines
 467 * to handle any preventable race conditions, so TIPC will do the same ...
 468 *
 469 * TIPC sets the returned events as follows:
 470 *
 471 * socket state         flags set
 472 * ------------         ---------
 473 * unconnected          no read flags
 474 *                      POLLOUT if port is not congested
 475 *
 476 * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
 477 *                      no write flags
 478 *
 479 * connected            POLLIN/POLLRDNORM if data in rx queue
 480 *                      POLLOUT if port is not congested
 481 *
 482 * disconnecting        POLLIN/POLLRDNORM/POLLHUP
 483 *                      no write flags
 484 *
 485 * listening            POLLIN if SYN in rx queue
 486 *                      no write flags
 487 *
 488 * ready                POLLIN/POLLRDNORM if data in rx queue
 489 * [connectionless]     POLLOUT (since port cannot be congested)
 490 *
 491 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 492 * imply that the operation will succeed, merely that it should be performed
 493 * and will not block.
 494 */
 495static unsigned int poll(struct file *file, struct socket *sock,
 496                         poll_table *wait)
 497{
 498        struct sock *sk = sock->sk;
 499        u32 mask = 0;
 500
 501        sock_poll_wait(file, sk_sleep(sk), wait);
 502
 503        switch ((int)sock->state) {
 504        case SS_UNCONNECTED:
 505                if (!tipc_sk_port(sk)->congested)
 506                        mask |= POLLOUT;
 507                break;
 508        case SS_READY:
 509        case SS_CONNECTED:
 510                if (!tipc_sk_port(sk)->congested)
 511                        mask |= POLLOUT;
 512                /* fall thru' */
 513        case SS_CONNECTING:
 514        case SS_LISTENING:
 515                if (!skb_queue_empty(&sk->sk_receive_queue))
 516                        mask |= (POLLIN | POLLRDNORM);
 517                break;
 518        case SS_DISCONNECTING:
 519                mask = (POLLIN | POLLRDNORM | POLLHUP);
 520                break;
 521        }
 522
 523        return mask;
 524}
 525
 526/**
 527 * dest_name_check - verify user is permitted to send to specified port name
 528 * @dest: destination address
 529 * @m: descriptor for message to be sent
 530 *
 531 * Prevents restricted configuration commands from being issued by
 532 * unauthorized users.
 533 *
 534 * Returns 0 if permission is granted, otherwise errno
 535 */
 536static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
 537{
 538        struct tipc_cfg_msg_hdr hdr;
 539
 540        if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
 541                return 0;
 542        if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
 543                return 0;
 544        if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
 545                return -EACCES;
 546
 547        if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
 548                return -EMSGSIZE;
 549        if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
 550                return -EFAULT;
 551        if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
 552                return -EACCES;
 553
 554        return 0;
 555}
 556
 557/**
 558 * send_msg - send message in connectionless manner
 559 * @iocb: if NULL, indicates that socket lock is already held
 560 * @sock: socket structure
 561 * @m: message to send
 562 * @total_len: length of message
 563 *
 564 * Message must have an destination specified explicitly.
 565 * Used for SOCK_RDM and SOCK_DGRAM messages,
 566 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 567 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 568 *
 569 * Returns the number of bytes sent on success, or errno otherwise
 570 */
 571static int send_msg(struct kiocb *iocb, struct socket *sock,
 572                    struct msghdr *m, size_t total_len)
 573{
 574        struct sock *sk = sock->sk;
 575        struct tipc_port *tport = tipc_sk_port(sk);
 576        struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
 577        int needs_conn;
 578        long timeout_val;
 579        int res = -EINVAL;
 580
 581        if (unlikely(!dest))
 582                return -EDESTADDRREQ;
 583        if (unlikely((m->msg_namelen < sizeof(*dest)) ||
 584                     (dest->family != AF_TIPC)))
 585                return -EINVAL;
 586        if (total_len > TIPC_MAX_USER_MSG_SIZE)
 587                return -EMSGSIZE;
 588
 589        if (iocb)
 590                lock_sock(sk);
 591
 592        needs_conn = (sock->state != SS_READY);
 593        if (unlikely(needs_conn)) {
 594                if (sock->state == SS_LISTENING) {
 595                        res = -EPIPE;
 596                        goto exit;
 597                }
 598                if (sock->state != SS_UNCONNECTED) {
 599                        res = -EISCONN;
 600                        goto exit;
 601                }
 602                if (tport->published) {
 603                        res = -EOPNOTSUPP;
 604                        goto exit;
 605                }
 606                if (dest->addrtype == TIPC_ADDR_NAME) {
 607                        tport->conn_type = dest->addr.name.name.type;
 608                        tport->conn_instance = dest->addr.name.name.instance;
 609                }
 610
 611                /* Abort any pending connection attempts (very unlikely) */
 612                reject_rx_queue(sk);
 613        }
 614
 615        timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 616
 617        do {
 618                if (dest->addrtype == TIPC_ADDR_NAME) {
 619                        res = dest_name_check(dest, m);
 620                        if (res)
 621                                break;
 622                        res = tipc_send2name(tport->ref,
 623                                             &dest->addr.name.name,
 624                                             dest->addr.name.domain,
 625                                             m->msg_iovlen,
 626                                             m->msg_iov,
 627                                             total_len);
 628                } else if (dest->addrtype == TIPC_ADDR_ID) {
 629                        res = tipc_send2port(tport->ref,
 630                                             &dest->addr.id,
 631                                             m->msg_iovlen,
 632                                             m->msg_iov,
 633                                             total_len);
 634                } else if (dest->addrtype == TIPC_ADDR_MCAST) {
 635                        if (needs_conn) {
 636                                res = -EOPNOTSUPP;
 637                                break;
 638                        }
 639                        res = dest_name_check(dest, m);
 640                        if (res)
 641                                break;
 642                        res = tipc_multicast(tport->ref,
 643                                             &dest->addr.nameseq,
 644                                             m->msg_iovlen,
 645                                             m->msg_iov,
 646                                             total_len);
 647                }
 648                if (likely(res != -ELINKCONG)) {
 649                        if (needs_conn && (res >= 0))
 650                                sock->state = SS_CONNECTING;
 651                        break;
 652                }
 653                if (timeout_val <= 0L) {
 654                        res = timeout_val ? timeout_val : -EWOULDBLOCK;
 655                        break;
 656                }
 657                release_sock(sk);
 658                timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
 659                                               !tport->congested, timeout_val);
 660                lock_sock(sk);
 661        } while (1);
 662
 663exit:
 664        if (iocb)
 665                release_sock(sk);
 666        return res;
 667}
 668
 669/**
 670 * send_packet - send a connection-oriented message
 671 * @iocb: if NULL, indicates that socket lock is already held
 672 * @sock: socket structure
 673 * @m: message to send
 674 * @total_len: length of message
 675 *
 676 * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
 677 *
 678 * Returns the number of bytes sent on success, or errno otherwise
 679 */
 680static int send_packet(struct kiocb *iocb, struct socket *sock,
 681                       struct msghdr *m, size_t total_len)
 682{
 683        struct sock *sk = sock->sk;
 684        struct tipc_port *tport = tipc_sk_port(sk);
 685        struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
 686        long timeout_val;
 687        int res;
 688
 689        /* Handle implied connection establishment */
 690        if (unlikely(dest))
 691                return send_msg(iocb, sock, m, total_len);
 692
 693        if (total_len > TIPC_MAX_USER_MSG_SIZE)
 694                return -EMSGSIZE;
 695
 696        if (iocb)
 697                lock_sock(sk);
 698
 699        timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 700
 701        do {
 702                if (unlikely(sock->state != SS_CONNECTED)) {
 703                        if (sock->state == SS_DISCONNECTING)
 704                                res = -EPIPE;
 705                        else
 706                                res = -ENOTCONN;
 707                        break;
 708                }
 709
 710                res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov,
 711                                total_len);
 712                if (likely(res != -ELINKCONG))
 713                        break;
 714                if (timeout_val <= 0L) {
 715                        res = timeout_val ? timeout_val : -EWOULDBLOCK;
 716                        break;
 717                }
 718                release_sock(sk);
 719                timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
 720                        (!tport->congested || !tport->connected), timeout_val);
 721                lock_sock(sk);
 722        } while (1);
 723
 724        if (iocb)
 725                release_sock(sk);
 726        return res;
 727}
 728
 729/**
 730 * send_stream - send stream-oriented data
 731 * @iocb: (unused)
 732 * @sock: socket structure
 733 * @m: data to send
 734 * @total_len: total length of data to be sent
 735 *
 736 * Used for SOCK_STREAM data.
 737 *
 738 * Returns the number of bytes sent on success (or partial success),
 739 * or errno if no data sent
 740 */
 741static int send_stream(struct kiocb *iocb, struct socket *sock,
 742                       struct msghdr *m, size_t total_len)
 743{
 744        struct sock *sk = sock->sk;
 745        struct tipc_port *tport = tipc_sk_port(sk);
 746        struct msghdr my_msg;
 747        struct iovec my_iov;
 748        struct iovec *curr_iov;
 749        int curr_iovlen;
 750        char __user *curr_start;
 751        u32 hdr_size;
 752        int curr_left;
 753        int bytes_to_send;
 754        int bytes_sent;
 755        int res;
 756
 757        lock_sock(sk);
 758
 759        /* Handle special cases where there is no connection */
 760        if (unlikely(sock->state != SS_CONNECTED)) {
 761                if (sock->state == SS_UNCONNECTED) {
 762                        res = send_packet(NULL, sock, m, total_len);
 763                        goto exit;
 764                } else if (sock->state == SS_DISCONNECTING) {
 765                        res = -EPIPE;
 766                        goto exit;
 767                } else {
 768                        res = -ENOTCONN;
 769                        goto exit;
 770                }
 771        }
 772
 773        if (unlikely(m->msg_name)) {
 774                res = -EISCONN;
 775                goto exit;
 776        }
 777
 778        if (total_len > (unsigned int)INT_MAX) {
 779                res = -EMSGSIZE;
 780                goto exit;
 781        }
 782
 783        /*
 784         * Send each iovec entry using one or more messages
 785         *
 786         * Note: This algorithm is good for the most likely case
 787         * (i.e. one large iovec entry), but could be improved to pass sets
 788         * of small iovec entries into send_packet().
 789         */
 790        curr_iov = m->msg_iov;
 791        curr_iovlen = m->msg_iovlen;
 792        my_msg.msg_iov = &my_iov;
 793        my_msg.msg_iovlen = 1;
 794        my_msg.msg_flags = m->msg_flags;
 795        my_msg.msg_name = NULL;
 796        bytes_sent = 0;
 797
 798        hdr_size = msg_hdr_sz(&tport->phdr);
 799
 800        while (curr_iovlen--) {
 801                curr_start = curr_iov->iov_base;
 802                curr_left = curr_iov->iov_len;
 803
 804                while (curr_left) {
 805                        bytes_to_send = tport->max_pkt - hdr_size;
 806                        if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
 807                                bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
 808                        if (curr_left < bytes_to_send)
 809                                bytes_to_send = curr_left;
 810                        my_iov.iov_base = curr_start;
 811                        my_iov.iov_len = bytes_to_send;
 812                        res = send_packet(NULL, sock, &my_msg, bytes_to_send);
 813                        if (res < 0) {
 814                                if (bytes_sent)
 815                                        res = bytes_sent;
 816                                goto exit;
 817                        }
 818                        curr_left -= bytes_to_send;
 819                        curr_start += bytes_to_send;
 820                        bytes_sent += bytes_to_send;
 821                }
 822
 823                curr_iov++;
 824        }
 825        res = bytes_sent;
 826exit:
 827        release_sock(sk);
 828        return res;
 829}
 830
 831/**
 832 * auto_connect - complete connection setup to a remote port
 833 * @sock: socket structure
 834 * @msg: peer's response message
 835 *
 836 * Returns 0 on success, errno otherwise
 837 */
 838static int auto_connect(struct socket *sock, struct tipc_msg *msg)
 839{
 840        struct tipc_sock *tsock = tipc_sk(sock->sk);
 841        struct tipc_port *p_ptr;
 842
 843        tsock->peer_name.ref = msg_origport(msg);
 844        tsock->peer_name.node = msg_orignode(msg);
 845        p_ptr = tipc_port_deref(tsock->p->ref);
 846        if (!p_ptr)
 847                return -EINVAL;
 848
 849        __tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name);
 850
 851        if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
 852                return -EINVAL;
 853        msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg));
 854        sock->state = SS_CONNECTED;
 855        return 0;
 856}
 857
 858/**
 859 * set_orig_addr - capture sender's address for received message
 860 * @m: descriptor for message info
 861 * @msg: received message header
 862 *
 863 * Note: Address is not captured if not requested by receiver.
 864 */
 865static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
 866{
 867        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)m->msg_name;
 868
 869        if (addr) {
 870                addr->family = AF_TIPC;
 871                addr->addrtype = TIPC_ADDR_ID;
 872                memset(&addr->addr, 0, sizeof(addr->addr));
 873                addr->addr.id.ref = msg_origport(msg);
 874                addr->addr.id.node = msg_orignode(msg);
 875                addr->addr.name.domain = 0;     /* could leave uninitialized */
 876                addr->scope = 0;                /* could leave uninitialized */
 877                m->msg_namelen = sizeof(struct sockaddr_tipc);
 878        }
 879}
 880
 881/**
 882 * anc_data_recv - optionally capture ancillary data for received message
 883 * @m: descriptor for message info
 884 * @msg: received message header
 885 * @tport: TIPC port associated with message
 886 *
 887 * Note: Ancillary data is not captured if not requested by receiver.
 888 *
 889 * Returns 0 if successful, otherwise errno
 890 */
 891static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
 892                         struct tipc_port *tport)
 893{
 894        u32 anc_data[3];
 895        u32 err;
 896        u32 dest_type;
 897        int has_name;
 898        int res;
 899
 900        if (likely(m->msg_controllen == 0))
 901                return 0;
 902
 903        /* Optionally capture errored message object(s) */
 904        err = msg ? msg_errcode(msg) : 0;
 905        if (unlikely(err)) {
 906                anc_data[0] = err;
 907                anc_data[1] = msg_data_sz(msg);
 908                res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
 909                if (res)
 910                        return res;
 911                if (anc_data[1]) {
 912                        res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
 913                                       msg_data(msg));
 914                        if (res)
 915                                return res;
 916                }
 917        }
 918
 919        /* Optionally capture message destination object */
 920        dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
 921        switch (dest_type) {
 922        case TIPC_NAMED_MSG:
 923                has_name = 1;
 924                anc_data[0] = msg_nametype(msg);
 925                anc_data[1] = msg_namelower(msg);
 926                anc_data[2] = msg_namelower(msg);
 927                break;
 928        case TIPC_MCAST_MSG:
 929                has_name = 1;
 930                anc_data[0] = msg_nametype(msg);
 931                anc_data[1] = msg_namelower(msg);
 932                anc_data[2] = msg_nameupper(msg);
 933                break;
 934        case TIPC_CONN_MSG:
 935                has_name = (tport->conn_type != 0);
 936                anc_data[0] = tport->conn_type;
 937                anc_data[1] = tport->conn_instance;
 938                anc_data[2] = tport->conn_instance;
 939                break;
 940        default:
 941                has_name = 0;
 942        }
 943        if (has_name) {
 944                res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
 945                if (res)
 946                        return res;
 947        }
 948
 949        return 0;
 950}
 951
 952/**
 953 * recv_msg - receive packet-oriented message
 954 * @iocb: (unused)
 955 * @m: descriptor for message info
 956 * @buf_len: total size of user buffer area
 957 * @flags: receive flags
 958 *
 959 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
 960 * If the complete message doesn't fit in user area, truncate it.
 961 *
 962 * Returns size of returned message data, errno otherwise
 963 */
 964static int recv_msg(struct kiocb *iocb, struct socket *sock,
 965                    struct msghdr *m, size_t buf_len, int flags)
 966{
 967        struct sock *sk = sock->sk;
 968        struct tipc_port *tport = tipc_sk_port(sk);
 969        struct sk_buff *buf;
 970        struct tipc_msg *msg;
 971        long timeout;
 972        unsigned int sz;
 973        u32 err;
 974        int res;
 975
 976        /* Catch invalid receive requests */
 977        if (unlikely(!buf_len))
 978                return -EINVAL;
 979
 980        lock_sock(sk);
 981
 982        if (unlikely(sock->state == SS_UNCONNECTED)) {
 983                res = -ENOTCONN;
 984                goto exit;
 985        }
 986
 987        /* will be updated in set_orig_addr() if needed */
 988        m->msg_namelen = 0;
 989
 990        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 991restart:
 992
 993        /* Look for a message in receive queue; wait if necessary */
 994        while (skb_queue_empty(&sk->sk_receive_queue)) {
 995                if (sock->state == SS_DISCONNECTING) {
 996                        res = -ENOTCONN;
 997                        goto exit;
 998                }
 999                if (timeout <= 0L) {
1000                        res = timeout ? timeout : -EWOULDBLOCK;
1001                        goto exit;
1002                }
1003                release_sock(sk);
1004                timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
1005                                                           tipc_rx_ready(sock),
1006                                                           timeout);
1007                lock_sock(sk);
1008        }
1009
1010        /* Look at first message in receive queue */
1011        buf = skb_peek(&sk->sk_receive_queue);
1012        msg = buf_msg(buf);
1013        sz = msg_data_sz(msg);
1014        err = msg_errcode(msg);
1015
1016        /* Discard an empty non-errored message & try again */
1017        if ((!sz) && (!err)) {
1018                advance_rx_queue(sk);
1019                goto restart;
1020        }
1021
1022        /* Capture sender's address (optional) */
1023        set_orig_addr(m, msg);
1024
1025        /* Capture ancillary data (optional) */
1026        res = anc_data_recv(m, msg, tport);
1027        if (res)
1028                goto exit;
1029
1030        /* Capture message data (if valid) & compute return value (always) */
1031        if (!err) {
1032                if (unlikely(buf_len < sz)) {
1033                        sz = buf_len;
1034                        m->msg_flags |= MSG_TRUNC;
1035                }
1036                res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
1037                                              m->msg_iov, sz);
1038                if (res)
1039                        goto exit;
1040                res = sz;
1041        } else {
1042                if ((sock->state == SS_READY) ||
1043                    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1044                        res = 0;
1045                else
1046                        res = -ECONNRESET;
1047        }
1048
1049        /* Consume received message (optional) */
1050        if (likely(!(flags & MSG_PEEK))) {
1051                if ((sock->state != SS_READY) &&
1052                    (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1053                        tipc_acknowledge(tport->ref, tport->conn_unacked);
1054                advance_rx_queue(sk);
1055        }
1056exit:
1057        release_sock(sk);
1058        return res;
1059}
1060
1061/**
1062 * recv_stream - receive stream-oriented data
1063 * @iocb: (unused)
1064 * @m: descriptor for message info
1065 * @buf_len: total size of user buffer area
1066 * @flags: receive flags
1067 *
1068 * Used for SOCK_STREAM messages only.  If not enough data is available
1069 * will optionally wait for more; never truncates data.
1070 *
1071 * Returns size of returned message data, errno otherwise
1072 */
1073static int recv_stream(struct kiocb *iocb, struct socket *sock,
1074                       struct msghdr *m, size_t buf_len, int flags)
1075{
1076        struct sock *sk = sock->sk;
1077        struct tipc_port *tport = tipc_sk_port(sk);
1078        struct sk_buff *buf;
1079        struct tipc_msg *msg;
1080        long timeout;
1081        unsigned int sz;
1082        int sz_to_copy, target, needed;
1083        int sz_copied = 0;
1084        u32 err;
1085        int res = 0;
1086
1087        /* Catch invalid receive attempts */
1088        if (unlikely(!buf_len))
1089                return -EINVAL;
1090
1091        lock_sock(sk);
1092
1093        if (unlikely((sock->state == SS_UNCONNECTED))) {
1094                res = -ENOTCONN;
1095                goto exit;
1096        }
1097
1098        /* will be updated in set_orig_addr() if needed */
1099        m->msg_namelen = 0;
1100
1101        target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1102        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1103
1104restart:
1105        /* Look for a message in receive queue; wait if necessary */
1106        while (skb_queue_empty(&sk->sk_receive_queue)) {
1107                if (sock->state == SS_DISCONNECTING) {
1108                        res = -ENOTCONN;
1109                        goto exit;
1110                }
1111                if (timeout <= 0L) {
1112                        res = timeout ? timeout : -EWOULDBLOCK;
1113                        goto exit;
1114                }
1115                release_sock(sk);
1116                timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
1117                                                           tipc_rx_ready(sock),
1118                                                           timeout);
1119                lock_sock(sk);
1120        }
1121
1122        /* Look at first message in receive queue */
1123        buf = skb_peek(&sk->sk_receive_queue);
1124        msg = buf_msg(buf);
1125        sz = msg_data_sz(msg);
1126        err = msg_errcode(msg);
1127
1128        /* Discard an empty non-errored message & try again */
1129        if ((!sz) && (!err)) {
1130                advance_rx_queue(sk);
1131                goto restart;
1132        }
1133
1134        /* Optionally capture sender's address & ancillary data of first msg */
1135        if (sz_copied == 0) {
1136                set_orig_addr(m, msg);
1137                res = anc_data_recv(m, msg, tport);
1138                if (res)
1139                        goto exit;
1140        }
1141
1142        /* Capture message data (if valid) & compute return value (always) */
1143        if (!err) {
1144                u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1145
1146                sz -= offset;
1147                needed = (buf_len - sz_copied);
1148                sz_to_copy = (sz <= needed) ? sz : needed;
1149
1150                res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1151                                              m->msg_iov, sz_to_copy);
1152                if (res)
1153                        goto exit;
1154
1155                sz_copied += sz_to_copy;
1156
1157                if (sz_to_copy < sz) {
1158                        if (!(flags & MSG_PEEK))
1159                                TIPC_SKB_CB(buf)->handle =
1160                                (void *)(unsigned long)(offset + sz_to_copy);
1161                        goto exit;
1162                }
1163        } else {
1164                if (sz_copied != 0)
1165                        goto exit; /* can't add error msg to valid data */
1166
1167                if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1168                        res = 0;
1169                else
1170                        res = -ECONNRESET;
1171        }
1172
1173        /* Consume received message (optional) */
1174        if (likely(!(flags & MSG_PEEK))) {
1175                if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1176                        tipc_acknowledge(tport->ref, tport->conn_unacked);
1177                advance_rx_queue(sk);
1178        }
1179
1180        /* Loop around if more data is required */
1181        if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1182            (!skb_queue_empty(&sk->sk_receive_queue) ||
1183            (sz_copied < target)) &&    /* and more is ready or required */
1184            (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1185            (!err))                     /* and haven't reached a FIN */
1186                goto restart;
1187
1188exit:
1189        release_sock(sk);
1190        return sz_copied ? sz_copied : res;
1191}
1192
1193/**
1194 * tipc_write_space - wake up thread if port congestion is released
1195 * @sk: socket
1196 */
1197static void tipc_write_space(struct sock *sk)
1198{
1199        struct socket_wq *wq;
1200
1201        rcu_read_lock();
1202        wq = rcu_dereference(sk->sk_wq);
1203        if (wq_has_sleeper(wq))
1204                wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1205                                                POLLWRNORM | POLLWRBAND);
1206        rcu_read_unlock();
1207}
1208
1209/**
1210 * tipc_data_ready - wake up threads to indicate messages have been received
1211 * @sk: socket
1212 * @len: the length of messages
1213 */
1214static void tipc_data_ready(struct sock *sk, int len)
1215{
1216        struct socket_wq *wq;
1217
1218        rcu_read_lock();
1219        wq = rcu_dereference(sk->sk_wq);
1220        if (wq_has_sleeper(wq))
1221                wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1222                                                POLLRDNORM | POLLRDBAND);
1223        rcu_read_unlock();
1224}
1225
/**
 * filter_connect - Handle all incoming messages for a connection-based socket
 * @tsock: TIPC socket
 * @buf: pointer to the message buffer; the buffer is freed and *@buf is set
 *       to NULL when the message is an empty connection acknowledgement
 *
 * May change the socket state and set the socket error status as a side
 * effect of the message.
 *
 * Returns TIPC_OK if the message is accepted (or was consumed here),
 * TIPC_ERR_NO_PORT if it must be rejected
 */
static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
{
	struct sock *sk = &tsock->sk;
	struct socket *sock = sk->sk_socket;
	struct tipc_msg *hdr = buf_msg(*buf);
	int err;

	/* Multicast messages are never valid here */
	if (msg_mcast(hdr))
		return TIPC_ERR_NO_PORT;

	switch ((int)sock->state) {
	case SS_CONNECTED:
		/* Accept only connection-based messages sent by peer */
		if (!msg_connected(hdr) ||
		    !tipc_port_peer_msg(tsock->p, hdr))
			break;

		/* An errored message from the peer tears the connection down */
		if (unlikely(msg_errcode(hdr))) {
			sock->state = SS_DISCONNECTING;
			__tipc_disconnect(tsock->p);
		}
		return TIPC_OK;

	case SS_CONNECTING:
		/* Accept only ACK or NACK message */
		if (unlikely(msg_errcode(hdr))) {
			sock->state = SS_DISCONNECTING;
			sk->sk_err = ECONNREFUSED;
			return TIPC_OK;
		}

		if (unlikely(!msg_connected(hdr)))
			break;

		err = auto_connect(sock, hdr);
		if (err) {
			sock->state = SS_DISCONNECTING;
			sk->sk_err = -err;
			return TIPC_OK;
		}

		/* An 'ACK-' carries no useful data, so it is discarded
		 * right here.  A connect() call that may be sleeping on
		 * the socket is woken up at the same time.
		 */
		if (msg_data_sz(hdr) == 0) {
			kfree_skb(*buf);
			*buf = NULL;
			if (waitqueue_active(sk_sleep(sk)))
				wake_up_interruptible(sk_sleep(sk));
		}
		return TIPC_OK;

	case SS_LISTENING:
	case SS_UNCONNECTED:
		/* Accept only SYN message */
		if (!msg_connected(hdr) && !(msg_errcode(hdr)))
			return TIPC_OK;
		break;

	case SS_DISCONNECTING:
		break;

	default:
		pr_err("Unknown socket state %u\n", sock->state);
		break;
	}

	return TIPC_ERR_NO_PORT;
}
1302
1303/**
1304 * rcvbuf_limit - get proper overload limit of socket receive queue
1305 * @sk: socket
1306 * @buf: message
1307 *
1308 * For all connection oriented messages, irrespective of importance,
1309 * the default overload value (i.e. 67MB) is set as limit.
1310 *
1311 * For all connectionless messages, by default new queue limits are
1312 * as belows:
1313 *
1314 * TIPC_LOW_IMPORTANCE       (4 MB)
1315 * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1316 * TIPC_HIGH_IMPORTANCE      (16 MB)
1317 * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1318 *
1319 * Returns overload limit according to corresponding message importance
1320 */
1321static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1322{
1323        struct tipc_msg *msg = buf_msg(buf);
1324        unsigned int limit;
1325
1326        if (msg_connected(msg))
1327                limit = sysctl_tipc_rmem[2];
1328        else
1329                limit = sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1330                        msg_importance(msg);
1331        return limit;
1332}
1333
1334/**
1335 * filter_rcv - validate incoming message
1336 * @sk: socket
1337 * @buf: message
1338 *
1339 * Enqueues message on receive queue if acceptable; optionally handles
1340 * disconnect indication for a connected socket.
1341 *
1342 * Called with socket lock already taken; port lock may also be taken.
1343 *
1344 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1345 */
1346static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1347{
1348        struct socket *sock = sk->sk_socket;
1349        struct tipc_msg *msg = buf_msg(buf);
1350        unsigned int limit = rcvbuf_limit(sk, buf);
1351        u32 res = TIPC_OK;
1352
1353        /* Reject message if it is wrong sort of message for socket */
1354        if (msg_type(msg) > TIPC_DIRECT_MSG)
1355                return TIPC_ERR_NO_PORT;
1356
1357        if (sock->state == SS_READY) {
1358                if (msg_connected(msg))
1359                        return TIPC_ERR_NO_PORT;
1360        } else {
1361                res = filter_connect(tipc_sk(sk), &buf);
1362                if (res != TIPC_OK || buf == NULL)
1363                        return res;
1364        }
1365
1366        /* Reject message if there isn't room to queue it */
1367        if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1368                return TIPC_ERR_OVERLOAD;
1369
1370        /* Enqueue message */
1371        TIPC_SKB_CB(buf)->handle = 0;
1372        __skb_queue_tail(&sk->sk_receive_queue, buf);
1373        skb_set_owner_r(buf, sk);
1374
1375        sk->sk_data_ready(sk, 0);
1376        return TIPC_OK;
1377}
1378
1379/**
1380 * backlog_rcv - handle incoming message from backlog queue
1381 * @sk: socket
1382 * @buf: message
1383 *
1384 * Caller must hold socket lock, but not port lock.
1385 *
1386 * Returns 0
1387 */
1388static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1389{
1390        u32 res;
1391
1392        res = filter_rcv(sk, buf);
1393        if (res)
1394                tipc_reject_msg(buf, res);
1395        return 0;
1396}
1397
1398/**
1399 * dispatch - handle incoming message
1400 * @tport: TIPC port that received message
1401 * @buf: message
1402 *
1403 * Called with port lock already taken.
1404 *
1405 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1406 */
1407static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1408{
1409        struct sock *sk = tport->sk;
1410        u32 res;
1411
1412        /*
1413         * Process message if socket is unlocked; otherwise add to backlog queue
1414         *
1415         * This code is based on sk_receive_skb(), but must be distinct from it
1416         * since a TIPC-specific filter/reject mechanism is utilized
1417         */
1418        bh_lock_sock(sk);
1419        if (!sock_owned_by_user(sk)) {
1420                res = filter_rcv(sk, buf);
1421        } else {
1422                if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf)))
1423                        res = TIPC_ERR_OVERLOAD;
1424                else
1425                        res = TIPC_OK;
1426        }
1427        bh_unlock_sock(sk);
1428
1429        return res;
1430}
1431
1432/**
1433 * wakeupdispatch - wake up port after congestion
1434 * @tport: port to wakeup
1435 *
1436 * Called with port lock already taken.
1437 */
1438static void wakeupdispatch(struct tipc_port *tport)
1439{
1440        struct sock *sk = tport->sk;
1441
1442        sk->sk_write_space(sk);
1443}
1444
1445/**
1446 * connect - establish a connection to another TIPC port
1447 * @sock: socket structure
1448 * @dest: socket address for destination port
1449 * @destlen: size of socket address data structure
1450 * @flags: file-related flags associated with socket
1451 *
1452 * Returns 0 on success, errno otherwise
1453 */
1454static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1455                   int flags)
1456{
1457        struct sock *sk = sock->sk;
1458        struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1459        struct msghdr m = {NULL,};
1460        unsigned int timeout;
1461        int res;
1462
1463        lock_sock(sk);
1464
1465        /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1466        if (sock->state == SS_READY) {
1467                res = -EOPNOTSUPP;
1468                goto exit;
1469        }
1470
1471        /*
1472         * Reject connection attempt using multicast address
1473         *
1474         * Note: send_msg() validates the rest of the address fields,
1475         *       so there's no need to do it here
1476         */
1477        if (dst->addrtype == TIPC_ADDR_MCAST) {
1478                res = -EINVAL;
1479                goto exit;
1480        }
1481
1482        timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1483
1484        switch (sock->state) {
1485        case SS_UNCONNECTED:
1486                /* Send a 'SYN-' to destination */
1487                m.msg_name = dest;
1488                m.msg_namelen = destlen;
1489
1490                /* If connect is in non-blocking case, set MSG_DONTWAIT to
1491                 * indicate send_msg() is never blocked.
1492                 */
1493                if (!timeout)
1494                        m.msg_flags = MSG_DONTWAIT;
1495
1496                res = send_msg(NULL, sock, &m, 0);
1497                if ((res < 0) && (res != -EWOULDBLOCK))
1498                        goto exit;
1499
1500                /* Just entered SS_CONNECTING state; the only
1501                 * difference is that return value in non-blocking
1502                 * case is EINPROGRESS, rather than EALREADY.
1503                 */
1504                res = -EINPROGRESS;
1505                break;
1506        case SS_CONNECTING:
1507                res = -EALREADY;
1508                break;
1509        case SS_CONNECTED:
1510                res = -EISCONN;
1511                break;
1512        default:
1513                res = -EINVAL;
1514                goto exit;
1515        }
1516
1517        if (sock->state == SS_CONNECTING) {
1518                if (!timeout)
1519                        goto exit;
1520
1521                /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1522                release_sock(sk);
1523                res = wait_event_interruptible_timeout(*sk_sleep(sk),
1524                                sock->state != SS_CONNECTING,
1525                                timeout ? (long)msecs_to_jiffies(timeout)
1526                                        : MAX_SCHEDULE_TIMEOUT);
1527                lock_sock(sk);
1528                if (res <= 0) {
1529                        if (res == 0)
1530                                res = -ETIMEDOUT;
1531                        else
1532                                ; /* leave "res" unchanged */
1533                        goto exit;
1534                }
1535        }
1536
1537        if (unlikely(sock->state == SS_DISCONNECTING))
1538                res = sock_error(sk);
1539        else
1540                res = 0;
1541
1542exit:
1543        release_sock(sk);
1544        return res;
1545}
1546
1547/**
1548 * listen - allow socket to listen for incoming connections
1549 * @sock: socket structure
1550 * @len: (unused)
1551 *
1552 * Returns 0 on success, errno otherwise
1553 */
1554static int listen(struct socket *sock, int len)
1555{
1556        struct sock *sk = sock->sk;
1557        int res;
1558
1559        lock_sock(sk);
1560
1561        if (sock->state != SS_UNCONNECTED)
1562                res = -EINVAL;
1563        else {
1564                sock->state = SS_LISTENING;
1565                res = 0;
1566        }
1567
1568        release_sock(sk);
1569        return res;
1570}
1571
1572/**
1573 * accept - wait for connection request
1574 * @sock: listening socket
1575 * @newsock: new socket that is to be connected
1576 * @flags: file-related flags associated with socket
1577 *
1578 * Returns 0 on success, errno otherwise
1579 */
1580static int accept(struct socket *sock, struct socket *new_sock, int flags)
1581{
1582        struct sock *new_sk, *sk = sock->sk;
1583        struct sk_buff *buf;
1584        struct tipc_sock *new_tsock;
1585        struct tipc_port *new_tport;
1586        struct tipc_msg *msg;
1587        u32 new_ref;
1588
1589        int res;
1590
1591        lock_sock(sk);
1592
1593        if (sock->state != SS_LISTENING) {
1594                res = -EINVAL;
1595                goto exit;
1596        }
1597
1598        while (skb_queue_empty(&sk->sk_receive_queue)) {
1599                if (flags & O_NONBLOCK) {
1600                        res = -EWOULDBLOCK;
1601                        goto exit;
1602                }
1603                release_sock(sk);
1604                res = wait_event_interruptible(*sk_sleep(sk),
1605                                (!skb_queue_empty(&sk->sk_receive_queue)));
1606                lock_sock(sk);
1607                if (res)
1608                        goto exit;
1609        }
1610
1611        buf = skb_peek(&sk->sk_receive_queue);
1612
1613        res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1614        if (res)
1615                goto exit;
1616
1617        new_sk = new_sock->sk;
1618        new_tsock = tipc_sk(new_sk);
1619        new_tport = new_tsock->p;
1620        new_ref = new_tport->ref;
1621        msg = buf_msg(buf);
1622
1623        /* we lock on new_sk; but lockdep sees the lock on sk */
1624        lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1625
1626        /*
1627         * Reject any stray messages received by new socket
1628         * before the socket lock was taken (very, very unlikely)
1629         */
1630        reject_rx_queue(new_sk);
1631
1632        /* Connect new socket to it's peer */
1633        new_tsock->peer_name.ref = msg_origport(msg);
1634        new_tsock->peer_name.node = msg_orignode(msg);
1635        tipc_connect(new_ref, &new_tsock->peer_name);
1636        new_sock->state = SS_CONNECTED;
1637
1638        tipc_set_portimportance(new_ref, msg_importance(msg));
1639        if (msg_named(msg)) {
1640                new_tport->conn_type = msg_nametype(msg);
1641                new_tport->conn_instance = msg_nameinst(msg);
1642        }
1643
1644        /*
1645         * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1646         * Respond to 'SYN+' by queuing it on new socket.
1647         */
1648        if (!msg_data_sz(msg)) {
1649                struct msghdr m = {NULL,};
1650
1651                advance_rx_queue(sk);
1652                send_packet(NULL, new_sock, &m, 0);
1653        } else {
1654                __skb_dequeue(&sk->sk_receive_queue);
1655                __skb_queue_head(&new_sk->sk_receive_queue, buf);
1656                skb_set_owner_r(buf, new_sk);
1657        }
1658        release_sock(new_sk);
1659
1660exit:
1661        release_sock(sk);
1662        return res;
1663}
1664
1665/**
1666 * shutdown - shutdown socket connection
1667 * @sock: socket structure
1668 * @how: direction to close (must be SHUT_RDWR)
1669 *
1670 * Terminates connection (if necessary), then purges socket's receive queue.
1671 *
1672 * Returns 0 on success, errno otherwise
1673 */
1674static int shutdown(struct socket *sock, int how)
1675{
1676        struct sock *sk = sock->sk;
1677        struct tipc_port *tport = tipc_sk_port(sk);
1678        struct sk_buff *buf;
1679        int res;
1680
1681        if (how != SHUT_RDWR)
1682                return -EINVAL;
1683
1684        lock_sock(sk);
1685
1686        switch (sock->state) {
1687        case SS_CONNECTING:
1688        case SS_CONNECTED:
1689
1690restart:
1691                /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1692                buf = __skb_dequeue(&sk->sk_receive_queue);
1693                if (buf) {
1694                        if (TIPC_SKB_CB(buf)->handle != 0) {
1695                                kfree_skb(buf);
1696                                goto restart;
1697                        }
1698                        tipc_disconnect(tport->ref);
1699                        tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1700                } else {
1701                        tipc_shutdown(tport->ref);
1702                }
1703
1704                sock->state = SS_DISCONNECTING;
1705
1706                /* fall through */
1707
1708        case SS_DISCONNECTING:
1709
1710                /* Discard any unreceived messages */
1711                __skb_queue_purge(&sk->sk_receive_queue);
1712
1713                /* Wake up anyone sleeping in poll */
1714                sk->sk_state_change(sk);
1715                res = 0;
1716                break;
1717
1718        default:
1719                res = -ENOTCONN;
1720        }
1721
1722        release_sock(sk);
1723        return res;
1724}
1725
1726/**
1727 * setsockopt - set socket option
1728 * @sock: socket structure
1729 * @lvl: option level
1730 * @opt: option identifier
1731 * @ov: pointer to new option value
1732 * @ol: length of option value
1733 *
1734 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1735 * (to ease compatibility).
1736 *
1737 * Returns 0 on success, errno otherwise
1738 */
1739static int setsockopt(struct socket *sock, int lvl, int opt, char __user *ov,
1740                      unsigned int ol)
1741{
1742        struct sock *sk = sock->sk;
1743        struct tipc_port *tport = tipc_sk_port(sk);
1744        u32 value;
1745        int res;
1746
1747        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1748                return 0;
1749        if (lvl != SOL_TIPC)
1750                return -ENOPROTOOPT;
1751        if (ol < sizeof(value))
1752                return -EINVAL;
1753        res = get_user(value, (u32 __user *)ov);
1754        if (res)
1755                return res;
1756
1757        lock_sock(sk);
1758
1759        switch (opt) {
1760        case TIPC_IMPORTANCE:
1761                res = tipc_set_portimportance(tport->ref, value);
1762                break;
1763        case TIPC_SRC_DROPPABLE:
1764                if (sock->type != SOCK_STREAM)
1765                        res = tipc_set_portunreliable(tport->ref, value);
1766                else
1767                        res = -ENOPROTOOPT;
1768                break;
1769        case TIPC_DEST_DROPPABLE:
1770                res = tipc_set_portunreturnable(tport->ref, value);
1771                break;
1772        case TIPC_CONN_TIMEOUT:
1773                tipc_sk(sk)->conn_timeout = value;
1774                /* no need to set "res", since already 0 at this point */
1775                break;
1776        default:
1777                res = -EINVAL;
1778        }
1779
1780        release_sock(sk);
1781
1782        return res;
1783}
1784
1785/**
1786 * getsockopt - get socket option
1787 * @sock: socket structure
1788 * @lvl: option level
1789 * @opt: option identifier
1790 * @ov: receptacle for option value
1791 * @ol: receptacle for length of option value
1792 *
1793 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1794 * (to ease compatibility).
1795 *
1796 * Returns 0 on success, errno otherwise
1797 */
1798static int getsockopt(struct socket *sock, int lvl, int opt, char __user *ov,
1799                      int __user *ol)
1800{
1801        struct sock *sk = sock->sk;
1802        struct tipc_port *tport = tipc_sk_port(sk);
1803        int len;
1804        u32 value;
1805        int res;
1806
1807        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1808                return put_user(0, ol);
1809        if (lvl != SOL_TIPC)
1810                return -ENOPROTOOPT;
1811        res = get_user(len, ol);
1812        if (res)
1813                return res;
1814
1815        lock_sock(sk);
1816
1817        switch (opt) {
1818        case TIPC_IMPORTANCE:
1819                res = tipc_portimportance(tport->ref, &value);
1820                break;
1821        case TIPC_SRC_DROPPABLE:
1822                res = tipc_portunreliable(tport->ref, &value);
1823                break;
1824        case TIPC_DEST_DROPPABLE:
1825                res = tipc_portunreturnable(tport->ref, &value);
1826                break;
1827        case TIPC_CONN_TIMEOUT:
1828                value = tipc_sk(sk)->conn_timeout;
1829                /* no need to set "res", since already 0 at this point */
1830                break;
1831        case TIPC_NODE_RECVQ_DEPTH:
1832                value = 0; /* was tipc_queue_size, now obsolete */
1833                break;
1834        case TIPC_SOCK_RECVQ_DEPTH:
1835                value = skb_queue_len(&sk->sk_receive_queue);
1836                break;
1837        default:
1838                res = -EINVAL;
1839        }
1840
1841        release_sock(sk);
1842
1843        if (res)
1844                return res;     /* "get" failed */
1845
1846        if (len < sizeof(value))
1847                return -EINVAL;
1848
1849        if (copy_to_user(ov, &value, sizeof(value)))
1850                return -EFAULT;
1851
1852        return put_user(sizeof(value), ol);
1853}
1854
1855/* Protocol switches for the various types of TIPC sockets */
1856
1857static const struct proto_ops msg_ops = {
1858        .owner          = THIS_MODULE,
1859        .family         = AF_TIPC,
1860        .release        = release,
1861        .bind           = bind,
1862        .connect        = connect,
1863        .socketpair     = sock_no_socketpair,
1864        .accept         = sock_no_accept,
1865        .getname        = get_name,
1866        .poll           = poll,
1867        .ioctl          = sock_no_ioctl,
1868        .listen         = sock_no_listen,
1869        .shutdown       = shutdown,
1870        .setsockopt     = setsockopt,
1871        .getsockopt     = getsockopt,
1872        .sendmsg        = send_msg,
1873        .recvmsg        = recv_msg,
1874        .mmap           = sock_no_mmap,
1875        .sendpage       = sock_no_sendpage
1876};
1877
1878static const struct proto_ops packet_ops = {
1879        .owner          = THIS_MODULE,
1880        .family         = AF_TIPC,
1881        .release        = release,
1882        .bind           = bind,
1883        .connect        = connect,
1884        .socketpair     = sock_no_socketpair,
1885        .accept         = accept,
1886        .getname        = get_name,
1887        .poll           = poll,
1888        .ioctl          = sock_no_ioctl,
1889        .listen         = listen,
1890        .shutdown       = shutdown,
1891        .setsockopt     = setsockopt,
1892        .getsockopt     = getsockopt,
1893        .sendmsg        = send_packet,
1894        .recvmsg        = recv_msg,
1895        .mmap           = sock_no_mmap,
1896        .sendpage       = sock_no_sendpage
1897};
1898
1899static const struct proto_ops stream_ops = {
1900        .owner          = THIS_MODULE,
1901        .family         = AF_TIPC,
1902        .release        = release,
1903        .bind           = bind,
1904        .connect        = connect,
1905        .socketpair     = sock_no_socketpair,
1906        .accept         = accept,
1907        .getname        = get_name,
1908        .poll           = poll,
1909        .ioctl          = sock_no_ioctl,
1910        .listen         = listen,
1911        .shutdown       = shutdown,
1912        .setsockopt     = setsockopt,
1913        .getsockopt     = getsockopt,
1914        .sendmsg        = send_stream,
1915        .recvmsg        = recv_stream,
1916        .mmap           = sock_no_mmap,
1917        .sendpage       = sock_no_sendpage
1918};
1919
1920static const struct net_proto_family tipc_family_ops = {
1921        .owner          = THIS_MODULE,
1922        .family         = AF_TIPC,
1923        .create         = tipc_sk_create
1924};
1925
1926static struct proto tipc_proto = {
1927        .name           = "TIPC",
1928        .owner          = THIS_MODULE,
1929        .obj_size       = sizeof(struct tipc_sock),
1930        .sysctl_rmem    = sysctl_tipc_rmem
1931};
1932
1933static struct proto tipc_proto_kern = {
1934        .name           = "TIPC",
1935        .obj_size       = sizeof(struct tipc_sock),
1936        .sysctl_rmem    = sysctl_tipc_rmem
1937};
1938
1939/**
1940 * tipc_socket_init - initialize TIPC socket interface
1941 *
1942 * Returns 0 on success, errno otherwise
1943 */
1944int tipc_socket_init(void)
1945{
1946        int res;
1947
1948        res = proto_register(&tipc_proto, 1);
1949        if (res) {
1950                pr_err("Failed to register TIPC protocol type\n");
1951                goto out;
1952        }
1953
1954        res = sock_register(&tipc_family_ops);
1955        if (res) {
1956                pr_err("Failed to register TIPC socket type\n");
1957                proto_unregister(&tipc_proto);
1958                goto out;
1959        }
1960
1961        sockets_enabled = 1;
1962 out:
1963        return res;
1964}
1965
1966/**
1967 * tipc_socket_stop - stop TIPC socket interface
1968 */
1969void tipc_socket_stop(void)
1970{
1971        if (!sockets_enabled)
1972                return;
1973
1974        sockets_enabled = 0;
1975        sock_unregister(tipc_family_ops.family);
1976        proto_unregister(&tipc_proto);
1977}
1978