linux/net/tipc/socket.c
<<
>>
Prefs
   1/*
   2 * net/tipc/socket.c: TIPC socket API
   3 *
   4 * Copyright (c) 2001-2007, Ericsson AB
   5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include <net/sock.h>
  38
  39#include <linux/tipc.h>
  40#include <linux/tipc_config.h>
  41
  42#include "core.h"
  43#include "port.h"
  44
  45#define SS_LISTENING    -1      /* socket is listening */
  46#define SS_READY        -2      /* socket is connectionless */
  47
  48#define OVERLOAD_LIMIT_BASE     5000
  49#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
  50
  51struct tipc_sock {
  52        struct sock sk;
  53        struct tipc_port *p;
  54        struct tipc_portid peer_name;
  55        long conn_timeout;
  56};
  57
  58#define tipc_sk(sk) ((struct tipc_sock *)(sk))
  59#define tipc_sk_port(sk) ((struct tipc_port *)(tipc_sk(sk)->p))
  60
  61#define tipc_rx_ready(sock) (!skb_queue_empty(&sock->sk->sk_receive_queue) || \
  62                        (sock->state == SS_DISCONNECTING))
  63
  64static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
  65static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
  66static void wakeupdispatch(struct tipc_port *tport);
  67
  68static const struct proto_ops packet_ops;
  69static const struct proto_ops stream_ops;
  70static const struct proto_ops msg_ops;
  71
  72static struct proto tipc_proto;
  73
  74static int sockets_enabled;
  75
  76static atomic_t tipc_queue_size = ATOMIC_INIT(0);
  77
  78/*
  79 * Revised TIPC socket locking policy:
  80 *
  81 * Most socket operations take the standard socket lock when they start
  82 * and hold it until they finish (or until they need to sleep).  Acquiring
  83 * this lock grants the owner exclusive access to the fields of the socket
  84 * data structures, with the exception of the backlog queue.  A few socket
  85 * operations can be done without taking the socket lock because they only
  86 * read socket information that never changes during the life of the socket.
  87 *
  88 * Socket operations may acquire the lock for the associated TIPC port if they
  89 * need to perform an operation on the port.  If any routine needs to acquire
  90 * both the socket lock and the port lock it must take the socket lock first
  91 * to avoid the risk of deadlock.
  92 *
   93 * The dispatcher handling incoming messages cannot grab the socket lock in
   94 * the standard fashion, since it runs at the BH level and cannot block.
  95 * Instead, it checks to see if the socket lock is currently owned by someone,
  96 * and either handles the message itself or adds it to the socket's backlog
  97 * queue; in the latter case the queued message is processed once the process
  98 * owning the socket lock releases it.
  99 *
 100 * NOTE: Releasing the socket lock while an operation is sleeping overcomes
 101 * the problem of a blocked socket operation preventing any other operations
 102 * from occurring.  However, applications must be careful if they have
 103 * multiple threads trying to send (or receive) on the same socket, as these
 104 * operations might interfere with each other.  For example, doing a connect
 105 * and a receive at the same time might allow the receive to consume the
 106 * ACK message meant for the connect.  While additional work could be done
 107 * to try and overcome this, it doesn't seem to be worthwhile at the present.
 108 *
 109 * NOTE: Releasing the socket lock while an operation is sleeping also ensures
 110 * that another operation that must be performed in a non-blocking manner is
 111 * not delayed for very long because the lock has already been taken.
 112 *
 113 * NOTE: This code assumes that certain fields of a port/socket pair are
 114 * constant over its lifetime; such fields can be examined without taking
 115 * the socket lock and/or port lock, and do not need to be re-read even
 116 * after resuming processing after waiting.  These fields include:
 117 *   - socket type
 118 *   - pointer to socket sk structure (aka tipc_sock structure)
 119 *   - pointer to port structure
 120 *   - port reference
 121 */
 122
 123/**
 124 * advance_rx_queue - discard first buffer in socket receive queue
 125 *
 126 * Caller must hold socket lock
 127 */
 128
 129static void advance_rx_queue(struct sock *sk)
 130{
 131        buf_discard(__skb_dequeue(&sk->sk_receive_queue));
 132        atomic_dec(&tipc_queue_size);
 133}
 134
 135/**
 136 * discard_rx_queue - discard all buffers in socket receive queue
 137 *
 138 * Caller must hold socket lock
 139 */
 140
 141static void discard_rx_queue(struct sock *sk)
 142{
 143        struct sk_buff *buf;
 144
 145        while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
 146                atomic_dec(&tipc_queue_size);
 147                buf_discard(buf);
 148        }
 149}
 150
 151/**
 152 * reject_rx_queue - reject all buffers in socket receive queue
 153 *
 154 * Caller must hold socket lock
 155 */
 156
 157static void reject_rx_queue(struct sock *sk)
 158{
 159        struct sk_buff *buf;
 160
 161        while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
 162                tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 163                atomic_dec(&tipc_queue_size);
 164        }
 165}
 166
 167/**
 168 * tipc_create - create a TIPC socket
 169 * @net: network namespace (must be default network)
 170 * @sock: pre-allocated socket structure
 171 * @protocol: protocol indicator (must be 0)
 172 * @kern: caused by kernel or by userspace?
 173 *
 174 * This routine creates additional data structures used by the TIPC socket,
 175 * initializes them, and links them together.
 176 *
 177 * Returns 0 on success, errno otherwise
 178 */
 179
 180static int tipc_create(struct net *net, struct socket *sock, int protocol,
 181                       int kern)
 182{
 183        const struct proto_ops *ops;
 184        socket_state state;
 185        struct sock *sk;
 186        struct tipc_port *tp_ptr;
 187
 188        /* Validate arguments */
 189
 190        if (!net_eq(net, &init_net))
 191                return -EAFNOSUPPORT;
 192
 193        if (unlikely(protocol != 0))
 194                return -EPROTONOSUPPORT;
 195
 196        switch (sock->type) {
 197        case SOCK_STREAM:
 198                ops = &stream_ops;
 199                state = SS_UNCONNECTED;
 200                break;
 201        case SOCK_SEQPACKET:
 202                ops = &packet_ops;
 203                state = SS_UNCONNECTED;
 204                break;
 205        case SOCK_DGRAM:
 206        case SOCK_RDM:
 207                ops = &msg_ops;
 208                state = SS_READY;
 209                break;
 210        default:
 211                return -EPROTOTYPE;
 212        }
 213
 214        /* Allocate socket's protocol area */
 215
 216        sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
 217        if (sk == NULL)
 218                return -ENOMEM;
 219
 220        /* Allocate TIPC port for socket to use */
 221
 222        tp_ptr = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
 223                                     TIPC_LOW_IMPORTANCE);
 224        if (unlikely(!tp_ptr)) {
 225                sk_free(sk);
 226                return -ENOMEM;
 227        }
 228
 229        /* Finish initializing socket data structures */
 230
 231        sock->ops = ops;
 232        sock->state = state;
 233
 234        sock_init_data(sock, sk);
 235        sk->sk_backlog_rcv = backlog_rcv;
 236        tipc_sk(sk)->p = tp_ptr;
 237        tipc_sk(sk)->conn_timeout = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
 238
 239        spin_unlock_bh(tp_ptr->lock);
 240
 241        if (sock->state == SS_READY) {
 242                tipc_set_portunreturnable(tp_ptr->ref, 1);
 243                if (sock->type == SOCK_DGRAM)
 244                        tipc_set_portunreliable(tp_ptr->ref, 1);
 245        }
 246
 247        return 0;
 248}
 249
 250/**
 251 * release - destroy a TIPC socket
 252 * @sock: socket to destroy
 253 *
 254 * This routine cleans up any messages that are still queued on the socket.
 255 * For DGRAM and RDM socket types, all queued messages are rejected.
 256 * For SEQPACKET and STREAM socket types, the first message is rejected
 257 * and any others are discarded.  (If the first message on a STREAM socket
 258 * is partially-read, it is discarded and the next one is rejected instead.)
 259 *
 260 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 261 * are returned or discarded according to the "destination droppable" setting
 262 * specified for the message by the sender.
 263 *
 264 * Returns 0 on success, errno otherwise
 265 */
 266
 267static int release(struct socket *sock)
 268{
 269        struct sock *sk = sock->sk;
 270        struct tipc_port *tport;
 271        struct sk_buff *buf;
 272        int res;
 273
 274        /*
 275         * Exit if socket isn't fully initialized (occurs when a failed accept()
 276         * releases a pre-allocated child socket that was never used)
 277         */
 278
 279        if (sk == NULL)
 280                return 0;
 281
 282        tport = tipc_sk_port(sk);
 283        lock_sock(sk);
 284
 285        /*
 286         * Reject all unreceived messages, except on an active connection
 287         * (which disconnects locally & sends a 'FIN+' to peer)
 288         */
 289
 290        while (sock->state != SS_DISCONNECTING) {
 291                buf = __skb_dequeue(&sk->sk_receive_queue);
 292                if (buf == NULL)
 293                        break;
 294                atomic_dec(&tipc_queue_size);
 295                if (TIPC_SKB_CB(buf)->handle != 0)
 296                        buf_discard(buf);
 297                else {
 298                        if ((sock->state == SS_CONNECTING) ||
 299                            (sock->state == SS_CONNECTED)) {
 300                                sock->state = SS_DISCONNECTING;
 301                                tipc_disconnect(tport->ref);
 302                        }
 303                        tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 304                }
 305        }
 306
 307        /*
 308         * Delete TIPC port; this ensures no more messages are queued
 309         * (also disconnects an active connection & sends a 'FIN-' to peer)
 310         */
 311
 312        res = tipc_deleteport(tport->ref);
 313
 314        /* Discard any remaining (connection-based) messages in receive queue */
 315
 316        discard_rx_queue(sk);
 317
 318        /* Reject any messages that accumulated in backlog queue */
 319
 320        sock->state = SS_DISCONNECTING;
 321        release_sock(sk);
 322
 323        sock_put(sk);
 324        sock->sk = NULL;
 325
 326        return res;
 327}
 328
 329/**
 330 * bind - associate or disassocate TIPC name(s) with a socket
 331 * @sock: socket structure
 332 * @uaddr: socket address describing name(s) and desired operation
 333 * @uaddr_len: size of socket address data structure
 334 *
 335 * Name and name sequence binding is indicated using a positive scope value;
 336 * a negative scope value unbinds the specified name.  Specifying no name
 337 * (i.e. a socket address length of 0) unbinds all names from the socket.
 338 *
 339 * Returns 0 on success, errno otherwise
 340 *
 341 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 342 *       access any non-constant socket information.
 343 */
 344
 345static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
 346{
 347        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 348        u32 portref = tipc_sk_port(sock->sk)->ref;
 349
 350        if (unlikely(!uaddr_len))
 351                return tipc_withdraw(portref, 0, NULL);
 352
 353        if (uaddr_len < sizeof(struct sockaddr_tipc))
 354                return -EINVAL;
 355        if (addr->family != AF_TIPC)
 356                return -EAFNOSUPPORT;
 357
 358        if (addr->addrtype == TIPC_ADDR_NAME)
 359                addr->addr.nameseq.upper = addr->addr.nameseq.lower;
 360        else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
 361                return -EAFNOSUPPORT;
 362
 363        return (addr->scope > 0) ?
 364                tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
 365                tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
 366}
 367
 368/**
 369 * get_name - get port ID of socket or peer socket
 370 * @sock: socket structure
 371 * @uaddr: area for returned socket address
 372 * @uaddr_len: area for returned length of socket address
 373 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
 374 *
 375 * Returns 0 on success, errno otherwise
 376 *
 377 * NOTE: This routine doesn't need to take the socket lock since it only
 378 *       accesses socket information that is unchanging (or which changes in
 379 *       a completely predictable manner).
 380 */
 381
 382static int get_name(struct socket *sock, struct sockaddr *uaddr,
 383                    int *uaddr_len, int peer)
 384{
 385        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 386        struct tipc_sock *tsock = tipc_sk(sock->sk);
 387
 388        memset(addr, 0, sizeof(*addr));
 389        if (peer) {
 390                if ((sock->state != SS_CONNECTED) &&
 391                        ((peer != 2) || (sock->state != SS_DISCONNECTING)))
 392                        return -ENOTCONN;
 393                addr->addr.id.ref = tsock->peer_name.ref;
 394                addr->addr.id.node = tsock->peer_name.node;
 395        } else {
 396                addr->addr.id.ref = tsock->p->ref;
 397                addr->addr.id.node = tipc_own_addr;
 398        }
 399
 400        *uaddr_len = sizeof(*addr);
 401        addr->addrtype = TIPC_ADDR_ID;
 402        addr->family = AF_TIPC;
 403        addr->scope = 0;
 404        addr->addr.name.domain = 0;
 405
 406        return 0;
 407}
 408
 409/**
 410 * poll - read and possibly block on pollmask
 411 * @file: file structure associated with the socket
 412 * @sock: socket for which to calculate the poll bits
 413 * @wait: ???
 414 *
 415 * Returns pollmask value
 416 *
 417 * COMMENTARY:
 418 * It appears that the usual socket locking mechanisms are not useful here
 419 * since the pollmask info is potentially out-of-date the moment this routine
 420 * exits.  TCP and other protocols seem to rely on higher level poll routines
 421 * to handle any preventable race conditions, so TIPC will do the same ...
 422 *
 423 * TIPC sets the returned events as follows:
 424 *
 425 * socket state         flags set
 426 * ------------         ---------
 427 * unconnected          no read flags
 428 *                      no write flags
 429 *
 430 * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
 431 *                      no write flags
 432 *
 433 * connected            POLLIN/POLLRDNORM if data in rx queue
 434 *                      POLLOUT if port is not congested
 435 *
 436 * disconnecting        POLLIN/POLLRDNORM/POLLHUP
 437 *                      no write flags
 438 *
 439 * listening            POLLIN if SYN in rx queue
 440 *                      no write flags
 441 *
 442 * ready                POLLIN/POLLRDNORM if data in rx queue
 443 * [connectionless]     POLLOUT (since port cannot be congested)
 444 *
 445 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 446 * imply that the operation will succeed, merely that it should be performed
 447 * and will not block.
 448 */
 449
 450static unsigned int poll(struct file *file, struct socket *sock,
 451                         poll_table *wait)
 452{
 453        struct sock *sk = sock->sk;
 454        u32 mask = 0;
 455
 456        poll_wait(file, sk_sleep(sk), wait);
 457
 458        switch ((int)sock->state) {
 459        case SS_READY:
 460        case SS_CONNECTED:
 461                if (!tipc_sk_port(sk)->congested)
 462                        mask |= POLLOUT;
 463                /* fall thru' */
 464        case SS_CONNECTING:
 465        case SS_LISTENING:
 466                if (!skb_queue_empty(&sk->sk_receive_queue))
 467                        mask |= (POLLIN | POLLRDNORM);
 468                break;
 469        case SS_DISCONNECTING:
 470                mask = (POLLIN | POLLRDNORM | POLLHUP);
 471                break;
 472        }
 473
 474        return mask;
 475}
 476
 477/**
 478 * dest_name_check - verify user is permitted to send to specified port name
 479 * @dest: destination address
 480 * @m: descriptor for message to be sent
 481 *
 482 * Prevents restricted configuration commands from being issued by
 483 * unauthorized users.
 484 *
 485 * Returns 0 if permission is granted, otherwise errno
 486 */
 487
 488static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
 489{
 490        struct tipc_cfg_msg_hdr hdr;
 491
 492        if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
 493                return 0;
 494        if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
 495                return 0;
 496        if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
 497                return -EACCES;
 498
 499        if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
 500                return -EMSGSIZE;
 501        if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
 502                return -EFAULT;
 503        if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
 504                return -EACCES;
 505
 506        return 0;
 507}
 508
 509/**
 510 * send_msg - send message in connectionless manner
 511 * @iocb: if NULL, indicates that socket lock is already held
 512 * @sock: socket structure
 513 * @m: message to send
 514 * @total_len: length of message
 515 *
 516 * Message must have an destination specified explicitly.
 517 * Used for SOCK_RDM and SOCK_DGRAM messages,
 518 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 519 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 520 *
 521 * Returns the number of bytes sent on success, or errno otherwise
 522 */
 523
 524static int send_msg(struct kiocb *iocb, struct socket *sock,
 525                    struct msghdr *m, size_t total_len)
 526{
 527        struct sock *sk = sock->sk;
 528        struct tipc_port *tport = tipc_sk_port(sk);
 529        struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
 530        int needs_conn;
 531        int res = -EINVAL;
 532
 533        if (unlikely(!dest))
 534                return -EDESTADDRREQ;
 535        if (unlikely((m->msg_namelen < sizeof(*dest)) ||
 536                     (dest->family != AF_TIPC)))
 537                return -EINVAL;
 538        if ((total_len > TIPC_MAX_USER_MSG_SIZE) ||
 539            (m->msg_iovlen > (unsigned)INT_MAX))
 540                return -EMSGSIZE;
 541
 542        if (iocb)
 543                lock_sock(sk);
 544
 545        needs_conn = (sock->state != SS_READY);
 546        if (unlikely(needs_conn)) {
 547                if (sock->state == SS_LISTENING) {
 548                        res = -EPIPE;
 549                        goto exit;
 550                }
 551                if (sock->state != SS_UNCONNECTED) {
 552                        res = -EISCONN;
 553                        goto exit;
 554                }
 555                if ((tport->published) ||
 556                    ((sock->type == SOCK_STREAM) && (total_len != 0))) {
 557                        res = -EOPNOTSUPP;
 558                        goto exit;
 559                }
 560                if (dest->addrtype == TIPC_ADDR_NAME) {
 561                        tport->conn_type = dest->addr.name.name.type;
 562                        tport->conn_instance = dest->addr.name.name.instance;
 563                }
 564
 565                /* Abort any pending connection attempts (very unlikely) */
 566
 567                reject_rx_queue(sk);
 568        }
 569
 570        do {
 571                if (dest->addrtype == TIPC_ADDR_NAME) {
 572                        res = dest_name_check(dest, m);
 573                        if (res)
 574                                break;
 575                        res = tipc_send2name(tport->ref,
 576                                             &dest->addr.name.name,
 577                                             dest->addr.name.domain,
 578                                             m->msg_iovlen,
 579                                             m->msg_iov,
 580                                             total_len);
 581                } else if (dest->addrtype == TIPC_ADDR_ID) {
 582                        res = tipc_send2port(tport->ref,
 583                                             &dest->addr.id,
 584                                             m->msg_iovlen,
 585                                             m->msg_iov,
 586                                             total_len);
 587                } else if (dest->addrtype == TIPC_ADDR_MCAST) {
 588                        if (needs_conn) {
 589                                res = -EOPNOTSUPP;
 590                                break;
 591                        }
 592                        res = dest_name_check(dest, m);
 593                        if (res)
 594                                break;
 595                        res = tipc_multicast(tport->ref,
 596                                             &dest->addr.nameseq,
 597                                             m->msg_iovlen,
 598                                             m->msg_iov,
 599                                             total_len);
 600                }
 601                if (likely(res != -ELINKCONG)) {
 602                        if (needs_conn && (res >= 0))
 603                                sock->state = SS_CONNECTING;
 604                        break;
 605                }
 606                if (m->msg_flags & MSG_DONTWAIT) {
 607                        res = -EWOULDBLOCK;
 608                        break;
 609                }
 610                release_sock(sk);
 611                res = wait_event_interruptible(*sk_sleep(sk),
 612                                               !tport->congested);
 613                lock_sock(sk);
 614                if (res)
 615                        break;
 616        } while (1);
 617
 618exit:
 619        if (iocb)
 620                release_sock(sk);
 621        return res;
 622}
 623
 624/**
 625 * send_packet - send a connection-oriented message
 626 * @iocb: if NULL, indicates that socket lock is already held
 627 * @sock: socket structure
 628 * @m: message to send
 629 * @total_len: length of message
 630 *
 631 * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
 632 *
 633 * Returns the number of bytes sent on success, or errno otherwise
 634 */
 635
 636static int send_packet(struct kiocb *iocb, struct socket *sock,
 637                       struct msghdr *m, size_t total_len)
 638{
 639        struct sock *sk = sock->sk;
 640        struct tipc_port *tport = tipc_sk_port(sk);
 641        struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
 642        int res;
 643
 644        /* Handle implied connection establishment */
 645
 646        if (unlikely(dest))
 647                return send_msg(iocb, sock, m, total_len);
 648
 649        if ((total_len > TIPC_MAX_USER_MSG_SIZE) ||
 650            (m->msg_iovlen > (unsigned)INT_MAX))
 651                return -EMSGSIZE;
 652
 653        if (iocb)
 654                lock_sock(sk);
 655
 656        do {
 657                if (unlikely(sock->state != SS_CONNECTED)) {
 658                        if (sock->state == SS_DISCONNECTING)
 659                                res = -EPIPE;
 660                        else
 661                                res = -ENOTCONN;
 662                        break;
 663                }
 664
 665                res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov,
 666                                total_len);
 667                if (likely(res != -ELINKCONG))
 668                        break;
 669                if (m->msg_flags & MSG_DONTWAIT) {
 670                        res = -EWOULDBLOCK;
 671                        break;
 672                }
 673                release_sock(sk);
 674                res = wait_event_interruptible(*sk_sleep(sk),
 675                        (!tport->congested || !tport->connected));
 676                lock_sock(sk);
 677                if (res)
 678                        break;
 679        } while (1);
 680
 681        if (iocb)
 682                release_sock(sk);
 683        return res;
 684}
 685
 686/**
 687 * send_stream - send stream-oriented data
 688 * @iocb: (unused)
 689 * @sock: socket structure
 690 * @m: data to send
 691 * @total_len: total length of data to be sent
 692 *
 693 * Used for SOCK_STREAM data.
 694 *
 695 * Returns the number of bytes sent on success (or partial success),
 696 * or errno if no data sent
 697 */
 698
 699static int send_stream(struct kiocb *iocb, struct socket *sock,
 700                       struct msghdr *m, size_t total_len)
 701{
 702        struct sock *sk = sock->sk;
 703        struct tipc_port *tport = tipc_sk_port(sk);
 704        struct msghdr my_msg;
 705        struct iovec my_iov;
 706        struct iovec *curr_iov;
 707        int curr_iovlen;
 708        char __user *curr_start;
 709        u32 hdr_size;
 710        int curr_left;
 711        int bytes_to_send;
 712        int bytes_sent;
 713        int res;
 714
 715        lock_sock(sk);
 716
 717        /* Handle special cases where there is no connection */
 718
 719        if (unlikely(sock->state != SS_CONNECTED)) {
 720                if (sock->state == SS_UNCONNECTED) {
 721                        res = send_packet(NULL, sock, m, total_len);
 722                        goto exit;
 723                } else if (sock->state == SS_DISCONNECTING) {
 724                        res = -EPIPE;
 725                        goto exit;
 726                } else {
 727                        res = -ENOTCONN;
 728                        goto exit;
 729                }
 730        }
 731
 732        if (unlikely(m->msg_name)) {
 733                res = -EISCONN;
 734                goto exit;
 735        }
 736
 737        if ((total_len > (unsigned)INT_MAX) ||
 738            (m->msg_iovlen > (unsigned)INT_MAX)) {
 739                res = -EMSGSIZE;
 740                goto exit;
 741        }
 742
 743        /*
 744         * Send each iovec entry using one or more messages
 745         *
 746         * Note: This algorithm is good for the most likely case
 747         * (i.e. one large iovec entry), but could be improved to pass sets
 748         * of small iovec entries into send_packet().
 749         */
 750
 751        curr_iov = m->msg_iov;
 752        curr_iovlen = m->msg_iovlen;
 753        my_msg.msg_iov = &my_iov;
 754        my_msg.msg_iovlen = 1;
 755        my_msg.msg_flags = m->msg_flags;
 756        my_msg.msg_name = NULL;
 757        bytes_sent = 0;
 758
 759        hdr_size = msg_hdr_sz(&tport->phdr);
 760
 761        while (curr_iovlen--) {
 762                curr_start = curr_iov->iov_base;
 763                curr_left = curr_iov->iov_len;
 764
 765                while (curr_left) {
 766                        bytes_to_send = tport->max_pkt - hdr_size;
 767                        if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
 768                                bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
 769                        if (curr_left < bytes_to_send)
 770                                bytes_to_send = curr_left;
 771                        my_iov.iov_base = curr_start;
 772                        my_iov.iov_len = bytes_to_send;
 773                        res = send_packet(NULL, sock, &my_msg, bytes_to_send);
 774                        if (res < 0) {
 775                                if (bytes_sent)
 776                                        res = bytes_sent;
 777                                goto exit;
 778                        }
 779                        curr_left -= bytes_to_send;
 780                        curr_start += bytes_to_send;
 781                        bytes_sent += bytes_to_send;
 782                }
 783
 784                curr_iov++;
 785        }
 786        res = bytes_sent;
 787exit:
 788        release_sock(sk);
 789        return res;
 790}
 791
 792/**
 793 * auto_connect - complete connection setup to a remote port
 794 * @sock: socket structure
 795 * @msg: peer's response message
 796 *
 797 * Returns 0 on success, errno otherwise
 798 */
 799
 800static int auto_connect(struct socket *sock, struct tipc_msg *msg)
 801{
 802        struct tipc_sock *tsock = tipc_sk(sock->sk);
 803
 804        if (msg_errcode(msg)) {
 805                sock->state = SS_DISCONNECTING;
 806                return -ECONNREFUSED;
 807        }
 808
 809        tsock->peer_name.ref = msg_origport(msg);
 810        tsock->peer_name.node = msg_orignode(msg);
 811        tipc_connect2port(tsock->p->ref, &tsock->peer_name);
 812        tipc_set_portimportance(tsock->p->ref, msg_importance(msg));
 813        sock->state = SS_CONNECTED;
 814        return 0;
 815}
 816
 817/**
 818 * set_orig_addr - capture sender's address for received message
 819 * @m: descriptor for message info
 820 * @msg: received message header
 821 *
 822 * Note: Address is not captured if not requested by receiver.
 823 */
 824
 825static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
 826{
 827        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)m->msg_name;
 828
 829        if (addr) {
 830                addr->family = AF_TIPC;
 831                addr->addrtype = TIPC_ADDR_ID;
 832                addr->addr.id.ref = msg_origport(msg);
 833                addr->addr.id.node = msg_orignode(msg);
 834                addr->addr.name.domain = 0;     /* could leave uninitialized */
 835                addr->scope = 0;                /* could leave uninitialized */
 836                m->msg_namelen = sizeof(struct sockaddr_tipc);
 837        }
 838}
 839
 840/**
 841 * anc_data_recv - optionally capture ancillary data for received message
 842 * @m: descriptor for message info
 843 * @msg: received message header
 844 * @tport: TIPC port associated with message
 845 *
 846 * Note: Ancillary data is not captured if not requested by receiver.
 847 *
 848 * Returns 0 if successful, otherwise errno
 849 */
 850
 851static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
 852                                struct tipc_port *tport)
 853{
 854        u32 anc_data[3];
 855        u32 err;
 856        u32 dest_type;
 857        int has_name;
 858        int res;
 859
 860        if (likely(m->msg_controllen == 0))
 861                return 0;
 862
 863        /* Optionally capture errored message object(s) */
 864
 865        err = msg ? msg_errcode(msg) : 0;
 866        if (unlikely(err)) {
 867                anc_data[0] = err;
 868                anc_data[1] = msg_data_sz(msg);
 869                res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
 870                if (res)
 871                        return res;
 872                if (anc_data[1]) {
 873                        res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
 874                                       msg_data(msg));
 875                        if (res)
 876                                return res;
 877                }
 878        }
 879
 880        /* Optionally capture message destination object */
 881
 882        dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
 883        switch (dest_type) {
 884        case TIPC_NAMED_MSG:
 885                has_name = 1;
 886                anc_data[0] = msg_nametype(msg);
 887                anc_data[1] = msg_namelower(msg);
 888                anc_data[2] = msg_namelower(msg);
 889                break;
 890        case TIPC_MCAST_MSG:
 891                has_name = 1;
 892                anc_data[0] = msg_nametype(msg);
 893                anc_data[1] = msg_namelower(msg);
 894                anc_data[2] = msg_nameupper(msg);
 895                break;
 896        case TIPC_CONN_MSG:
 897                has_name = (tport->conn_type != 0);
 898                anc_data[0] = tport->conn_type;
 899                anc_data[1] = tport->conn_instance;
 900                anc_data[2] = tport->conn_instance;
 901                break;
 902        default:
 903                has_name = 0;
 904        }
 905        if (has_name) {
 906                res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
 907                if (res)
 908                        return res;
 909        }
 910
 911        return 0;
 912}
 913
 914/**
 915 * recv_msg - receive packet-oriented message
 916 * @iocb: (unused)
 917 * @m: descriptor for message info
 918 * @buf_len: total size of user buffer area
 919 * @flags: receive flags
 920 *
 921 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
 922 * If the complete message doesn't fit in user area, truncate it.
 923 *
 924 * Returns size of returned message data, errno otherwise
 925 */
 926
 927static int recv_msg(struct kiocb *iocb, struct socket *sock,
 928                    struct msghdr *m, size_t buf_len, int flags)
 929{
 930        struct sock *sk = sock->sk;
 931        struct tipc_port *tport = tipc_sk_port(sk);
 932        struct sk_buff *buf;
 933        struct tipc_msg *msg;
 934        long timeout;
 935        unsigned int sz;
 936        u32 err;
 937        int res;
 938
 939        /* Catch invalid receive requests */
 940
 941        if (unlikely(!buf_len))
 942                return -EINVAL;
 943
 944        lock_sock(sk);
 945
 946        if (unlikely(sock->state == SS_UNCONNECTED)) {
 947                res = -ENOTCONN;
 948                goto exit;
 949        }
 950
 951        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
 952restart:
 953
 954        /* Look for a message in receive queue; wait if necessary */
 955
 956        while (skb_queue_empty(&sk->sk_receive_queue)) {
 957                if (sock->state == SS_DISCONNECTING) {
 958                        res = -ENOTCONN;
 959                        goto exit;
 960                }
 961                if (timeout <= 0L) {
 962                        res = timeout ? timeout : -EWOULDBLOCK;
 963                        goto exit;
 964                }
 965                release_sock(sk);
 966                timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
 967                                                           tipc_rx_ready(sock),
 968                                                           timeout);
 969                lock_sock(sk);
 970        }
 971
 972        /* Look at first message in receive queue */
 973
 974        buf = skb_peek(&sk->sk_receive_queue);
 975        msg = buf_msg(buf);
 976        sz = msg_data_sz(msg);
 977        err = msg_errcode(msg);
 978
 979        /* Complete connection setup for an implied connect */
 980
 981        if (unlikely(sock->state == SS_CONNECTING)) {
 982                res = auto_connect(sock, msg);
 983                if (res)
 984                        goto exit;
 985        }
 986
 987        /* Discard an empty non-errored message & try again */
 988
 989        if ((!sz) && (!err)) {
 990                advance_rx_queue(sk);
 991                goto restart;
 992        }
 993
 994        /* Capture sender's address (optional) */
 995
 996        set_orig_addr(m, msg);
 997
 998        /* Capture ancillary data (optional) */
 999
1000        res = anc_data_recv(m, msg, tport);
1001        if (res)
1002                goto exit;
1003
1004        /* Capture message data (if valid) & compute return value (always) */
1005
1006        if (!err) {
1007                if (unlikely(buf_len < sz)) {
1008                        sz = buf_len;
1009                        m->msg_flags |= MSG_TRUNC;
1010                }
1011                res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
1012                                              m->msg_iov, sz);
1013                if (res)
1014                        goto exit;
1015                res = sz;
1016        } else {
1017                if ((sock->state == SS_READY) ||
1018                    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1019                        res = 0;
1020                else
1021                        res = -ECONNRESET;
1022        }
1023
1024        /* Consume received message (optional) */
1025
1026        if (likely(!(flags & MSG_PEEK))) {
1027                if ((sock->state != SS_READY) &&
1028                    (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1029                        tipc_acknowledge(tport->ref, tport->conn_unacked);
1030                advance_rx_queue(sk);
1031        }
1032exit:
1033        release_sock(sk);
1034        return res;
1035}
1036
1037/**
1038 * recv_stream - receive stream-oriented data
1039 * @iocb: (unused)
1040 * @m: descriptor for message info
1041 * @buf_len: total size of user buffer area
1042 * @flags: receive flags
1043 *
1044 * Used for SOCK_STREAM messages only.  If not enough data is available
1045 * will optionally wait for more; never truncates data.
1046 *
1047 * Returns size of returned message data, errno otherwise
1048 */
1049
1050static int recv_stream(struct kiocb *iocb, struct socket *sock,
1051                       struct msghdr *m, size_t buf_len, int flags)
1052{
1053        struct sock *sk = sock->sk;
1054        struct tipc_port *tport = tipc_sk_port(sk);
1055        struct sk_buff *buf;
1056        struct tipc_msg *msg;
1057        long timeout;
1058        unsigned int sz;
1059        int sz_to_copy, target, needed;
1060        int sz_copied = 0;
1061        u32 err;
1062        int res = 0;
1063
1064        /* Catch invalid receive attempts */
1065
1066        if (unlikely(!buf_len))
1067                return -EINVAL;
1068
1069        lock_sock(sk);
1070
1071        if (unlikely((sock->state == SS_UNCONNECTED) ||
1072                     (sock->state == SS_CONNECTING))) {
1073                res = -ENOTCONN;
1074                goto exit;
1075        }
1076
1077        target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1078        timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1079restart:
1080
1081        /* Look for a message in receive queue; wait if necessary */
1082
1083        while (skb_queue_empty(&sk->sk_receive_queue)) {
1084                if (sock->state == SS_DISCONNECTING) {
1085                        res = -ENOTCONN;
1086                        goto exit;
1087                }
1088                if (timeout <= 0L) {
1089                        res = timeout ? timeout : -EWOULDBLOCK;
1090                        goto exit;
1091                }
1092                release_sock(sk);
1093                timeout = wait_event_interruptible_timeout(*sk_sleep(sk),
1094                                                           tipc_rx_ready(sock),
1095                                                           timeout);
1096                lock_sock(sk);
1097        }
1098
1099        /* Look at first message in receive queue */
1100
1101        buf = skb_peek(&sk->sk_receive_queue);
1102        msg = buf_msg(buf);
1103        sz = msg_data_sz(msg);
1104        err = msg_errcode(msg);
1105
1106        /* Discard an empty non-errored message & try again */
1107
1108        if ((!sz) && (!err)) {
1109                advance_rx_queue(sk);
1110                goto restart;
1111        }
1112
1113        /* Optionally capture sender's address & ancillary data of first msg */
1114
1115        if (sz_copied == 0) {
1116                set_orig_addr(m, msg);
1117                res = anc_data_recv(m, msg, tport);
1118                if (res)
1119                        goto exit;
1120        }
1121
1122        /* Capture message data (if valid) & compute return value (always) */
1123
1124        if (!err) {
1125                u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1126
1127                sz -= offset;
1128                needed = (buf_len - sz_copied);
1129                sz_to_copy = (sz <= needed) ? sz : needed;
1130
1131                res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1132                                              m->msg_iov, sz_to_copy);
1133                if (res)
1134                        goto exit;
1135
1136                sz_copied += sz_to_copy;
1137
1138                if (sz_to_copy < sz) {
1139                        if (!(flags & MSG_PEEK))
1140                                TIPC_SKB_CB(buf)->handle =
1141                                (void *)(unsigned long)(offset + sz_to_copy);
1142                        goto exit;
1143                }
1144        } else {
1145                if (sz_copied != 0)
1146                        goto exit; /* can't add error msg to valid data */
1147
1148                if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1149                        res = 0;
1150                else
1151                        res = -ECONNRESET;
1152        }
1153
1154        /* Consume received message (optional) */
1155
1156        if (likely(!(flags & MSG_PEEK))) {
1157                if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1158                        tipc_acknowledge(tport->ref, tport->conn_unacked);
1159                advance_rx_queue(sk);
1160        }
1161
1162        /* Loop around if more data is required */
1163
1164        if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1165            (!skb_queue_empty(&sk->sk_receive_queue) ||
1166            (sz_copied < target)) &&    /* and more is ready or required */
1167            (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1168            (!err))                     /* and haven't reached a FIN */
1169                goto restart;
1170
1171exit:
1172        release_sock(sk);
1173        return sz_copied ? sz_copied : res;
1174}
1175
1176/**
1177 * rx_queue_full - determine if receive queue can accept another message
1178 * @msg: message to be added to queue
1179 * @queue_size: current size of queue
1180 * @base: nominal maximum size of queue
1181 *
1182 * Returns 1 if queue is unable to accept message, 0 otherwise
1183 */
1184
1185static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
1186{
1187        u32 threshold;
1188        u32 imp = msg_importance(msg);
1189
1190        if (imp == TIPC_LOW_IMPORTANCE)
1191                threshold = base;
1192        else if (imp == TIPC_MEDIUM_IMPORTANCE)
1193                threshold = base * 2;
1194        else if (imp == TIPC_HIGH_IMPORTANCE)
1195                threshold = base * 100;
1196        else
1197                return 0;
1198
1199        if (msg_connected(msg))
1200                threshold *= 4;
1201
1202        return queue_size >= threshold;
1203}
1204
1205/**
1206 * filter_rcv - validate incoming message
1207 * @sk: socket
1208 * @buf: message
1209 *
1210 * Enqueues message on receive queue if acceptable; optionally handles
1211 * disconnect indication for a connected socket.
1212 *
1213 * Called with socket lock already taken; port lock may also be taken.
1214 *
1215 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1216 */
1217
1218static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1219{
1220        struct socket *sock = sk->sk_socket;
1221        struct tipc_msg *msg = buf_msg(buf);
1222        u32 recv_q_len;
1223
1224        /* Reject message if it is wrong sort of message for socket */
1225
1226        /*
1227         * WOULD IT BE BETTER TO JUST DISCARD THESE MESSAGES INSTEAD?
1228         * "NO PORT" ISN'T REALLY THE RIGHT ERROR CODE, AND THERE MAY
1229         * BE SECURITY IMPLICATIONS INHERENT IN REJECTING INVALID TRAFFIC
1230         */
1231
1232        if (sock->state == SS_READY) {
1233                if (msg_connected(msg))
1234                        return TIPC_ERR_NO_PORT;
1235        } else {
1236                if (msg_mcast(msg))
1237                        return TIPC_ERR_NO_PORT;
1238                if (sock->state == SS_CONNECTED) {
1239                        if (!msg_connected(msg))
1240                                return TIPC_ERR_NO_PORT;
1241                } else if (sock->state == SS_CONNECTING) {
1242                        if (!msg_connected(msg) && (msg_errcode(msg) == 0))
1243                                return TIPC_ERR_NO_PORT;
1244                } else if (sock->state == SS_LISTENING) {
1245                        if (msg_connected(msg) || msg_errcode(msg))
1246                                return TIPC_ERR_NO_PORT;
1247                } else if (sock->state == SS_DISCONNECTING) {
1248                        return TIPC_ERR_NO_PORT;
1249                } else /* (sock->state == SS_UNCONNECTED) */ {
1250                        if (msg_connected(msg) || msg_errcode(msg))
1251                                return TIPC_ERR_NO_PORT;
1252                }
1253        }
1254
1255        /* Reject message if there isn't room to queue it */
1256
1257        recv_q_len = (u32)atomic_read(&tipc_queue_size);
1258        if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
1259                if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1260                        return TIPC_ERR_OVERLOAD;
1261        }
1262        recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1263        if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1264                if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
1265                        return TIPC_ERR_OVERLOAD;
1266        }
1267
1268        /* Enqueue message (finally!) */
1269
1270        TIPC_SKB_CB(buf)->handle = 0;
1271        atomic_inc(&tipc_queue_size);
1272        __skb_queue_tail(&sk->sk_receive_queue, buf);
1273
1274        /* Initiate connection termination for an incoming 'FIN' */
1275
1276        if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
1277                sock->state = SS_DISCONNECTING;
1278                tipc_disconnect_port(tipc_sk_port(sk));
1279        }
1280
1281        if (waitqueue_active(sk_sleep(sk)))
1282                wake_up_interruptible(sk_sleep(sk));
1283        return TIPC_OK;
1284}
1285
1286/**
1287 * backlog_rcv - handle incoming message from backlog queue
1288 * @sk: socket
1289 * @buf: message
1290 *
1291 * Caller must hold socket lock, but not port lock.
1292 *
1293 * Returns 0
1294 */
1295
1296static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1297{
1298        u32 res;
1299
1300        res = filter_rcv(sk, buf);
1301        if (res)
1302                tipc_reject_msg(buf, res);
1303        return 0;
1304}
1305
1306/**
1307 * dispatch - handle incoming message
1308 * @tport: TIPC port that received message
1309 * @buf: message
1310 *
1311 * Called with port lock already taken.
1312 *
1313 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1314 */
1315
1316static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1317{
1318        struct sock *sk = (struct sock *)tport->usr_handle;
1319        u32 res;
1320
1321        /*
1322         * Process message if socket is unlocked; otherwise add to backlog queue
1323         *
1324         * This code is based on sk_receive_skb(), but must be distinct from it
1325         * since a TIPC-specific filter/reject mechanism is utilized
1326         */
1327
1328        bh_lock_sock(sk);
1329        if (!sock_owned_by_user(sk)) {
1330                res = filter_rcv(sk, buf);
1331        } else {
1332                if (sk_add_backlog(sk, buf))
1333                        res = TIPC_ERR_OVERLOAD;
1334                else
1335                        res = TIPC_OK;
1336        }
1337        bh_unlock_sock(sk);
1338
1339        return res;
1340}
1341
1342/**
1343 * wakeupdispatch - wake up port after congestion
1344 * @tport: port to wakeup
1345 *
1346 * Called with port lock already taken.
1347 */
1348
1349static void wakeupdispatch(struct tipc_port *tport)
1350{
1351        struct sock *sk = (struct sock *)tport->usr_handle;
1352
1353        if (waitqueue_active(sk_sleep(sk)))
1354                wake_up_interruptible(sk_sleep(sk));
1355}
1356
1357/**
1358 * connect - establish a connection to another TIPC port
1359 * @sock: socket structure
1360 * @dest: socket address for destination port
1361 * @destlen: size of socket address data structure
1362 * @flags: file-related flags associated with socket
1363 *
1364 * Returns 0 on success, errno otherwise
1365 */
1366
1367static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1368                   int flags)
1369{
1370        struct sock *sk = sock->sk;
1371        struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1372        struct msghdr m = {NULL,};
1373        struct sk_buff *buf;
1374        struct tipc_msg *msg;
1375        long timeout;
1376        int res;
1377
1378        lock_sock(sk);
1379
1380        /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1381
1382        if (sock->state == SS_READY) {
1383                res = -EOPNOTSUPP;
1384                goto exit;
1385        }
1386
1387        /* For now, TIPC does not support the non-blocking form of connect() */
1388
1389        if (flags & O_NONBLOCK) {
1390                res = -EOPNOTSUPP;
1391                goto exit;
1392        }
1393
1394        /* Issue Posix-compliant error code if socket is in the wrong state */
1395
1396        if (sock->state == SS_LISTENING) {
1397                res = -EOPNOTSUPP;
1398                goto exit;
1399        }
1400        if (sock->state == SS_CONNECTING) {
1401                res = -EALREADY;
1402                goto exit;
1403        }
1404        if (sock->state != SS_UNCONNECTED) {
1405                res = -EISCONN;
1406                goto exit;
1407        }
1408
1409        /*
1410         * Reject connection attempt using multicast address
1411         *
1412         * Note: send_msg() validates the rest of the address fields,
1413         *       so there's no need to do it here
1414         */
1415
1416        if (dst->addrtype == TIPC_ADDR_MCAST) {
1417                res = -EINVAL;
1418                goto exit;
1419        }
1420
1421        /* Reject any messages already in receive queue (very unlikely) */
1422
1423        reject_rx_queue(sk);
1424
1425        /* Send a 'SYN-' to destination */
1426
1427        m.msg_name = dest;
1428        m.msg_namelen = destlen;
1429        res = send_msg(NULL, sock, &m, 0);
1430        if (res < 0)
1431                goto exit;
1432
1433        /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1434
1435        timeout = tipc_sk(sk)->conn_timeout;
1436        release_sock(sk);
1437        res = wait_event_interruptible_timeout(*sk_sleep(sk),
1438                        (!skb_queue_empty(&sk->sk_receive_queue) ||
1439                        (sock->state != SS_CONNECTING)),
1440                        timeout ? timeout : MAX_SCHEDULE_TIMEOUT);
1441        lock_sock(sk);
1442
1443        if (res > 0) {
1444                buf = skb_peek(&sk->sk_receive_queue);
1445                if (buf != NULL) {
1446                        msg = buf_msg(buf);
1447                        res = auto_connect(sock, msg);
1448                        if (!res) {
1449                                if (!msg_data_sz(msg))
1450                                        advance_rx_queue(sk);
1451                        }
1452                } else {
1453                        if (sock->state == SS_CONNECTED)
1454                                res = -EISCONN;
1455                        else
1456                                res = -ECONNREFUSED;
1457                }
1458        } else {
1459                if (res == 0)
1460                        res = -ETIMEDOUT;
1461                else
1462                        ; /* leave "res" unchanged */
1463                sock->state = SS_DISCONNECTING;
1464        }
1465
1466exit:
1467        release_sock(sk);
1468        return res;
1469}
1470
1471/**
1472 * listen - allow socket to listen for incoming connections
1473 * @sock: socket structure
1474 * @len: (unused)
1475 *
1476 * Returns 0 on success, errno otherwise
1477 */
1478
1479static int listen(struct socket *sock, int len)
1480{
1481        struct sock *sk = sock->sk;
1482        int res;
1483
1484        lock_sock(sk);
1485
1486        if (sock->state == SS_READY)
1487                res = -EOPNOTSUPP;
1488        else if (sock->state != SS_UNCONNECTED)
1489                res = -EINVAL;
1490        else {
1491                sock->state = SS_LISTENING;
1492                res = 0;
1493        }
1494
1495        release_sock(sk);
1496        return res;
1497}
1498
1499/**
1500 * accept - wait for connection request
1501 * @sock: listening socket
1502 * @newsock: new socket that is to be connected
1503 * @flags: file-related flags associated with socket
1504 *
1505 * Returns 0 on success, errno otherwise
1506 */
1507
1508static int accept(struct socket *sock, struct socket *new_sock, int flags)
1509{
1510        struct sock *sk = sock->sk;
1511        struct sk_buff *buf;
1512        int res;
1513
1514        lock_sock(sk);
1515
1516        if (sock->state == SS_READY) {
1517                res = -EOPNOTSUPP;
1518                goto exit;
1519        }
1520        if (sock->state != SS_LISTENING) {
1521                res = -EINVAL;
1522                goto exit;
1523        }
1524
1525        while (skb_queue_empty(&sk->sk_receive_queue)) {
1526                if (flags & O_NONBLOCK) {
1527                        res = -EWOULDBLOCK;
1528                        goto exit;
1529                }
1530                release_sock(sk);
1531                res = wait_event_interruptible(*sk_sleep(sk),
1532                                (!skb_queue_empty(&sk->sk_receive_queue)));
1533                lock_sock(sk);
1534                if (res)
1535                        goto exit;
1536        }
1537
1538        buf = skb_peek(&sk->sk_receive_queue);
1539
1540        res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
1541        if (!res) {
1542                struct sock *new_sk = new_sock->sk;
1543                struct tipc_sock *new_tsock = tipc_sk(new_sk);
1544                struct tipc_port *new_tport = new_tsock->p;
1545                u32 new_ref = new_tport->ref;
1546                struct tipc_msg *msg = buf_msg(buf);
1547
1548                lock_sock(new_sk);
1549
1550                /*
1551                 * Reject any stray messages received by new socket
1552                 * before the socket lock was taken (very, very unlikely)
1553                 */
1554
1555                reject_rx_queue(new_sk);
1556
1557                /* Connect new socket to it's peer */
1558
1559                new_tsock->peer_name.ref = msg_origport(msg);
1560                new_tsock->peer_name.node = msg_orignode(msg);
1561                tipc_connect2port(new_ref, &new_tsock->peer_name);
1562                new_sock->state = SS_CONNECTED;
1563
1564                tipc_set_portimportance(new_ref, msg_importance(msg));
1565                if (msg_named(msg)) {
1566                        new_tport->conn_type = msg_nametype(msg);
1567                        new_tport->conn_instance = msg_nameinst(msg);
1568                }
1569
1570                /*
1571                 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1572                 * Respond to 'SYN+' by queuing it on new socket.
1573                 */
1574
1575                if (!msg_data_sz(msg)) {
1576                        struct msghdr m = {NULL,};
1577
1578                        advance_rx_queue(sk);
1579                        send_packet(NULL, new_sock, &m, 0);
1580                } else {
1581                        __skb_dequeue(&sk->sk_receive_queue);
1582                        __skb_queue_head(&new_sk->sk_receive_queue, buf);
1583                }
1584                release_sock(new_sk);
1585        }
1586exit:
1587        release_sock(sk);
1588        return res;
1589}
1590
1591/**
1592 * shutdown - shutdown socket connection
1593 * @sock: socket structure
1594 * @how: direction to close (must be SHUT_RDWR)
1595 *
1596 * Terminates connection (if necessary), then purges socket's receive queue.
1597 *
1598 * Returns 0 on success, errno otherwise
1599 */
1600
1601static int shutdown(struct socket *sock, int how)
1602{
1603        struct sock *sk = sock->sk;
1604        struct tipc_port *tport = tipc_sk_port(sk);
1605        struct sk_buff *buf;
1606        int res;
1607
1608        if (how != SHUT_RDWR)
1609                return -EINVAL;
1610
1611        lock_sock(sk);
1612
1613        switch (sock->state) {
1614        case SS_CONNECTING:
1615        case SS_CONNECTED:
1616
1617                /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1618restart:
1619                buf = __skb_dequeue(&sk->sk_receive_queue);
1620                if (buf) {
1621                        atomic_dec(&tipc_queue_size);
1622                        if (TIPC_SKB_CB(buf)->handle != 0) {
1623                                buf_discard(buf);
1624                                goto restart;
1625                        }
1626                        tipc_disconnect(tport->ref);
1627                        tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1628                } else {
1629                        tipc_shutdown(tport->ref);
1630                }
1631
1632                sock->state = SS_DISCONNECTING;
1633
1634                /* fall through */
1635
1636        case SS_DISCONNECTING:
1637
1638                /* Discard any unreceived messages; wake up sleeping tasks */
1639
1640                discard_rx_queue(sk);
1641                if (waitqueue_active(sk_sleep(sk)))
1642                        wake_up_interruptible(sk_sleep(sk));
1643                res = 0;
1644                break;
1645
1646        default:
1647                res = -ENOTCONN;
1648        }
1649
1650        release_sock(sk);
1651        return res;
1652}
1653
1654/**
1655 * setsockopt - set socket option
1656 * @sock: socket structure
1657 * @lvl: option level
1658 * @opt: option identifier
1659 * @ov: pointer to new option value
1660 * @ol: length of option value
1661 *
1662 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1663 * (to ease compatibility).
1664 *
1665 * Returns 0 on success, errno otherwise
1666 */
1667
1668static int setsockopt(struct socket *sock,
1669                      int lvl, int opt, char __user *ov, unsigned int ol)
1670{
1671        struct sock *sk = sock->sk;
1672        struct tipc_port *tport = tipc_sk_port(sk);
1673        u32 value;
1674        int res;
1675
1676        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1677                return 0;
1678        if (lvl != SOL_TIPC)
1679                return -ENOPROTOOPT;
1680        if (ol < sizeof(value))
1681                return -EINVAL;
1682        res = get_user(value, (u32 __user *)ov);
1683        if (res)
1684                return res;
1685
1686        lock_sock(sk);
1687
1688        switch (opt) {
1689        case TIPC_IMPORTANCE:
1690                res = tipc_set_portimportance(tport->ref, value);
1691                break;
1692        case TIPC_SRC_DROPPABLE:
1693                if (sock->type != SOCK_STREAM)
1694                        res = tipc_set_portunreliable(tport->ref, value);
1695                else
1696                        res = -ENOPROTOOPT;
1697                break;
1698        case TIPC_DEST_DROPPABLE:
1699                res = tipc_set_portunreturnable(tport->ref, value);
1700                break;
1701        case TIPC_CONN_TIMEOUT:
1702                tipc_sk(sk)->conn_timeout = msecs_to_jiffies(value);
1703                /* no need to set "res", since already 0 at this point */
1704                break;
1705        default:
1706                res = -EINVAL;
1707        }
1708
1709        release_sock(sk);
1710
1711        return res;
1712}
1713
1714/**
1715 * getsockopt - get socket option
1716 * @sock: socket structure
1717 * @lvl: option level
1718 * @opt: option identifier
1719 * @ov: receptacle for option value
1720 * @ol: receptacle for length of option value
1721 *
1722 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1723 * (to ease compatibility).
1724 *
1725 * Returns 0 on success, errno otherwise
1726 */
1727
1728static int getsockopt(struct socket *sock,
1729                      int lvl, int opt, char __user *ov, int __user *ol)
1730{
1731        struct sock *sk = sock->sk;
1732        struct tipc_port *tport = tipc_sk_port(sk);
1733        int len;
1734        u32 value;
1735        int res;
1736
1737        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1738                return put_user(0, ol);
1739        if (lvl != SOL_TIPC)
1740                return -ENOPROTOOPT;
1741        res = get_user(len, ol);
1742        if (res)
1743                return res;
1744
1745        lock_sock(sk);
1746
1747        switch (opt) {
1748        case TIPC_IMPORTANCE:
1749                res = tipc_portimportance(tport->ref, &value);
1750                break;
1751        case TIPC_SRC_DROPPABLE:
1752                res = tipc_portunreliable(tport->ref, &value);
1753                break;
1754        case TIPC_DEST_DROPPABLE:
1755                res = tipc_portunreturnable(tport->ref, &value);
1756                break;
1757        case TIPC_CONN_TIMEOUT:
1758                value = jiffies_to_msecs(tipc_sk(sk)->conn_timeout);
1759                /* no need to set "res", since already 0 at this point */
1760                break;
1761        case TIPC_NODE_RECVQ_DEPTH:
1762                value = (u32)atomic_read(&tipc_queue_size);
1763                break;
1764        case TIPC_SOCK_RECVQ_DEPTH:
1765                value = skb_queue_len(&sk->sk_receive_queue);
1766                break;
1767        default:
1768                res = -EINVAL;
1769        }
1770
1771        release_sock(sk);
1772
1773        if (res)
1774                return res;     /* "get" failed */
1775
1776        if (len < sizeof(value))
1777                return -EINVAL;
1778
1779        if (copy_to_user(ov, &value, sizeof(value)))
1780                return -EFAULT;
1781
1782        return put_user(sizeof(value), ol);
1783}
1784
1785/**
1786 * Protocol switches for the various types of TIPC sockets
1787 */
1788
1789static const struct proto_ops msg_ops = {
1790        .owner          = THIS_MODULE,
1791        .family         = AF_TIPC,
1792        .release        = release,
1793        .bind           = bind,
1794        .connect        = connect,
1795        .socketpair     = sock_no_socketpair,
1796        .accept         = accept,
1797        .getname        = get_name,
1798        .poll           = poll,
1799        .ioctl          = sock_no_ioctl,
1800        .listen         = listen,
1801        .shutdown       = shutdown,
1802        .setsockopt     = setsockopt,
1803        .getsockopt     = getsockopt,
1804        .sendmsg        = send_msg,
1805        .recvmsg        = recv_msg,
1806        .mmap           = sock_no_mmap,
1807        .sendpage       = sock_no_sendpage
1808};
1809
1810static const struct proto_ops packet_ops = {
1811        .owner          = THIS_MODULE,
1812        .family         = AF_TIPC,
1813        .release        = release,
1814        .bind           = bind,
1815        .connect        = connect,
1816        .socketpair     = sock_no_socketpair,
1817        .accept         = accept,
1818        .getname        = get_name,
1819        .poll           = poll,
1820        .ioctl          = sock_no_ioctl,
1821        .listen         = listen,
1822        .shutdown       = shutdown,
1823        .setsockopt     = setsockopt,
1824        .getsockopt     = getsockopt,
1825        .sendmsg        = send_packet,
1826        .recvmsg        = recv_msg,
1827        .mmap           = sock_no_mmap,
1828        .sendpage       = sock_no_sendpage
1829};
1830
1831static const struct proto_ops stream_ops = {
1832        .owner          = THIS_MODULE,
1833        .family         = AF_TIPC,
1834        .release        = release,
1835        .bind           = bind,
1836        .connect        = connect,
1837        .socketpair     = sock_no_socketpair,
1838        .accept         = accept,
1839        .getname        = get_name,
1840        .poll           = poll,
1841        .ioctl          = sock_no_ioctl,
1842        .listen         = listen,
1843        .shutdown       = shutdown,
1844        .setsockopt     = setsockopt,
1845        .getsockopt     = getsockopt,
1846        .sendmsg        = send_stream,
1847        .recvmsg        = recv_stream,
1848        .mmap           = sock_no_mmap,
1849        .sendpage       = sock_no_sendpage
1850};
1851
1852static const struct net_proto_family tipc_family_ops = {
1853        .owner          = THIS_MODULE,
1854        .family         = AF_TIPC,
1855        .create         = tipc_create
1856};
1857
1858static struct proto tipc_proto = {
1859        .name           = "TIPC",
1860        .owner          = THIS_MODULE,
1861        .obj_size       = sizeof(struct tipc_sock)
1862};
1863
1864/**
1865 * tipc_socket_init - initialize TIPC socket interface
1866 *
1867 * Returns 0 on success, errno otherwise
1868 */
1869int tipc_socket_init(void)
1870{
1871        int res;
1872
1873        res = proto_register(&tipc_proto, 1);
1874        if (res) {
1875                err("Failed to register TIPC protocol type\n");
1876                goto out;
1877        }
1878
1879        res = sock_register(&tipc_family_ops);
1880        if (res) {
1881                err("Failed to register TIPC socket type\n");
1882                proto_unregister(&tipc_proto);
1883                goto out;
1884        }
1885
1886        sockets_enabled = 1;
1887 out:
1888        return res;
1889}
1890
1891/**
1892 * tipc_socket_stop - stop TIPC socket interface
1893 */
1894
1895void tipc_socket_stop(void)
1896{
1897        if (!sockets_enabled)
1898                return;
1899
1900        sockets_enabled = 0;
1901        sock_unregister(tipc_family_ops.family);
1902        proto_unregister(&tipc_proto);
1903}
1904
1905