linux/net/tipc/socket.c
<<
>>
Prefs
   1/*
   2 * net/tipc/socket.c: TIPC socket API
   3 *
   4 * Copyright (c) 2001-2007, Ericsson AB
   5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "port.h"
  39
  40#include <linux/export.h>
  41#include <net/sock.h>
  42
  43#define SS_LISTENING    -1      /* socket is listening */
  44#define SS_READY        -2      /* socket is connectionless */
  45
  46#define OVERLOAD_LIMIT_BASE     5000
  47#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
  48
/**
 * struct tipc_sock - TIPC socket: a generic socket plus TIPC-specific state
 * @sk: generic socket; kept as the first member so that a 'struct sock'
 *      pointer can be cast directly to a 'struct tipc_sock' pointer
 * @p: TIPC port used by this socket
 * @peer_name: port ID (ref + node) of the connected peer
 * @conn_timeout: connect timeout; initialised to CONN_TIMEOUT_DEFAULT
 */
struct tipc_sock {
	struct sock sk;
	struct tipc_port *p;
	struct tipc_portid peer_name;
	unsigned int conn_timeout;
};
  55
  56#define tipc_sk(sk) ((struct tipc_sock *)(sk))
  57#define tipc_sk_port(sk) (tipc_sk(sk)->p)
  58
  59#define tipc_rx_ready(sock) (!skb_queue_empty(&sock->sk->sk_receive_queue) || \
  60                        (sock->state == SS_DISCONNECTING))
  61
/* Callbacks that tipc_create() installs on each new socket/port pair */
static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
static void wakeupdispatch(struct tipc_port *tport);

/* Operation tables; tipc_create() picks one according to the socket type */
static const struct proto_ops packet_ops;	/* SOCK_SEQPACKET */
static const struct proto_ops stream_ops;	/* SOCK_STREAM */
static const struct proto_ops msg_ops;		/* SOCK_DGRAM and SOCK_RDM */

/* Protocol descriptor passed to sk_alloc() when a socket is created */
static struct proto tipc_proto;

/* NOTE(review): not referenced in this part of the file - confirm its use */
static int sockets_enabled;

/*
 * Decremented every time a buffer is taken off a socket receive queue;
 * presumably the number of buffers queued across all TIPC sockets - the
 * matching increment is not visible in this part of the file.
 */
static atomic_t tipc_queue_size = ATOMIC_INIT(0);
  75
  76/*
  77 * Revised TIPC socket locking policy:
  78 *
  79 * Most socket operations take the standard socket lock when they start
  80 * and hold it until they finish (or until they need to sleep).  Acquiring
  81 * this lock grants the owner exclusive access to the fields of the socket
  82 * data structures, with the exception of the backlog queue.  A few socket
  83 * operations can be done without taking the socket lock because they only
  84 * read socket information that never changes during the life of the socket.
  85 *
  86 * Socket operations may acquire the lock for the associated TIPC port if they
  87 * need to perform an operation on the port.  If any routine needs to acquire
  88 * both the socket lock and the port lock it must take the socket lock first
  89 * to avoid the risk of deadlock.
  90 *
 * The dispatcher handling incoming messages cannot grab the socket lock in
 * the standard fashion, since it is invoked at the BH level and cannot block.
  93 * Instead, it checks to see if the socket lock is currently owned by someone,
  94 * and either handles the message itself or adds it to the socket's backlog
  95 * queue; in the latter case the queued message is processed once the process
  96 * owning the socket lock releases it.
  97 *
  98 * NOTE: Releasing the socket lock while an operation is sleeping overcomes
  99 * the problem of a blocked socket operation preventing any other operations
 100 * from occurring.  However, applications must be careful if they have
 101 * multiple threads trying to send (or receive) on the same socket, as these
 102 * operations might interfere with each other.  For example, doing a connect
 103 * and a receive at the same time might allow the receive to consume the
 104 * ACK message meant for the connect.  While additional work could be done
 105 * to try and overcome this, it doesn't seem to be worthwhile at the present.
 106 *
 107 * NOTE: Releasing the socket lock while an operation is sleeping also ensures
 108 * that another operation that must be performed in a non-blocking manner is
 109 * not delayed for very long because the lock has already been taken.
 110 *
 111 * NOTE: This code assumes that certain fields of a port/socket pair are
 112 * constant over its lifetime; such fields can be examined without taking
 113 * the socket lock and/or port lock, and do not need to be re-read even
 114 * after resuming processing after waiting.  These fields include:
 115 *   - socket type
 116 *   - pointer to socket sk structure (aka tipc_sock structure)
 117 *   - pointer to port structure
 118 *   - port reference
 119 */
 120
 121/**
 122 * advance_rx_queue - discard first buffer in socket receive queue
 123 *
 124 * Caller must hold socket lock
 125 */
 126static void advance_rx_queue(struct sock *sk)
 127{
 128        kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 129        atomic_dec(&tipc_queue_size);
 130}
 131
 132/**
 133 * discard_rx_queue - discard all buffers in socket receive queue
 134 *
 135 * Caller must hold socket lock
 136 */
 137static void discard_rx_queue(struct sock *sk)
 138{
 139        struct sk_buff *buf;
 140
 141        while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
 142                atomic_dec(&tipc_queue_size);
 143                kfree_skb(buf);
 144        }
 145}
 146
 147/**
 148 * reject_rx_queue - reject all buffers in socket receive queue
 149 *
 150 * Caller must hold socket lock
 151 */
 152static void reject_rx_queue(struct sock *sk)
 153{
 154        struct sk_buff *buf;
 155
 156        while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
 157                tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 158                atomic_dec(&tipc_queue_size);
 159        }
 160}
 161
 162/**
 163 * tipc_create - create a TIPC socket
 164 * @net: network namespace (must be default network)
 165 * @sock: pre-allocated socket structure
 166 * @protocol: protocol indicator (must be 0)
 167 * @kern: caused by kernel or by userspace?
 168 *
 169 * This routine creates additional data structures used by the TIPC socket,
 170 * initializes them, and links them together.
 171 *
 172 * Returns 0 on success, errno otherwise
 173 */
 174static int tipc_create(struct net *net, struct socket *sock, int protocol,
 175                       int kern)
 176{
 177        const struct proto_ops *ops;
 178        socket_state state;
 179        struct sock *sk;
 180        struct tipc_port *tp_ptr;
 181
 182        /* Validate arguments */
 183        if (unlikely(protocol != 0))
 184                return -EPROTONOSUPPORT;
 185
 186        switch (sock->type) {
 187        case SOCK_STREAM:
 188                ops = &stream_ops;
 189                state = SS_UNCONNECTED;
 190                break;
 191        case SOCK_SEQPACKET:
 192                ops = &packet_ops;
 193                state = SS_UNCONNECTED;
 194                break;
 195        case SOCK_DGRAM:
 196        case SOCK_RDM:
 197                ops = &msg_ops;
 198                state = SS_READY;
 199                break;
 200        default:
 201                return -EPROTOTYPE;
 202        }
 203
 204        /* Allocate socket's protocol area */
 205        sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
 206        if (sk == NULL)
 207                return -ENOMEM;
 208
 209        /* Allocate TIPC port for socket to use */
 210        tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
 211                                     TIPC_LOW_IMPORTANCE);
 212        if (unlikely(!tp_ptr)) {
 213                sk_free(sk);
 214                return -ENOMEM;
 215        }
 216
 217        /* Finish initializing socket data structures */
 218        sock->ops = ops;
 219        sock->state = state;
 220
 221        sock_init_data(sock, sk);
 222        sk->sk_backlog_rcv = backlog_rcv;
 223        tipc_sk(sk)->p = tp_ptr;
 224        tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
 225
 226        spin_unlock_bh(tp_ptr->lock);
 227
 228        if (sock->state == SS_READY) {
 229                tipc_set_portunreturnable(tp_ptr->ref, 1);
 230                if (sock->type == SOCK_DGRAM)
 231                        tipc_set_portunreliable(tp_ptr->ref, 1);
 232        }
 233
 234        return 0;
 235}
 236
 237/**
 238 * release - destroy a TIPC socket
 239 * @sock: socket to destroy
 240 *
 241 * This routine cleans up any messages that are still queued on the socket.
 242 * For DGRAM and RDM socket types, all queued messages are rejected.
 243 * For SEQPACKET and STREAM socket types, the first message is rejected
 244 * and any others are discarded.  (If the first message on a STREAM socket
 245 * is partially-read, it is discarded and the next one is rejected instead.)
 246 *
 247 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 248 * are returned or discarded according to the "destination droppable" setting
 249 * specified for the message by the sender.
 250 *
 251 * Returns 0 on success, errno otherwise
 252 */
 253static int release(struct socket *sock)
 254{
 255        struct sock *sk = sock->sk;
 256        struct tipc_port *tport;
 257        struct sk_buff *buf;
 258        int res;
 259
 260        /*
 261         * Exit if socket isn't fully initialized (occurs when a failed accept()
 262         * releases a pre-allocated child socket that was never used)
 263         */
 264        if (sk == NULL)
 265                return 0;
 266
 267        tport = tipc_sk_port(sk);
 268        lock_sock(sk);
 269
 270        /*
 271         * Reject all unreceived messages, except on an active connection
 272         * (which disconnects locally & sends a 'FIN+' to peer)
 273         */
 274        while (sock->state != SS_DISCONNECTING) {
 275                buf = __skb_dequeue(&sk->sk_receive_queue);
 276                if (buf == NULL)
 277                        break;
 278                atomic_dec(&tipc_queue_size);
 279                if (TIPC_SKB_CB(buf)->handle != 0)
 280                        kfree_skb(buf);
 281                else {
 282                        if ((sock->state == SS_CONNECTING) ||
 283                            (sock->state == SS_CONNECTED)) {
 284                                sock->state = SS_DISCONNECTING;
 285                                tipc_disconnect(tport->ref);
 286                        }
 287                        tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 288                }
 289        }
 290
 291        /*
 292         * Delete TIPC port; this ensures no more messages are queued
 293         * (also disconnects an active connection & sends a 'FIN-' to peer)
 294         */
 295        res = tipc_deleteport(tport->ref);
 296
 297        /* Discard any remaining (connection-based) messages in receive queue */
 298        discard_rx_queue(sk);
 299
 300        /* Reject any messages that accumulated in backlog queue */
 301        sock->state = SS_DISCONNECTING;
 302        release_sock(sk);
 303
 304        sock_put(sk);
 305        sock->sk = NULL;
 306
 307        return res;
 308}
 309
 310/**
 311 * bind - associate or disassocate TIPC name(s) with a socket
 312 * @sock: socket structure
 313 * @uaddr: socket address describing name(s) and desired operation
 314 * @uaddr_len: size of socket address data structure
 315 *
 316 * Name and name sequence binding is indicated using a positive scope value;
 317 * a negative scope value unbinds the specified name.  Specifying no name
 318 * (i.e. a socket address length of 0) unbinds all names from the socket.
 319 *
 320 * Returns 0 on success, errno otherwise
 321 *
 322 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 323 *       access any non-constant socket information.
 324 */
 325static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
 326{
 327        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 328        u32 portref = tipc_sk_port(sock->sk)->ref;
 329
 330        if (unlikely(!uaddr_len))
 331                return tipc_withdraw(portref, 0, NULL);
 332
 333        if (uaddr_len < sizeof(struct sockaddr_tipc))
 334                return -EINVAL;
 335        if (addr->family != AF_TIPC)
 336                return -EAFNOSUPPORT;
 337
 338        if (addr->addrtype == TIPC_ADDR_NAME)
 339                addr->addr.nameseq.upper = addr->addr.nameseq.lower;
 340        else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
 341                return -EAFNOSUPPORT;
 342
 343        if (addr->addr.nameseq.type < TIPC_RESERVED_TYPES)
 344                return -EACCES;
 345
 346        return (addr->scope > 0) ?
 347                tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
 348                tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
 349}
 350
 351/**
 352 * get_name - get port ID of socket or peer socket
 353 * @sock: socket structure
 354 * @uaddr: area for returned socket address
 355 * @uaddr_len: area for returned length of socket address
 356 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
 357 *
 358 * Returns 0 on success, errno otherwise
 359 *
 360 * NOTE: This routine doesn't need to take the socket lock since it only
 361 *       accesses socket information that is unchanging (or which changes in
 362 *       a completely predictable manner).
 363 */
 364static int get_name(struct socket *sock, struct sockaddr *uaddr,
 365                    int *uaddr_len, int peer)
 366{
 367        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 368        struct tipc_sock *tsock = tipc_sk(sock->sk);
 369
 370        memset(addr, 0, sizeof(*addr));
 371        if (peer) {
 372                if ((sock->state != SS_CONNECTED) &&
 373                        ((peer != 2) || (sock->state != SS_DISCONNECTING)))
 374                        return -ENOTCONN;
 375                addr->addr.id.ref = tsock->peer_name.ref;
 376                addr->addr.id.node = tsock->peer_name.node;
 377        } else {
 378                addr->addr.id.ref = tsock->p->ref;
 379                addr->addr.id.node = tipc_own_addr;
 380        }
 381
 382        *uaddr_len = sizeof(*addr);
 383        addr->addrtype = TIPC_ADDR_ID;
 384        addr->family = AF_TIPC;
 385        addr->scope = 0;
 386        addr->addr.name.domain = 0;
 387
 388        return 0;
 389}
 390
 391/**
 392 * poll - read and possibly block on pollmask
 393 * @file: file structure associated with the socket
 394 * @sock: socket for which to calculate the poll bits
 395 * @wait: ???
 396 *
 397 * Returns pollmask value
 398 *
 399 * COMMENTARY:
 400 * It appears that the usual socket locking mechanisms are not useful here
 401 * since the pollmask info is potentially out-of-date the moment this routine
 402 * exits.  TCP and other protocols seem to rely on higher level poll routines
 403 * to handle any preventable race conditions, so TIPC will do the same ...
 404 *
 405 * TIPC sets the returned events as follows:
 406 *
 407 * socket state         flags set
 408 * ------------         ---------
 409 * unconnected          no read flags
 410 *                      no write flags
 411 *
 412 * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
 413 *                      no write flags
 414 *
 415 * connected            POLLIN/POLLRDNORM if data in rx queue
 416 *                      POLLOUT if port is not congested
 417 *
 418 * disconnecting        POLLIN/POLLRDNORM/POLLHUP
 419 *                      no write flags
 420 *
 421 * listening            POLLIN if SYN in rx queue
 422 *                      no write flags
 423 *
 424 * ready                POLLIN/POLLRDNORM if data in rx queue
 425 * [connectionless]     POLLOUT (since port cannot be congested)
 426 *
 427 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 428 * imply that the operation will succeed, merely that it should be performed
 429 * and will not block.
 430 */
 431static unsigned int poll(struct file *file, struct socket *sock,
 432                         poll_table *wait)
 433{
 434        struct sock *sk = sock->sk;
 435        u32 mask = 0;
 436
 437        poll_wait(file, sk_sleep(sk), wait);
 438
 439        switch ((int)sock->state) {
 440        case SS_READY:
 441        case SS_CONNECTED:
 442                if (!tipc_sk_port(sk)->congested)
 443                        mask |= POLLOUT;
 444                /* fall thru' */
 445        case SS_CONNECTING:
 446        case SS_LISTENING:
 447                if (!skb_queue_empty(&sk->sk_receive_queue))
 448                        mask |= (POLLIN | POLLRDNORM);
 449                break;
 450        case SS_DISCONNECTING:
 451                mask = (POLLIN | POLLRDNORM | POLLHUP);
 452                break;
 453        }
 454
 455        return mask;
 456}
 457
 458/**
 459 * dest_name_check - verify user is permitted to send to specified port name
 460 * @dest: destination address
 461 * @m: descriptor for message to be sent
 462 *
 463 * Prevents restricted configuration commands from being issued by
 464 * unauthorized users.
 465 *
 466 * Returns 0 if permission is granted, otherwise errno
 467 */
 468static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
 469{
 470        struct tipc_cfg_msg_hdr hdr;
 471
 472        if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
 473                return 0;
 474        if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
 475                return 0;
 476        if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
 477                return -EACCES;
 478
 479        if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
 480                return -EMSGSIZE;
 481        if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
 482                return -EFAULT;
 483        if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
 484                return -EACCES;
 485
 486        return 0;
 487}
 488
 489/**
 490 * send_msg - send message in connectionless manner
 491 * @iocb: if NULL, indicates that socket lock is already held
 492 * @sock: socket structure
 493 * @m: message to send
 494 * @total_len: length of message
 495 *
 496 * Message must have an destination specified explicitly.
 497 * Used for SOCK_RDM and SOCK_DGRAM messages,
 498 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 499 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 500 *
 501 * Returns the number of bytes sent on success, or errno otherwise
 502 */
 503static int send_msg(struct kiocb *iocb, struct socket *sock,
 504                    struct msghdr *m, size_t total_len)
 505{
 506        struct sock *sk = sock->sk;
 507        struct tipc_port *tport = tipc_sk_port(sk);
 508        struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
 509        int needs_conn;
 510        long timeout_val;
 511        int res = -EINVAL;
 512
 513        if (unlikely(!dest))
 514                return -EDESTADDRREQ;
 515        if (unlikely((m->msg_namelen < sizeof(*dest)) ||
 516                     (dest->family != AF_TIPC)))
 517                return -EINVAL;
 518        if ((total_len > TIPC_MAX_USER_MSG_SIZE) ||
 519            (m->msg_iovlen > (unsigned int)INT_MAX))
 520                return -EMSGSIZE;
 521
 522        if (iocb)
 523                lock_sock(sk);
 524
 525        needs_conn = (sock->state != SS_READY);
 526        if (unlikely(needs_conn)) {
 527                if (sock->state == SS_LISTENING) {
 528                        res = -EPIPE;
 529                        goto exit;
 530                }
 531                if (sock->state != SS_UNCONNECTED) {
 532                        res = -EISCONN;
 533                        goto exit;
 534                }
 535                if ((tport->published) ||
 536                    ((sock->type == SOCK_STREAM) && (total_len != 0))) {
 537                        res = -EOPNOTSUPP;
 538                        goto exit;
 539                }
 540                if (dest->addrtype == TIPC_ADDR_NAME) {
 541                        tport->conn_type = dest->addr.name.name.type;
 542                        tport->conn_instance = dest->addr.name.name.instance;
 543                }
 544
 545                /* Abort any pending connection attempts (very unlikely) */
 546                reject_rx_queue(sk);
 547        }
 548
 549        timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 550
 551        do {
 552                if (dest->addrtype == TIPC_ADDR_NAME) {
 553                        res = dest_name_check(dest, m);
 554                        if (res)
 555                                break;
 556                        res = tipc_send2name(tport->ref,
 557                                             &dest->addr.name.name,
 558                                             dest->addr.name.domain,
 559                                             m->msg_iovlen,
 560                                             m->msg_iov,
 561                                             total_len);
 562                } else if (dest->addrtype == TIPC_ADDR_ID) {
 563                        res = tipc_send2port(tport->ref,
 564                                             &dest->addr.id,
 565                                             m->msg_iovlen,
 566                                             m->msg_iov,
 567                                             total_len);
 568                } else if (dest->addrtype == TIPC_ADDR_MCAST) {
 569                        if (needs_conn) {
 570                                res = -EOPNOTSUPP;
 571                                break;
 572                        }
 573                        res = dest_name_check(dest, m);
 574                        if (res)
 575                                break;
 576                        res = tipc_multicast(tport->ref,
 577                                             &dest->addr.nameseq,
 578                                             m->msg_iovlen,
 579                                             m->msg_iov,
 580                                             total_len);
 581                }
 582                if (likely(res != -ELINKCONG)) {
 583                        if (needs_conn && (res >= 0))
 584                                sock->state = SS_CONNECTING;
 585                        break;
 586                }
 587                if (timeout_val <= 0L) {
 588                        res = timeout_val ? timeout_val : -EWOULDBLOCK;
 589                        break;
 590                }
 591                release_sock(sk);
 592                timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
 593                                               !tport->congested, timeout_val);
 594                lock_sock(sk);
 595        } while (1);
 596
 597exit:
 598        if (iocb)
 599                release_sock(sk);
 600        return res;
 601}
 602
 603/**
 604 * send_packet - send a connection-oriented message
 605 * @iocb: if NULL, indicates that socket lock is already held
 606 * @sock: socket structure
 607 * @m: message to send
 608 * @total_len: length of message
 609 *
 610 * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
 611 *
 612 * Returns the number of bytes sent on success, or errno otherwise
 613 */
 614static int send_packet(struct kiocb *iocb, struct socket *sock,
 615                       struct msghdr *m, size_t total_len)
 616{
 617        struct sock *sk = sock->sk;
 618        struct tipc_port *tport = tipc_sk_port(sk);
 619        struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
 620        long timeout_val;
 621        int res;
 622
 623        /* Handle implied connection establishment */
 624        if (unlikely(dest))
 625                return send_msg(iocb, sock, m, total_len);
 626
 627        if ((total_len > TIPC_MAX_USER_MSG_SIZE) ||
 628            (m->msg_iovlen > (unsigned int)INT_MAX))
 629                return -EMSGSIZE;
 630
 631        if (iocb)
 632                lock_sock(sk);
 633
 634        timeout_val = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 635
 636        do {
 637                if (unlikely(sock->state != SS_CONNECTED)) {
 638                        if (sock->state == SS_DISCONNECTING)
 639                                res = -EPIPE;
 640                        else
 641                                res = -ENOTCONN;
 642                        break;
 643                }
 644
 645                res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov,
 646                                total_len);
 647                if (likely(res != -ELINKCONG))
 648                        break;
 649                if (timeout_val <= 0L) {
 650                        res = timeout_val ? timeout_val : -EWOULDBLOCK;
 651                        break;
 652                }
 653                release_sock(sk);
 654                timeout_val = wait_event_interruptible_timeout(*sk_sleep(sk),
 655                        (!tport->congested || !tport->connected), timeout_val);
 656                lock_sock(sk);
 657        } while (1);
 658
 659        if (iocb)
 660                release_sock(sk);
 661        return res;
 662}
 663
 664/**
 665 * send_stream - send stream-oriented data
 666 * @iocb: (unused)
 667 * @sock: socket structure
 668 * @m: data to send
 669 * @total_len: total length of data to be sent
 670 *
 671 * Used for SOCK_STREAM data.
 672 *
 673 * Returns the number of bytes sent on success (or partial success),
 674 * or errno if no data sent
 675 */
 676static int send_stream(struct kiocb *iocb, struct socket *sock,
 677                       struct msghdr *m, size_t total_len)
 678{
 679        struct sock *sk = sock->sk;
 680        struct tipc_port *tport = tipc_sk_port(sk);
 681        struct msghdr my_msg;
 682        struct iovec my_iov;
 683        struct iovec *curr_iov;
 684        int curr_iovlen;
 685        char __user *curr_start;
 686        u32 hdr_size;
 687        int curr_left;
 688        int bytes_to_send;
 689        int bytes_sent;
 690        int res;
 691
 692        lock_sock(sk);
 693
 694        /* Handle special cases where there is no connection */
 695        if (unlikely(sock->state != SS_CONNECTED)) {
 696                if (sock->state == SS_UNCONNECTED) {
 697                        res = send_packet(NULL, sock, m, total_len);
 698                        goto exit;
 699                } else if (sock->state == SS_DISCONNECTING) {
 700                        res = -EPIPE;
 701                        goto exit;
 702                } else {
 703                        res = -ENOTCONN;
 704                        goto exit;
 705                }
 706        }
 707
 708        if (unlikely(m->msg_name)) {
 709                res = -EISCONN;
 710                goto exit;
 711        }
 712
 713        if ((total_len > (unsigned int)INT_MAX) ||
 714            (m->msg_iovlen > (unsigned int)INT_MAX)) {
 715                res = -EMSGSIZE;
 716                goto exit;
 717        }
 718
 719        /*
 720         * Send each iovec entry using one or more messages
 721         *
 722         * Note: This algorithm is good for the most likely case
 723         * (i.e. one large iovec entry), but could be improved to pass sets
 724         * of small iovec entries into send_packet().
 725         */
 726        curr_iov = m->msg_iov;
 727        curr_iovlen = m->msg_iovlen;
 728        my_msg.msg_iov = &my_iov;
 729        my_msg.msg_iovlen = 1;
 730        my_msg.msg_flags = m->msg_flags;
 731        my_msg.msg_name = NULL;
 732        bytes_sent = 0;
 733
 734        hdr_size = msg_hdr_sz(&tport->phdr);
 735
 736        while (curr_iovlen--) {
 737                curr_start = curr_iov->iov_base;
 738                curr_left = curr_iov->iov_len;
 739
 740                while (curr_left) {
 741                        bytes_to_send = tport->max_pkt - hdr_size;
 742                        if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
 743                                bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
 744                        if (curr_left < bytes_to_send)
 745                                bytes_to_send = curr_left;
 746                        my_iov.iov_base = curr_start;
 747                        my_iov.iov_len = bytes_to_send;
 748                        res = send_packet(NULL, sock, &my_msg, bytes_to_send);
 749                        if (res < 0) {
 750                                if (bytes_sent)
 751                                        res = bytes_sent;
 752                                goto exit;
 753                        }
 754                        curr_left -= bytes_to_send;
 755                        curr_start += bytes_to_send;
 756                        bytes_sent += bytes_to_send;
 757                }
 758
 759                curr_iov++;
 760        }
 761        res = bytes_sent;
 762exit:
 763        release_sock(sk);
 764        return res;
 765}
 766
 767/**
 768 * auto_connect - complete connection setup to a remote port
 769 * @sock: socket structure
 770 * @msg: peer's response message
 771 *
 772 * Returns 0 on success, errno otherwise
 773 */
 774static int auto_connect(struct socket *sock, struct tipc_msg *msg)
 775{
 776        struct tipc_sock *tsock = tipc_sk(sock->sk);
 777
 778        if (msg_errcode(msg)) {
 779                sock->state = SS_DISCONNECTING;
 780                return -ECONNREFUSED;
 781        }
 782
 783        tsock->peer_name.ref = msg_origport(msg);
 784        tsock->peer_name.node = msg_orignode(msg);
 785        tipc_connect2port(tsock->p->ref, &tsock->peer_name);
 786        tipc_set_portimportance(tsock->p->ref, msg_importance(msg));
 787        sock->state = SS_CONNECTED;
 788        return 0;
 789}
 790
 791/**
 792 * set_orig_addr - capture sender's address for received message
 793 * @m: descriptor for message info
 794 * @msg: received message header
 795 *
 796 * Note: Address is not captured if not requested by receiver.
 797 */
 798static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
 799{
 800        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)m->msg_name;
 801
 802        if (addr) {
 803                addr->family = AF_TIPC;
 804                addr->addrtype = TIPC_ADDR_ID;
 805                addr->addr.id.ref = msg_origport(msg);
 806                addr->addr.id.node = msg_orignode(msg);
 807                addr->addr.name.domain = 0;     /* could leave uninitialized */
 808                addr->scope = 0;                /* could leave uninitialized */
 809                m->msg_namelen = sizeof(struct sockaddr_tipc);
 810        }
 811}
 812
 813/**
 814 * anc_data_recv - optionally capture ancillary data for received message
 815 * @m: descriptor for message info
 816 * @msg: received message header
 817 * @tport: TIPC port associated with message
 818 *
 819 * Note: Ancillary data is not captured if not requested by receiver.
 820 *
 821 * Returns 0 if successful, otherwise errno
 822 */
 823static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
 824                                struct tipc_port *tport)
 825{
 826        u32 anc_data[3];
 827        u32 err;
 828        u32 dest_type;
 829        int has_name;
 830        int res;
 831
 832        if (likely(m->msg_controllen == 0))
 833                return 0;
 834
 835        /* Optionally capture errored message object(s) */
 836        err = msg ? msg_errcode(msg) : 0;
 837        if (unlikely(err)) {
 838                anc_data[0] = err;
 839                anc_data[1] = msg_data_sz(msg);
 840                res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
 841                if (res)
 842                        return res;
 843                if (anc_data[1]) {
 844                        res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
 845                                       msg_data(msg));
 846                        if (res)
 847                                return res;
 848                }
 849        }
 850
 851        /* Optionally capture message destination object */
 852        dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
 853        switch (dest_type) {
 854        case TIPC_NAMED_MSG:
 855                has_name = 1;
 856                anc_data[0] = msg_nametype(msg);
 857                anc_data[1] = msg_namelower(msg);
 858                anc_data[2] = msg_namelower(msg);
 859                break;
 860        case TIPC_MCAST_MSG:
 861                has_name = 1;
 862                anc_data[0] = msg_nametype(msg);
 863                anc_data[1] = msg_namelower(msg);
 864                anc_data[2] = msg_nameupper(msg);
 865                break;
 866        case TIPC_CONN_MSG:
 867                has_name = (tport->conn_type != 0);
 868                anc_data[0] = tport->conn_type;
 869                anc_data[1] = tport->conn_instance;
 870                anc_data[2] = tport->conn_instance;
 871                break;
 872        default:
 873                has_name = 0;
 874        }
 875        if (has_name) {
 876                res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
 877                if (res)
 878                        return res;
 879        }
 880
 881        return 0;
 882}
 883
 884/**
 885 * recv_msg - receive packet-oriented message
 886 * @iocb: (unused)
 887 * @m: descriptor for message info
 888 * @buf_len: total size of user buffer area
 889 * @flags: receive flags
 890 *
 891 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
 892 * If the complete message doesn't fit in user area, truncate it.
 893 *
 894 * Returns size of returned message data, errno otherwise
 895 */
 896static int recv_msg(struct kiocb *iocb, struct socket *sock,
 897                    struct msghdr *m, size_t buf_len, int flags)
 898{
 899        struct sock *sk = sock->sk;
 900        struct tipc_port *tport = tipc_sk_port(sk);
 901        struct sk_buff *buf;
 902        struct tipc_msg *msg;
 903        long timeout;
 904        unsigned int sz;
 905        u32 err;
 906        int res;
 907
 908        /* Catch invalid receive requests */
 909        if (unlikely(!buf_len))
 910                return -EINVAL;
 911
 912        lock_sock(sk);
 913
 914        if (unlikely(sock->state == SS_UNCONNECTED)) {
 915                res = -ENOTCONN;
 916                goto exit;
 917        }
 918
 919        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 920restart:
 921
 922        /* Look for a message in receive queue; wait if necessary */
 923        while (skb_queue_empty(&sk->sk_receive_queue)) {
 924                if (sock->state == SS_DISCONNECTING) {
 925                        res = -ENOTCONN;
 926                        goto exit;
 927                }
 928                if (timeout <= 0L) {
 929                        res = timeout ? timeout : -EWOULDBLOCK;
 930                        goto exit;
 931                }
 932                release_sock(sk);
 933                timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
 934                                                           tipc_rx_ready(sock),
 935                                                           timeout);
 936                lock_sock(sk);
 937        }
 938
 939        /* Look at first message in receive queue */
 940        buf = skb_peek(&sk->sk_receive_queue);
 941        msg = buf_msg(buf);
 942        sz = msg_data_sz(msg);
 943        err = msg_errcode(msg);
 944
 945        /* Complete connection setup for an implied connect */
 946        if (unlikely(sock->state == SS_CONNECTING)) {
 947                res = auto_connect(sock, msg);
 948                if (res)
 949                        goto exit;
 950        }
 951
 952        /* Discard an empty non-errored message & try again */
 953        if ((!sz) && (!err)) {
 954                advance_rx_queue(sk);
 955                goto restart;
 956        }
 957
 958        /* Capture sender's address (optional) */
 959        set_orig_addr(m, msg);
 960
 961        /* Capture ancillary data (optional) */
 962        res = anc_data_recv(m, msg, tport);
 963        if (res)
 964                goto exit;
 965
 966        /* Capture message data (if valid) & compute return value (always) */
 967        if (!err) {
 968                if (unlikely(buf_len < sz)) {
 969                        sz = buf_len;
 970                        m->msg_flags |= MSG_TRUNC;
 971                }
 972                res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
 973                                              m->msg_iov, sz);
 974                if (res)
 975                        goto exit;
 976                res = sz;
 977        } else {
 978                if ((sock->state == SS_READY) ||
 979                    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
 980                        res = 0;
 981                else
 982                        res = -ECONNRESET;
 983        }
 984
 985        /* Consume received message (optional) */
 986        if (likely(!(flags & MSG_PEEK))) {
 987                if ((sock->state != SS_READY) &&
 988                    (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
 989                        tipc_acknowledge(tport->ref, tport->conn_unacked);
 990                advance_rx_queue(sk);
 991        }
 992exit:
 993        release_sock(sk);
 994        return res;
 995}
 996
 997/**
 998 * recv_stream - receive stream-oriented data
 999 * @iocb: (unused)
1000 * @m: descriptor for message info
1001 * @buf_len: total size of user buffer area
1002 * @flags: receive flags
1003 *
1004 * Used for SOCK_STREAM messages only.  If not enough data is available
1005 * will optionally wait for more; never truncates data.
1006 *
1007 * Returns size of returned message data, errno otherwise
1008 */
1009static int recv_stream(struct kiocb *iocb, struct socket *sock,
1010                       struct msghdr *m, size_t buf_len, int flags)
1011{
1012        struct sock *sk = sock->sk;
1013        struct tipc_port *tport = tipc_sk_port(sk);
1014        struct sk_buff *buf;
1015        struct tipc_msg *msg;
1016        long timeout;
1017        unsigned int sz;
1018        int sz_to_copy, target, needed;
1019        int sz_copied = 0;
1020        u32 err;
1021        int res = 0;
1022
1023        /* Catch invalid receive attempts */
1024        if (unlikely(!buf_len))
1025                return -EINVAL;
1026
1027        lock_sock(sk);
1028
1029        if (unlikely((sock->state == SS_UNCONNECTED) ||
1030                     (sock->state == SS_CONNECTING))) {
1031                res = -ENOTCONN;
1032                goto exit;
1033        }
1034
1035        target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1036        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1037
1038restart:
1039        /* Look for a message in receive queue; wait if necessary */
1040        while (skb_queue_empty(&sk->sk_receive_queue)) {
1041                if (sock->state == SS_DISCONNECTING) {
1042                        res = -ENOTCONN;
1043                        goto exit;
1044                }
1045                if (timeout <= 0L) {
1046                        res = timeout ? timeout : -EWOULDBLOCK;
1047                        goto exit;
1048                }
1049                release_sock(sk);
1050                timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
1051                                                           tipc_rx_ready(sock),
1052                                                           timeout);
1053                lock_sock(sk);
1054        }
1055
1056        /* Look at first message in receive queue */
1057        buf = skb_peek(&sk->sk_receive_queue);
1058        msg = buf_msg(buf);
1059        sz = msg_data_sz(msg);
1060        err = msg_errcode(msg);
1061
1062        /* Discard an empty non-errored message & try again */
1063        if ((!sz) && (!err)) {
1064                advance_rx_queue(sk);
1065                goto restart;
1066        }
1067
1068        /* Optionally capture sender's address & ancillary data of first msg */
1069        if (sz_copied == 0) {
1070                set_orig_addr(m, msg);
1071                res = anc_data_recv(m, msg, tport);
1072                if (res)
1073                        goto exit;
1074        }
1075
1076        /* Capture message data (if valid) & compute return value (always) */
1077        if (!err) {
1078                u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1079
1080                sz -= offset;
1081                needed = (buf_len - sz_copied);
1082                sz_to_copy = (sz <= needed) ? sz : needed;
1083
1084                res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1085                                              m->msg_iov, sz_to_copy);
1086                if (res)
1087                        goto exit;
1088
1089                sz_copied += sz_to_copy;
1090
1091                if (sz_to_copy < sz) {
1092                        if (!(flags & MSG_PEEK))
1093                                TIPC_SKB_CB(buf)->handle =
1094                                (void *)(unsigned long)(offset + sz_to_copy);
1095                        goto exit;
1096                }
1097        } else {
1098                if (sz_copied != 0)
1099                        goto exit; /* can't add error msg to valid data */
1100
1101                if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1102                        res = 0;
1103                else
1104                        res = -ECONNRESET;
1105        }
1106
1107        /* Consume received message (optional) */
1108        if (likely(!(flags & MSG_PEEK))) {
1109                if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1110                        tipc_acknowledge(tport->ref, tport->conn_unacked);
1111                advance_rx_queue(sk);
1112        }
1113
1114        /* Loop around if more data is required */
1115        if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1116            (!skb_queue_empty(&sk->sk_receive_queue) ||
1117            (sz_copied < target)) &&    /* and more is ready or required */
1118            (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1119            (!err))                     /* and haven't reached a FIN */
1120                goto restart;
1121
1122exit:
1123        release_sock(sk);
1124        return sz_copied ? sz_copied : res;
1125}
1126
1127/**
1128 * rx_queue_full - determine if receive queue can accept another message
1129 * @msg: message to be added to queue
1130 * @queue_size: current size of queue
1131 * @base: nominal maximum size of queue
1132 *
1133 * Returns 1 if queue is unable to accept message, 0 otherwise
1134 */
1135static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
1136{
1137        u32 threshold;
1138        u32 imp = msg_importance(msg);
1139
1140        if (imp == TIPC_LOW_IMPORTANCE)
1141                threshold = base;
1142        else if (imp == TIPC_MEDIUM_IMPORTANCE)
1143                threshold = base * 2;
1144        else if (imp == TIPC_HIGH_IMPORTANCE)
1145                threshold = base * 100;
1146        else
1147                return 0;
1148
1149        if (msg_connected(msg))
1150                threshold *= 4;
1151
1152        return queue_size >= threshold;
1153}
1154
1155/**
1156 * filter_rcv - validate incoming message
1157 * @sk: socket
1158 * @buf: message
1159 *
1160 * Enqueues message on receive queue if acceptable; optionally handles
1161 * disconnect indication for a connected socket.
1162 *
1163 * Called with socket lock already taken; port lock may also be taken.
1164 *
1165 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1166 */
1167static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1168{
1169        struct socket *sock = sk->sk_socket;
1170        struct tipc_msg *msg = buf_msg(buf);
1171        u32 recv_q_len;
1172
1173        /* Reject message if it is wrong sort of message for socket */
1174        if (msg_type(msg) > TIPC_DIRECT_MSG)
1175                return TIPC_ERR_NO_PORT;
1176
1177        if (sock->state == SS_READY) {
1178                if (msg_connected(msg))
1179                        return TIPC_ERR_NO_PORT;
1180        } else {
1181                if (msg_mcast(msg))
1182                        return TIPC_ERR_NO_PORT;
1183                if (sock->state == SS_CONNECTED) {
1184                        if (!msg_connected(msg) ||
1185                            !tipc_port_peer_msg(tipc_sk_port(sk), msg))
1186                                return TIPC_ERR_NO_PORT;
1187                } else if (sock->state == SS_CONNECTING) {
1188                        if (!msg_connected(msg) && (msg_errcode(msg) == 0))
1189                                return TIPC_ERR_NO_PORT;
1190                } else if (sock->state == SS_LISTENING) {
1191                        if (msg_connected(msg) || msg_errcode(msg))
1192                                return TIPC_ERR_NO_PORT;
1193                } else if (sock->state == SS_DISCONNECTING) {
1194                        return TIPC_ERR_NO_PORT;
1195                } else /* (sock->state == SS_UNCONNECTED) */ {
1196                        if (msg_connected(msg) || msg_errcode(msg))
1197                                return TIPC_ERR_NO_PORT;
1198                }
1199        }
1200
1201        /* Reject message if there isn't room to queue it */
1202        recv_q_len = (u32)atomic_read(&tipc_queue_size);
1203        if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
1204                if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1205                        return TIPC_ERR_OVERLOAD;
1206        }
1207        recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1208        if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1209                if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
1210                        return TIPC_ERR_OVERLOAD;
1211        }
1212
1213        /* Enqueue message (finally!) */
1214        TIPC_SKB_CB(buf)->handle = 0;
1215        atomic_inc(&tipc_queue_size);
1216        __skb_queue_tail(&sk->sk_receive_queue, buf);
1217
1218        /* Initiate connection termination for an incoming 'FIN' */
1219        if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
1220                sock->state = SS_DISCONNECTING;
1221                tipc_disconnect_port(tipc_sk_port(sk));
1222        }
1223
1224        if (waitqueue_active(sk_sleep(sk)))
1225                wake_up_interruptible(sk_sleep(sk));
1226        return TIPC_OK;
1227}
1228
1229/**
1230 * backlog_rcv - handle incoming message from backlog queue
1231 * @sk: socket
1232 * @buf: message
1233 *
1234 * Caller must hold socket lock, but not port lock.
1235 *
1236 * Returns 0
1237 */
1238static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1239{
1240        u32 res;
1241
1242        res = filter_rcv(sk, buf);
1243        if (res)
1244                tipc_reject_msg(buf, res);
1245        return 0;
1246}
1247
1248/**
1249 * dispatch - handle incoming message
1250 * @tport: TIPC port that received message
1251 * @buf: message
1252 *
1253 * Called with port lock already taken.
1254 *
1255 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1256 */
1257static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1258{
1259        struct sock *sk = (struct sock *)tport->usr_handle;
1260        u32 res;
1261
1262        /*
1263         * Process message if socket is unlocked; otherwise add to backlog queue
1264         *
1265         * This code is based on sk_receive_skb(), but must be distinct from it
1266         * since a TIPC-specific filter/reject mechanism is utilized
1267         */
1268        bh_lock_sock(sk);
1269        if (!sock_owned_by_user(sk)) {
1270                res = filter_rcv(sk, buf);
1271        } else {
1272                if (sk_add_backlog(sk, buf, sk->sk_rcvbuf))
1273                        res = TIPC_ERR_OVERLOAD;
1274                else
1275                        res = TIPC_OK;
1276        }
1277        bh_unlock_sock(sk);
1278
1279        return res;
1280}
1281
1282/**
1283 * wakeupdispatch - wake up port after congestion
1284 * @tport: port to wakeup
1285 *
1286 * Called with port lock already taken.
1287 */
1288static void wakeupdispatch(struct tipc_port *tport)
1289{
1290        struct sock *sk = (struct sock *)tport->usr_handle;
1291
1292        if (waitqueue_active(sk_sleep(sk)))
1293                wake_up_interruptible(sk_sleep(sk));
1294}
1295
/**
 * connect - establish a connection to another TIPC port
 * @sock: socket structure
 * @dest: socket address for destination port
 * @destlen: size of socket address data structure
 * @flags: file-related flags associated with socket
 *
 * Sends an empty connection request to @dest, then sleeps until a reply
 * arrives, the socket leaves SS_CONNECTING, or the socket's connect
 * timeout expires (a timeout of 0 means wait indefinitely).
 *
 * Returns 0 on success, errno otherwise
 */
static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
		   int flags)
{
	struct sock *sk = sock->sk;
	struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
	struct msghdr m = {NULL,};
	struct sk_buff *buf;
	struct tipc_msg *msg;
	unsigned int timeout;
	int res;

	lock_sock(sk);

	/*
	 * For now, TIPC does not allow connect() on DGRAM/RDM sockets, does
	 * not support the non-blocking form of connect(), and (per Posix)
	 * refuses it on a listening socket
	 */
	if ((sock->state == SS_READY) || (flags & O_NONBLOCK) ||
	    (sock->state == SS_LISTENING)) {
		res = -EOPNOTSUPP;
		goto exit;
	}

	/* Issue Posix-compliant error code if socket is in the wrong state */
	if (sock->state == SS_CONNECTING) {
		res = -EALREADY;
		goto exit;
	}
	if (sock->state != SS_UNCONNECTED) {
		res = -EISCONN;
		goto exit;
	}

	/*
	 * Reject connection attempt using multicast address
	 *
	 * Note: send_msg() validates the rest of the address fields,
	 *       so there's no need to do it here
	 */
	if (dst->addrtype == TIPC_ADDR_MCAST) {
		res = -EINVAL;
		goto exit;
	}

	/* Reject any messages already in receive queue (very unlikely) */
	reject_rx_queue(sk);

	/* Send a 'SYN-' to destination */
	m.msg_name = dest;
	m.msg_namelen = destlen;
	res = send_msg(NULL, sock, &m, 0);
	if (res < 0)
		goto exit;

	/* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
	timeout = tipc_sk(sk)->conn_timeout;
	release_sock(sk);
	res = wait_event_interruptible_timeout(*sk_sleep(sk),
			(!skb_queue_empty(&sk->sk_receive_queue) ||
			(sock->state != SS_CONNECTING)),
			timeout ? (long)msecs_to_jiffies(timeout)
				: MAX_SCHEDULE_TIMEOUT);
	lock_sock(sk);

	/* Wait ended without its condition becoming true: give up */
	if (res <= 0) {
		if (!res)
			res = -ETIMEDOUT;
		/* a negative result from the wait is passed on unchanged */
		sock->state = SS_DISCONNECTING;
		goto exit;
	}

	/* State changed but no reply is queued */
	buf = skb_peek(&sk->sk_receive_queue);
	if (buf == NULL) {
		res = (sock->state == SS_CONNECTED) ? -EISCONN : -ECONNREFUSED;
		goto exit;
	}

	/* Use the reply to finish the connection; drop it if it has no data */
	msg = buf_msg(buf);
	res = auto_connect(sock, msg);
	if (!res && !msg_data_sz(msg))
		advance_rx_queue(sk);

exit:
	release_sock(sk);
	return res;
}
1402
1403/**
1404 * listen - allow socket to listen for incoming connections
1405 * @sock: socket structure
1406 * @len: (unused)
1407 *
1408 * Returns 0 on success, errno otherwise
1409 */
1410static int listen(struct socket *sock, int len)
1411{
1412        struct sock *sk = sock->sk;
1413        int res;
1414
1415        lock_sock(sk);
1416
1417        if (sock->state != SS_UNCONNECTED)
1418                res = -EINVAL;
1419        else {
1420                sock->state = SS_LISTENING;
1421                res = 0;
1422        }
1423
1424        release_sock(sk);
1425        return res;
1426}
1427
1428/**
1429 * accept - wait for connection request
1430 * @sock: listening socket
1431 * @newsock: new socket that is to be connected
1432 * @flags: file-related flags associated with socket
1433 *
1434 * Returns 0 on success, errno otherwise
1435 */
1436static int accept(struct socket *sock, struct socket *new_sock, int flags)
1437{
1438        struct sock *sk = sock->sk;
1439        struct sk_buff *buf;
1440        int res;
1441
1442        lock_sock(sk);
1443
1444        if (sock->state != SS_LISTENING) {
1445                res = -EINVAL;
1446                goto exit;
1447        }
1448
1449        while (skb_queue_empty(&sk->sk_receive_queue)) {
1450                if (flags & O_NONBLOCK) {
1451                        res = -EWOULDBLOCK;
1452                        goto exit;
1453                }
1454                release_sock(sk);
1455                res = wait_event_interruptible(*sk_sleep(sk),
1456                                (!skb_queue_empty(&sk->sk_receive_queue)));
1457                lock_sock(sk);
1458                if (res)
1459                        goto exit;
1460        }
1461
1462        buf = skb_peek(&sk->sk_receive_queue);
1463
1464        res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
1465        if (!res) {
1466                struct sock *new_sk = new_sock->sk;
1467                struct tipc_sock *new_tsock = tipc_sk(new_sk);
1468                struct tipc_port *new_tport = new_tsock->p;
1469                u32 new_ref = new_tport->ref;
1470                struct tipc_msg *msg = buf_msg(buf);
1471
1472                lock_sock(new_sk);
1473
1474                /*
1475                 * Reject any stray messages received by new socket
1476                 * before the socket lock was taken (very, very unlikely)
1477                 */
1478                reject_rx_queue(new_sk);
1479
1480                /* Connect new socket to it's peer */
1481                new_tsock->peer_name.ref = msg_origport(msg);
1482                new_tsock->peer_name.node = msg_orignode(msg);
1483                tipc_connect2port(new_ref, &new_tsock->peer_name);
1484                new_sock->state = SS_CONNECTED;
1485
1486                tipc_set_portimportance(new_ref, msg_importance(msg));
1487                if (msg_named(msg)) {
1488                        new_tport->conn_type = msg_nametype(msg);
1489                        new_tport->conn_instance = msg_nameinst(msg);
1490                }
1491
1492                /*
1493                 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1494                 * Respond to 'SYN+' by queuing it on new socket.
1495                 */
1496                if (!msg_data_sz(msg)) {
1497                        struct msghdr m = {NULL,};
1498
1499                        advance_rx_queue(sk);
1500                        send_packet(NULL, new_sock, &m, 0);
1501                } else {
1502                        __skb_dequeue(&sk->sk_receive_queue);
1503                        __skb_queue_head(&new_sk->sk_receive_queue, buf);
1504                }
1505                release_sock(new_sk);
1506        }
1507exit:
1508        release_sock(sk);
1509        return res;
1510}
1511
1512/**
1513 * shutdown - shutdown socket connection
1514 * @sock: socket structure
1515 * @how: direction to close (must be SHUT_RDWR)
1516 *
1517 * Terminates connection (if necessary), then purges socket's receive queue.
1518 *
1519 * Returns 0 on success, errno otherwise
1520 */
1521static int shutdown(struct socket *sock, int how)
1522{
1523        struct sock *sk = sock->sk;
1524        struct tipc_port *tport = tipc_sk_port(sk);
1525        struct sk_buff *buf;
1526        int res;
1527
1528        if (how != SHUT_RDWR)
1529                return -EINVAL;
1530
1531        lock_sock(sk);
1532
1533        switch (sock->state) {
1534        case SS_CONNECTING:
1535        case SS_CONNECTED:
1536
1537restart:
1538                /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1539                buf = __skb_dequeue(&sk->sk_receive_queue);
1540                if (buf) {
1541                        atomic_dec(&tipc_queue_size);
1542                        if (TIPC_SKB_CB(buf)->handle != 0) {
1543                                kfree_skb(buf);
1544                                goto restart;
1545                        }
1546                        tipc_disconnect(tport->ref);
1547                        tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1548                } else {
1549                        tipc_shutdown(tport->ref);
1550                }
1551
1552                sock->state = SS_DISCONNECTING;
1553
1554                /* fall through */
1555
1556        case SS_DISCONNECTING:
1557
1558                /* Discard any unreceived messages; wake up sleeping tasks */
1559                discard_rx_queue(sk);
1560                if (waitqueue_active(sk_sleep(sk)))
1561                        wake_up_interruptible(sk_sleep(sk));
1562                res = 0;
1563                break;
1564
1565        default:
1566                res = -ENOTCONN;
1567        }
1568
1569        release_sock(sk);
1570        return res;
1571}
1572
1573/**
1574 * setsockopt - set socket option
1575 * @sock: socket structure
1576 * @lvl: option level
1577 * @opt: option identifier
1578 * @ov: pointer to new option value
1579 * @ol: length of option value
1580 *
1581 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1582 * (to ease compatibility).
1583 *
1584 * Returns 0 on success, errno otherwise
1585 */
1586static int setsockopt(struct socket *sock,
1587                      int lvl, int opt, char __user *ov, unsigned int ol)
1588{
1589        struct sock *sk = sock->sk;
1590        struct tipc_port *tport = tipc_sk_port(sk);
1591        u32 value;
1592        int res;
1593
1594        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1595                return 0;
1596        if (lvl != SOL_TIPC)
1597                return -ENOPROTOOPT;
1598        if (ol < sizeof(value))
1599                return -EINVAL;
1600        res = get_user(value, (u32 __user *)ov);
1601        if (res)
1602                return res;
1603
1604        lock_sock(sk);
1605
1606        switch (opt) {
1607        case TIPC_IMPORTANCE:
1608                res = tipc_set_portimportance(tport->ref, value);
1609                break;
1610        case TIPC_SRC_DROPPABLE:
1611                if (sock->type != SOCK_STREAM)
1612                        res = tipc_set_portunreliable(tport->ref, value);
1613                else
1614                        res = -ENOPROTOOPT;
1615                break;
1616        case TIPC_DEST_DROPPABLE:
1617                res = tipc_set_portunreturnable(tport->ref, value);
1618                break;
1619        case TIPC_CONN_TIMEOUT:
1620                tipc_sk(sk)->conn_timeout = value;
1621                /* no need to set "res", since already 0 at this point */
1622                break;
1623        default:
1624                res = -EINVAL;
1625        }
1626
1627        release_sock(sk);
1628
1629        return res;
1630}
1631
1632/**
1633 * getsockopt - get socket option
1634 * @sock: socket structure
1635 * @lvl: option level
1636 * @opt: option identifier
1637 * @ov: receptacle for option value
1638 * @ol: receptacle for length of option value
1639 *
1640 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1641 * (to ease compatibility).
1642 *
1643 * Returns 0 on success, errno otherwise
1644 */
1645static int getsockopt(struct socket *sock,
1646                      int lvl, int opt, char __user *ov, int __user *ol)
1647{
1648        struct sock *sk = sock->sk;
1649        struct tipc_port *tport = tipc_sk_port(sk);
1650        int len;
1651        u32 value;
1652        int res;
1653
1654        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1655                return put_user(0, ol);
1656        if (lvl != SOL_TIPC)
1657                return -ENOPROTOOPT;
1658        res = get_user(len, ol);
1659        if (res)
1660                return res;
1661
1662        lock_sock(sk);
1663
1664        switch (opt) {
1665        case TIPC_IMPORTANCE:
1666                res = tipc_portimportance(tport->ref, &value);
1667                break;
1668        case TIPC_SRC_DROPPABLE:
1669                res = tipc_portunreliable(tport->ref, &value);
1670                break;
1671        case TIPC_DEST_DROPPABLE:
1672                res = tipc_portunreturnable(tport->ref, &value);
1673                break;
1674        case TIPC_CONN_TIMEOUT:
1675                value = tipc_sk(sk)->conn_timeout;
1676                /* no need to set "res", since already 0 at this point */
1677                break;
1678        case TIPC_NODE_RECVQ_DEPTH:
1679                value = (u32)atomic_read(&tipc_queue_size);
1680                break;
1681        case TIPC_SOCK_RECVQ_DEPTH:
1682                value = skb_queue_len(&sk->sk_receive_queue);
1683                break;
1684        default:
1685                res = -EINVAL;
1686        }
1687
1688        release_sock(sk);
1689
1690        if (res)
1691                return res;     /* "get" failed */
1692
1693        if (len < sizeof(value))
1694                return -EINVAL;
1695
1696        if (copy_to_user(ov, &value, sizeof(value)))
1697                return -EFAULT;
1698
1699        return put_user(sizeof(value), ol);
1700}
1701
1702/* Protocol switches for the various types of TIPC sockets */
1703
1704static const struct proto_ops msg_ops = {
1705        .owner          = THIS_MODULE,
1706        .family         = AF_TIPC,
1707        .release        = release,
1708        .bind           = bind,
1709        .connect        = connect,
1710        .socketpair     = sock_no_socketpair,
1711        .accept         = sock_no_accept,
1712        .getname        = get_name,
1713        .poll           = poll,
1714        .ioctl          = sock_no_ioctl,
1715        .listen         = sock_no_listen,
1716        .shutdown       = shutdown,
1717        .setsockopt     = setsockopt,
1718        .getsockopt     = getsockopt,
1719        .sendmsg        = send_msg,
1720        .recvmsg        = recv_msg,
1721        .mmap           = sock_no_mmap,
1722        .sendpage       = sock_no_sendpage
1723};
1724
1725static const struct proto_ops packet_ops = {
1726        .owner          = THIS_MODULE,
1727        .family         = AF_TIPC,
1728        .release        = release,
1729        .bind           = bind,
1730        .connect        = connect,
1731        .socketpair     = sock_no_socketpair,
1732        .accept         = accept,
1733        .getname        = get_name,
1734        .poll           = poll,
1735        .ioctl          = sock_no_ioctl,
1736        .listen         = listen,
1737        .shutdown       = shutdown,
1738        .setsockopt     = setsockopt,
1739        .getsockopt     = getsockopt,
1740        .sendmsg        = send_packet,
1741        .recvmsg        = recv_msg,
1742        .mmap           = sock_no_mmap,
1743        .sendpage       = sock_no_sendpage
1744};
1745
1746static const struct proto_ops stream_ops = {
1747        .owner          = THIS_MODULE,
1748        .family         = AF_TIPC,
1749        .release        = release,
1750        .bind           = bind,
1751        .connect        = connect,
1752        .socketpair     = sock_no_socketpair,
1753        .accept         = accept,
1754        .getname        = get_name,
1755        .poll           = poll,
1756        .ioctl          = sock_no_ioctl,
1757        .listen         = listen,
1758        .shutdown       = shutdown,
1759        .setsockopt     = setsockopt,
1760        .getsockopt     = getsockopt,
1761        .sendmsg        = send_stream,
1762        .recvmsg        = recv_stream,
1763        .mmap           = sock_no_mmap,
1764        .sendpage       = sock_no_sendpage
1765};
1766
1767static const struct net_proto_family tipc_family_ops = {
1768        .owner          = THIS_MODULE,
1769        .family         = AF_TIPC,
1770        .create         = tipc_create
1771};
1772
1773static struct proto tipc_proto = {
1774        .name           = "TIPC",
1775        .owner          = THIS_MODULE,
1776        .obj_size       = sizeof(struct tipc_sock)
1777};
1778
1779/**
1780 * tipc_socket_init - initialize TIPC socket interface
1781 *
1782 * Returns 0 on success, errno otherwise
1783 */
1784int tipc_socket_init(void)
1785{
1786        int res;
1787
1788        res = proto_register(&tipc_proto, 1);
1789        if (res) {
1790                pr_err("Failed to register TIPC protocol type\n");
1791                goto out;
1792        }
1793
1794        res = sock_register(&tipc_family_ops);
1795        if (res) {
1796                pr_err("Failed to register TIPC socket type\n");
1797                proto_unregister(&tipc_proto);
1798                goto out;
1799        }
1800
1801        sockets_enabled = 1;
1802 out:
1803        return res;
1804}
1805
1806/**
1807 * tipc_socket_stop - stop TIPC socket interface
1808 */
1809void tipc_socket_stop(void)
1810{
1811        if (!sockets_enabled)
1812                return;
1813
1814        sockets_enabled = 0;
1815        sock_unregister(tipc_family_ops.family);
1816        proto_unregister(&tipc_proto);
1817}
1818