linux/net/tipc/socket.c
<<
>>
Prefs
   1/*
   2 * net/tipc/socket.c: TIPC socket API
   3 *
   4 * Copyright (c) 2001-2007, 2012-2014, Ericsson AB
   5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "port.h"
  39#include "node.h"
  40
  41#include <linux/export.h>
  42
  43#define SS_LISTENING    -1      /* socket is listening */
  44#define SS_READY        -2      /* socket is connectionless */
  45
  46#define CONN_TIMEOUT_DEFAULT    8000    /* default connect timeout = 8s */
  47
  48static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb);
  49static void tipc_data_ready(struct sock *sk);
  50static void tipc_write_space(struct sock *sk);
  51static int tipc_release(struct socket *sock);
  52static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
  53
  54static const struct proto_ops packet_ops;
  55static const struct proto_ops stream_ops;
  56static const struct proto_ops msg_ops;
  57
  58static struct proto tipc_proto;
  59static struct proto tipc_proto_kern;
  60
  61/*
  62 * Revised TIPC socket locking policy:
  63 *
  64 * Most socket operations take the standard socket lock when they start
  65 * and hold it until they finish (or until they need to sleep).  Acquiring
  66 * this lock grants the owner exclusive access to the fields of the socket
  67 * data structures, with the exception of the backlog queue.  A few socket
  68 * operations can be done without taking the socket lock because they only
  69 * read socket information that never changes during the life of the socket.
  70 *
  71 * Socket operations may acquire the lock for the associated TIPC port if they
  72 * need to perform an operation on the port.  If any routine needs to acquire
  73 * both the socket lock and the port lock it must take the socket lock first
  74 * to avoid the risk of deadlock.
  75 *
  76 * The dispatcher handling incoming messages cannot grab the socket lock in
  77 * the standard fashion, since invoked it runs at the BH level and cannot block.
  78 * Instead, it checks to see if the socket lock is currently owned by someone,
  79 * and either handles the message itself or adds it to the socket's backlog
  80 * queue; in the latter case the queued message is processed once the process
  81 * owning the socket lock releases it.
  82 *
  83 * NOTE: Releasing the socket lock while an operation is sleeping overcomes
  84 * the problem of a blocked socket operation preventing any other operations
  85 * from occurring.  However, applications must be careful if they have
  86 * multiple threads trying to send (or receive) on the same socket, as these
  87 * operations might interfere with each other.  For example, doing a connect
  88 * and a receive at the same time might allow the receive to consume the
  89 * ACK message meant for the connect.  While additional work could be done
  90 * to try and overcome this, it doesn't seem to be worthwhile at the present.
  91 *
  92 * NOTE: Releasing the socket lock while an operation is sleeping also ensures
  93 * that another operation that must be performed in a non-blocking manner is
  94 * not delayed for very long because the lock has already been taken.
  95 *
  96 * NOTE: This code assumes that certain fields of a port/socket pair are
  97 * constant over its lifetime; such fields can be examined without taking
  98 * the socket lock and/or port lock, and do not need to be re-read even
  99 * after resuming processing after waiting.  These fields include:
 100 *   - socket type
 101 *   - pointer to socket sk structure (aka tipc_sock structure)
 102 *   - pointer to port structure
 103 *   - port reference
 104 */
 105
 106#include "socket.h"
 107
 108/**
 109 * advance_rx_queue - discard first buffer in socket receive queue
 110 *
 111 * Caller must hold socket lock
 112 */
 113static void advance_rx_queue(struct sock *sk)
 114{
 115        kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 116}
 117
 118/**
 119 * reject_rx_queue - reject all buffers in socket receive queue
 120 *
 121 * Caller must hold socket lock
 122 */
 123static void reject_rx_queue(struct sock *sk)
 124{
 125        struct sk_buff *buf;
 126
 127        while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
 128                tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 129}
 130
 131/**
 132 * tipc_sk_create - create a TIPC socket
 133 * @net: network namespace (must be default network)
 134 * @sock: pre-allocated socket structure
 135 * @protocol: protocol indicator (must be 0)
 136 * @kern: caused by kernel or by userspace?
 137 *
 138 * This routine creates additional data structures used by the TIPC socket,
 139 * initializes them, and links them together.
 140 *
 141 * Returns 0 on success, errno otherwise
 142 */
 143static int tipc_sk_create(struct net *net, struct socket *sock,
 144                          int protocol, int kern)
 145{
 146        const struct proto_ops *ops;
 147        socket_state state;
 148        struct sock *sk;
 149        struct tipc_sock *tsk;
 150        struct tipc_port *port;
 151        u32 ref;
 152
 153        /* Validate arguments */
 154        if (unlikely(protocol != 0))
 155                return -EPROTONOSUPPORT;
 156
 157        switch (sock->type) {
 158        case SOCK_STREAM:
 159                ops = &stream_ops;
 160                state = SS_UNCONNECTED;
 161                break;
 162        case SOCK_SEQPACKET:
 163                ops = &packet_ops;
 164                state = SS_UNCONNECTED;
 165                break;
 166        case SOCK_DGRAM:
 167        case SOCK_RDM:
 168                ops = &msg_ops;
 169                state = SS_READY;
 170                break;
 171        default:
 172                return -EPROTOTYPE;
 173        }
 174
 175        /* Allocate socket's protocol area */
 176        if (!kern)
 177                sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
 178        else
 179                sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto_kern);
 180
 181        if (sk == NULL)
 182                return -ENOMEM;
 183
 184        tsk = tipc_sk(sk);
 185        port = &tsk->port;
 186
 187        ref = tipc_port_init(port, TIPC_LOW_IMPORTANCE);
 188        if (!ref) {
 189                pr_warn("Socket registration failed, ref. table exhausted\n");
 190                sk_free(sk);
 191                return -ENOMEM;
 192        }
 193
 194        /* Finish initializing socket data structures */
 195        sock->ops = ops;
 196        sock->state = state;
 197
 198        sock_init_data(sock, sk);
 199        sk->sk_backlog_rcv = tipc_backlog_rcv;
 200        sk->sk_rcvbuf = sysctl_tipc_rmem[1];
 201        sk->sk_data_ready = tipc_data_ready;
 202        sk->sk_write_space = tipc_write_space;
 203        tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
 204        atomic_set(&tsk->dupl_rcvcnt, 0);
 205        tipc_port_unlock(port);
 206
 207        if (sock->state == SS_READY) {
 208                tipc_port_set_unreturnable(port, true);
 209                if (sock->type == SOCK_DGRAM)
 210                        tipc_port_set_unreliable(port, true);
 211        }
 212        return 0;
 213}
 214
 215/**
 216 * tipc_sock_create_local - create TIPC socket from inside TIPC module
 217 * @type: socket type - SOCK_RDM or SOCK_SEQPACKET
 218 *
 219 * We cannot use sock_creat_kern here because it bumps module user count.
 220 * Since socket owner and creator is the same module we must make sure
 221 * that module count remains zero for module local sockets, otherwise
 222 * we cannot do rmmod.
 223 *
 224 * Returns 0 on success, errno otherwise
 225 */
 226int tipc_sock_create_local(int type, struct socket **res)
 227{
 228        int rc;
 229
 230        rc = sock_create_lite(AF_TIPC, type, 0, res);
 231        if (rc < 0) {
 232                pr_err("Failed to create kernel socket\n");
 233                return rc;
 234        }
 235        tipc_sk_create(&init_net, *res, 0, 1);
 236
 237        return 0;
 238}
 239
 240/**
 241 * tipc_sock_release_local - release socket created by tipc_sock_create_local
 242 * @sock: the socket to be released.
 243 *
 244 * Module reference count is not incremented when such sockets are created,
 245 * so we must keep it from being decremented when they are released.
 246 */
 247void tipc_sock_release_local(struct socket *sock)
 248{
 249        tipc_release(sock);
 250        sock->ops = NULL;
 251        sock_release(sock);
 252}
 253
 254/**
 255 * tipc_sock_accept_local - accept a connection on a socket created
 256 * with tipc_sock_create_local. Use this function to avoid that
 257 * module reference count is inadvertently incremented.
 258 *
 259 * @sock:    the accepting socket
 260 * @newsock: reference to the new socket to be created
 261 * @flags:   socket flags
 262 */
 263
 264int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
 265                           int flags)
 266{
 267        struct sock *sk = sock->sk;
 268        int ret;
 269
 270        ret = sock_create_lite(sk->sk_family, sk->sk_type,
 271                               sk->sk_protocol, newsock);
 272        if (ret < 0)
 273                return ret;
 274
 275        ret = tipc_accept(sock, *newsock, flags);
 276        if (ret < 0) {
 277                sock_release(*newsock);
 278                return ret;
 279        }
 280        (*newsock)->ops = sock->ops;
 281        return ret;
 282}
 283
 284/**
 285 * tipc_release - destroy a TIPC socket
 286 * @sock: socket to destroy
 287 *
 288 * This routine cleans up any messages that are still queued on the socket.
 289 * For DGRAM and RDM socket types, all queued messages are rejected.
 290 * For SEQPACKET and STREAM socket types, the first message is rejected
 291 * and any others are discarded.  (If the first message on a STREAM socket
 292 * is partially-read, it is discarded and the next one is rejected instead.)
 293 *
 294 * NOTE: Rejected messages are not necessarily returned to the sender!  They
 295 * are returned or discarded according to the "destination droppable" setting
 296 * specified for the message by the sender.
 297 *
 298 * Returns 0 on success, errno otherwise
 299 */
 300static int tipc_release(struct socket *sock)
 301{
 302        struct sock *sk = sock->sk;
 303        struct tipc_sock *tsk;
 304        struct tipc_port *port;
 305        struct sk_buff *buf;
 306
 307        /*
 308         * Exit if socket isn't fully initialized (occurs when a failed accept()
 309         * releases a pre-allocated child socket that was never used)
 310         */
 311        if (sk == NULL)
 312                return 0;
 313
 314        tsk = tipc_sk(sk);
 315        port = &tsk->port;
 316        lock_sock(sk);
 317
 318        /*
 319         * Reject all unreceived messages, except on an active connection
 320         * (which disconnects locally & sends a 'FIN+' to peer)
 321         */
 322        while (sock->state != SS_DISCONNECTING) {
 323                buf = __skb_dequeue(&sk->sk_receive_queue);
 324                if (buf == NULL)
 325                        break;
 326                if (TIPC_SKB_CB(buf)->handle != NULL)
 327                        kfree_skb(buf);
 328                else {
 329                        if ((sock->state == SS_CONNECTING) ||
 330                            (sock->state == SS_CONNECTED)) {
 331                                sock->state = SS_DISCONNECTING;
 332                                tipc_port_disconnect(port->ref);
 333                        }
 334                        tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 335                }
 336        }
 337
 338        /* Destroy TIPC port; also disconnects an active connection and
 339         * sends a 'FIN-' to peer.
 340         */
 341        tipc_port_destroy(port);
 342
 343        /* Discard any remaining (connection-based) messages in receive queue */
 344        __skb_queue_purge(&sk->sk_receive_queue);
 345
 346        /* Reject any messages that accumulated in backlog queue */
 347        sock->state = SS_DISCONNECTING;
 348        release_sock(sk);
 349
 350        sock_put(sk);
 351        sock->sk = NULL;
 352
 353        return 0;
 354}
 355
 356/**
 357 * tipc_bind - associate or disassocate TIPC name(s) with a socket
 358 * @sock: socket structure
 359 * @uaddr: socket address describing name(s) and desired operation
 360 * @uaddr_len: size of socket address data structure
 361 *
 362 * Name and name sequence binding is indicated using a positive scope value;
 363 * a negative scope value unbinds the specified name.  Specifying no name
 364 * (i.e. a socket address length of 0) unbinds all names from the socket.
 365 *
 366 * Returns 0 on success, errno otherwise
 367 *
 368 * NOTE: This routine doesn't need to take the socket lock since it doesn't
 369 *       access any non-constant socket information.
 370 */
 371static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
 372                     int uaddr_len)
 373{
 374        struct sock *sk = sock->sk;
 375        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 376        struct tipc_sock *tsk = tipc_sk(sk);
 377        int res = -EINVAL;
 378
 379        lock_sock(sk);
 380        if (unlikely(!uaddr_len)) {
 381                res = tipc_withdraw(&tsk->port, 0, NULL);
 382                goto exit;
 383        }
 384
 385        if (uaddr_len < sizeof(struct sockaddr_tipc)) {
 386                res = -EINVAL;
 387                goto exit;
 388        }
 389        if (addr->family != AF_TIPC) {
 390                res = -EAFNOSUPPORT;
 391                goto exit;
 392        }
 393
 394        if (addr->addrtype == TIPC_ADDR_NAME)
 395                addr->addr.nameseq.upper = addr->addr.nameseq.lower;
 396        else if (addr->addrtype != TIPC_ADDR_NAMESEQ) {
 397                res = -EAFNOSUPPORT;
 398                goto exit;
 399        }
 400
 401        if ((addr->addr.nameseq.type < TIPC_RESERVED_TYPES) &&
 402            (addr->addr.nameseq.type != TIPC_TOP_SRV) &&
 403            (addr->addr.nameseq.type != TIPC_CFG_SRV)) {
 404                res = -EACCES;
 405                goto exit;
 406        }
 407
 408        res = (addr->scope > 0) ?
 409                tipc_publish(&tsk->port, addr->scope, &addr->addr.nameseq) :
 410                tipc_withdraw(&tsk->port, -addr->scope, &addr->addr.nameseq);
 411exit:
 412        release_sock(sk);
 413        return res;
 414}
 415
 416/**
 417 * tipc_getname - get port ID of socket or peer socket
 418 * @sock: socket structure
 419 * @uaddr: area for returned socket address
 420 * @uaddr_len: area for returned length of socket address
 421 * @peer: 0 = own ID, 1 = current peer ID, 2 = current/former peer ID
 422 *
 423 * Returns 0 on success, errno otherwise
 424 *
 425 * NOTE: This routine doesn't need to take the socket lock since it only
 426 *       accesses socket information that is unchanging (or which changes in
 427 *       a completely predictable manner).
 428 */
 429static int tipc_getname(struct socket *sock, struct sockaddr *uaddr,
 430                        int *uaddr_len, int peer)
 431{
 432        struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
 433        struct tipc_sock *tsk = tipc_sk(sock->sk);
 434
 435        memset(addr, 0, sizeof(*addr));
 436        if (peer) {
 437                if ((sock->state != SS_CONNECTED) &&
 438                        ((peer != 2) || (sock->state != SS_DISCONNECTING)))
 439                        return -ENOTCONN;
 440                addr->addr.id.ref = tipc_port_peerport(&tsk->port);
 441                addr->addr.id.node = tipc_port_peernode(&tsk->port);
 442        } else {
 443                addr->addr.id.ref = tsk->port.ref;
 444                addr->addr.id.node = tipc_own_addr;
 445        }
 446
 447        *uaddr_len = sizeof(*addr);
 448        addr->addrtype = TIPC_ADDR_ID;
 449        addr->family = AF_TIPC;
 450        addr->scope = 0;
 451        addr->addr.name.domain = 0;
 452
 453        return 0;
 454}
 455
 456/**
 457 * tipc_poll - read and possibly block on pollmask
 458 * @file: file structure associated with the socket
 459 * @sock: socket for which to calculate the poll bits
 460 * @wait: ???
 461 *
 462 * Returns pollmask value
 463 *
 464 * COMMENTARY:
 465 * It appears that the usual socket locking mechanisms are not useful here
 466 * since the pollmask info is potentially out-of-date the moment this routine
 467 * exits.  TCP and other protocols seem to rely on higher level poll routines
 468 * to handle any preventable race conditions, so TIPC will do the same ...
 469 *
 470 * TIPC sets the returned events as follows:
 471 *
 472 * socket state         flags set
 473 * ------------         ---------
 474 * unconnected          no read flags
 475 *                      POLLOUT if port is not congested
 476 *
 477 * connecting           POLLIN/POLLRDNORM if ACK/NACK in rx queue
 478 *                      no write flags
 479 *
 480 * connected            POLLIN/POLLRDNORM if data in rx queue
 481 *                      POLLOUT if port is not congested
 482 *
 483 * disconnecting        POLLIN/POLLRDNORM/POLLHUP
 484 *                      no write flags
 485 *
 486 * listening            POLLIN if SYN in rx queue
 487 *                      no write flags
 488 *
 489 * ready                POLLIN/POLLRDNORM if data in rx queue
 490 * [connectionless]     POLLOUT (since port cannot be congested)
 491 *
 492 * IMPORTANT: The fact that a read or write operation is indicated does NOT
 493 * imply that the operation will succeed, merely that it should be performed
 494 * and will not block.
 495 */
 496static unsigned int tipc_poll(struct file *file, struct socket *sock,
 497                              poll_table *wait)
 498{
 499        struct sock *sk = sock->sk;
 500        struct tipc_sock *tsk = tipc_sk(sk);
 501        u32 mask = 0;
 502
 503        sock_poll_wait(file, sk_sleep(sk), wait);
 504
 505        switch ((int)sock->state) {
 506        case SS_UNCONNECTED:
 507                if (!tsk->port.congested)
 508                        mask |= POLLOUT;
 509                break;
 510        case SS_READY:
 511        case SS_CONNECTED:
 512                if (!tsk->port.congested)
 513                        mask |= POLLOUT;
 514                /* fall thru' */
 515        case SS_CONNECTING:
 516        case SS_LISTENING:
 517                if (!skb_queue_empty(&sk->sk_receive_queue))
 518                        mask |= (POLLIN | POLLRDNORM);
 519                break;
 520        case SS_DISCONNECTING:
 521                mask = (POLLIN | POLLRDNORM | POLLHUP);
 522                break;
 523        }
 524
 525        return mask;
 526}
 527
 528/**
 529 * dest_name_check - verify user is permitted to send to specified port name
 530 * @dest: destination address
 531 * @m: descriptor for message to be sent
 532 *
 533 * Prevents restricted configuration commands from being issued by
 534 * unauthorized users.
 535 *
 536 * Returns 0 if permission is granted, otherwise errno
 537 */
 538static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
 539{
 540        struct tipc_cfg_msg_hdr hdr;
 541
 542        if (likely(dest->addr.name.name.type >= TIPC_RESERVED_TYPES))
 543                return 0;
 544        if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
 545                return 0;
 546        if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
 547                return -EACCES;
 548
 549        if (!m->msg_iovlen || (m->msg_iov[0].iov_len < sizeof(hdr)))
 550                return -EMSGSIZE;
 551        if (copy_from_user(&hdr, m->msg_iov[0].iov_base, sizeof(hdr)))
 552                return -EFAULT;
 553        if ((ntohs(hdr.tcm_type) & 0xC000) && (!capable(CAP_NET_ADMIN)))
 554                return -EACCES;
 555
 556        return 0;
 557}
 558
 559static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
 560{
 561        struct sock *sk = sock->sk;
 562        struct tipc_sock *tsk = tipc_sk(sk);
 563        DEFINE_WAIT(wait);
 564        int done;
 565
 566        do {
 567                int err = sock_error(sk);
 568                if (err)
 569                        return err;
 570                if (sock->state == SS_DISCONNECTING)
 571                        return -EPIPE;
 572                if (!*timeo_p)
 573                        return -EAGAIN;
 574                if (signal_pending(current))
 575                        return sock_intr_errno(*timeo_p);
 576
 577                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
 578                done = sk_wait_event(sk, timeo_p, !tsk->port.congested);
 579                finish_wait(sk_sleep(sk), &wait);
 580        } while (!done);
 581        return 0;
 582}
 583
 584
 585/**
 586 * tipc_sendmsg - send message in connectionless manner
 587 * @iocb: if NULL, indicates that socket lock is already held
 588 * @sock: socket structure
 589 * @m: message to send
 590 * @total_len: length of message
 591 *
 592 * Message must have an destination specified explicitly.
 593 * Used for SOCK_RDM and SOCK_DGRAM messages,
 594 * and for 'SYN' messages on SOCK_SEQPACKET and SOCK_STREAM connections.
 595 * (Note: 'SYN+' is prohibited on SOCK_STREAM.)
 596 *
 597 * Returns the number of bytes sent on success, or errno otherwise
 598 */
 599static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
 600                        struct msghdr *m, size_t total_len)
 601{
 602        struct sock *sk = sock->sk;
 603        struct tipc_sock *tsk = tipc_sk(sk);
 604        struct tipc_port *port = &tsk->port;
 605        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 606        int needs_conn;
 607        long timeo;
 608        int res = -EINVAL;
 609
 610        if (unlikely(!dest))
 611                return -EDESTADDRREQ;
 612        if (unlikely((m->msg_namelen < sizeof(*dest)) ||
 613                     (dest->family != AF_TIPC)))
 614                return -EINVAL;
 615        if (total_len > TIPC_MAX_USER_MSG_SIZE)
 616                return -EMSGSIZE;
 617
 618        if (iocb)
 619                lock_sock(sk);
 620
 621        needs_conn = (sock->state != SS_READY);
 622        if (unlikely(needs_conn)) {
 623                if (sock->state == SS_LISTENING) {
 624                        res = -EPIPE;
 625                        goto exit;
 626                }
 627                if (sock->state != SS_UNCONNECTED) {
 628                        res = -EISCONN;
 629                        goto exit;
 630                }
 631                if (tsk->port.published) {
 632                        res = -EOPNOTSUPP;
 633                        goto exit;
 634                }
 635                if (dest->addrtype == TIPC_ADDR_NAME) {
 636                        tsk->port.conn_type = dest->addr.name.name.type;
 637                        tsk->port.conn_instance = dest->addr.name.name.instance;
 638                }
 639
 640                /* Abort any pending connection attempts (very unlikely) */
 641                reject_rx_queue(sk);
 642        }
 643
 644        timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 645        do {
 646                if (dest->addrtype == TIPC_ADDR_NAME) {
 647                        res = dest_name_check(dest, m);
 648                        if (res)
 649                                break;
 650                        res = tipc_send2name(port,
 651                                             &dest->addr.name.name,
 652                                             dest->addr.name.domain,
 653                                             m->msg_iov,
 654                                             total_len);
 655                } else if (dest->addrtype == TIPC_ADDR_ID) {
 656                        res = tipc_send2port(port,
 657                                             &dest->addr.id,
 658                                             m->msg_iov,
 659                                             total_len);
 660                } else if (dest->addrtype == TIPC_ADDR_MCAST) {
 661                        if (needs_conn) {
 662                                res = -EOPNOTSUPP;
 663                                break;
 664                        }
 665                        res = dest_name_check(dest, m);
 666                        if (res)
 667                                break;
 668                        res = tipc_port_mcast_xmit(port,
 669                                                   &dest->addr.nameseq,
 670                                                   m->msg_iov,
 671                                                   total_len);
 672                }
 673                if (likely(res != -ELINKCONG)) {
 674                        if (needs_conn && (res >= 0))
 675                                sock->state = SS_CONNECTING;
 676                        break;
 677                }
 678                res = tipc_wait_for_sndmsg(sock, &timeo);
 679                if (res)
 680                        break;
 681        } while (1);
 682
 683exit:
 684        if (iocb)
 685                release_sock(sk);
 686        return res;
 687}
 688
 689static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
 690{
 691        struct sock *sk = sock->sk;
 692        struct tipc_sock *tsk = tipc_sk(sk);
 693        struct tipc_port *port = &tsk->port;
 694        DEFINE_WAIT(wait);
 695        int done;
 696
 697        do {
 698                int err = sock_error(sk);
 699                if (err)
 700                        return err;
 701                if (sock->state == SS_DISCONNECTING)
 702                        return -EPIPE;
 703                else if (sock->state != SS_CONNECTED)
 704                        return -ENOTCONN;
 705                if (!*timeo_p)
 706                        return -EAGAIN;
 707                if (signal_pending(current))
 708                        return sock_intr_errno(*timeo_p);
 709
 710                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
 711                done = sk_wait_event(sk, timeo_p,
 712                                     (!port->congested || !port->connected));
 713                finish_wait(sk_sleep(sk), &wait);
 714        } while (!done);
 715        return 0;
 716}
 717
 718/**
 719 * tipc_send_packet - send a connection-oriented message
 720 * @iocb: if NULL, indicates that socket lock is already held
 721 * @sock: socket structure
 722 * @m: message to send
 723 * @total_len: length of message
 724 *
 725 * Used for SOCK_SEQPACKET messages and SOCK_STREAM data.
 726 *
 727 * Returns the number of bytes sent on success, or errno otherwise
 728 */
 729static int tipc_send_packet(struct kiocb *iocb, struct socket *sock,
 730                            struct msghdr *m, size_t total_len)
 731{
 732        struct sock *sk = sock->sk;
 733        struct tipc_sock *tsk = tipc_sk(sk);
 734        DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
 735        int res = -EINVAL;
 736        long timeo;
 737
 738        /* Handle implied connection establishment */
 739        if (unlikely(dest))
 740                return tipc_sendmsg(iocb, sock, m, total_len);
 741
 742        if (total_len > TIPC_MAX_USER_MSG_SIZE)
 743                return -EMSGSIZE;
 744
 745        if (iocb)
 746                lock_sock(sk);
 747
 748        if (unlikely(sock->state != SS_CONNECTED)) {
 749                if (sock->state == SS_DISCONNECTING)
 750                        res = -EPIPE;
 751                else
 752                        res = -ENOTCONN;
 753                goto exit;
 754        }
 755
 756        timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
 757        do {
 758                res = tipc_send(&tsk->port, m->msg_iov, total_len);
 759                if (likely(res != -ELINKCONG))
 760                        break;
 761                res = tipc_wait_for_sndpkt(sock, &timeo);
 762                if (res)
 763                        break;
 764        } while (1);
 765exit:
 766        if (iocb)
 767                release_sock(sk);
 768        return res;
 769}
 770
 771/**
 772 * tipc_send_stream - send stream-oriented data
 773 * @iocb: (unused)
 774 * @sock: socket structure
 775 * @m: data to send
 776 * @total_len: total length of data to be sent
 777 *
 778 * Used for SOCK_STREAM data.
 779 *
 780 * Returns the number of bytes sent on success (or partial success),
 781 * or errno if no data sent
 782 */
 783static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
 784                            struct msghdr *m, size_t total_len)
 785{
 786        struct sock *sk = sock->sk;
 787        struct tipc_sock *tsk = tipc_sk(sk);
 788        struct msghdr my_msg;
 789        struct iovec my_iov;
 790        struct iovec *curr_iov;
 791        int curr_iovlen;
 792        char __user *curr_start;
 793        u32 hdr_size;
 794        int curr_left;
 795        int bytes_to_send;
 796        int bytes_sent;
 797        int res;
 798
 799        lock_sock(sk);
 800
 801        /* Handle special cases where there is no connection */
 802        if (unlikely(sock->state != SS_CONNECTED)) {
 803                if (sock->state == SS_UNCONNECTED)
 804                        res = tipc_send_packet(NULL, sock, m, total_len);
 805                else
 806                        res = sock->state == SS_DISCONNECTING ? -EPIPE : -ENOTCONN;
 807                goto exit;
 808        }
 809
 810        if (unlikely(m->msg_name)) {
 811                res = -EISCONN;
 812                goto exit;
 813        }
 814
 815        if (total_len > (unsigned int)INT_MAX) {
 816                res = -EMSGSIZE;
 817                goto exit;
 818        }
 819
 820        /*
 821         * Send each iovec entry using one or more messages
 822         *
 823         * Note: This algorithm is good for the most likely case
 824         * (i.e. one large iovec entry), but could be improved to pass sets
 825         * of small iovec entries into send_packet().
 826         */
 827        curr_iov = m->msg_iov;
 828        curr_iovlen = m->msg_iovlen;
 829        my_msg.msg_iov = &my_iov;
 830        my_msg.msg_iovlen = 1;
 831        my_msg.msg_flags = m->msg_flags;
 832        my_msg.msg_name = NULL;
 833        bytes_sent = 0;
 834
 835        hdr_size = msg_hdr_sz(&tsk->port.phdr);
 836
 837        while (curr_iovlen--) {
 838                curr_start = curr_iov->iov_base;
 839                curr_left = curr_iov->iov_len;
 840
 841                while (curr_left) {
 842                        bytes_to_send = tsk->port.max_pkt - hdr_size;
 843                        if (bytes_to_send > TIPC_MAX_USER_MSG_SIZE)
 844                                bytes_to_send = TIPC_MAX_USER_MSG_SIZE;
 845                        if (curr_left < bytes_to_send)
 846                                bytes_to_send = curr_left;
 847                        my_iov.iov_base = curr_start;
 848                        my_iov.iov_len = bytes_to_send;
 849                        res = tipc_send_packet(NULL, sock, &my_msg,
 850                                               bytes_to_send);
 851                        if (res < 0) {
 852                                if (bytes_sent)
 853                                        res = bytes_sent;
 854                                goto exit;
 855                        }
 856                        curr_left -= bytes_to_send;
 857                        curr_start += bytes_to_send;
 858                        bytes_sent += bytes_to_send;
 859                }
 860
 861                curr_iov++;
 862        }
 863        res = bytes_sent;
 864exit:
 865        release_sock(sk);
 866        return res;
 867}
 868
 869/**
 870 * auto_connect - complete connection setup to a remote port
 871 * @tsk: tipc socket structure
 872 * @msg: peer's response message
 873 *
 874 * Returns 0 on success, errno otherwise
 875 */
 876static int auto_connect(struct tipc_sock *tsk, struct tipc_msg *msg)
 877{
 878        struct tipc_port *port = &tsk->port;
 879        struct socket *sock = tsk->sk.sk_socket;
 880        struct tipc_portid peer;
 881
 882        peer.ref = msg_origport(msg);
 883        peer.node = msg_orignode(msg);
 884
 885        __tipc_port_connect(port->ref, port, &peer);
 886
 887        if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
 888                return -EINVAL;
 889        msg_set_importance(&port->phdr, (u32)msg_importance(msg));
 890        sock->state = SS_CONNECTED;
 891        return 0;
 892}
 893
 894/**
 895 * set_orig_addr - capture sender's address for received message
 896 * @m: descriptor for message info
 897 * @msg: received message header
 898 *
 899 * Note: Address is not captured if not requested by receiver.
 900 */
 901static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
 902{
 903        DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name);
 904
 905        if (addr) {
 906                addr->family = AF_TIPC;
 907                addr->addrtype = TIPC_ADDR_ID;
 908                memset(&addr->addr, 0, sizeof(addr->addr));
 909                addr->addr.id.ref = msg_origport(msg);
 910                addr->addr.id.node = msg_orignode(msg);
 911                addr->addr.name.domain = 0;     /* could leave uninitialized */
 912                addr->scope = 0;                /* could leave uninitialized */
 913                m->msg_namelen = sizeof(struct sockaddr_tipc);
 914        }
 915}
 916
 917/**
 918 * anc_data_recv - optionally capture ancillary data for received message
 919 * @m: descriptor for message info
 920 * @msg: received message header
 921 * @tport: TIPC port associated with message
 922 *
 923 * Note: Ancillary data is not captured if not requested by receiver.
 924 *
 925 * Returns 0 if successful, otherwise errno
 926 */
 927static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
 928                         struct tipc_port *tport)
 929{
 930        u32 anc_data[3];
 931        u32 err;
 932        u32 dest_type;
 933        int has_name;
 934        int res;
 935
 936        if (likely(m->msg_controllen == 0))
 937                return 0;
 938
 939        /* Optionally capture errored message object(s) */
 940        err = msg ? msg_errcode(msg) : 0;
 941        if (unlikely(err)) {
 942                anc_data[0] = err;
 943                anc_data[1] = msg_data_sz(msg);
 944                res = put_cmsg(m, SOL_TIPC, TIPC_ERRINFO, 8, anc_data);
 945                if (res)
 946                        return res;
 947                if (anc_data[1]) {
 948                        res = put_cmsg(m, SOL_TIPC, TIPC_RETDATA, anc_data[1],
 949                                       msg_data(msg));
 950                        if (res)
 951                                return res;
 952                }
 953        }
 954
 955        /* Optionally capture message destination object */
 956        dest_type = msg ? msg_type(msg) : TIPC_DIRECT_MSG;
 957        switch (dest_type) {
 958        case TIPC_NAMED_MSG:
 959                has_name = 1;
 960                anc_data[0] = msg_nametype(msg);
 961                anc_data[1] = msg_namelower(msg);
 962                anc_data[2] = msg_namelower(msg);
 963                break;
 964        case TIPC_MCAST_MSG:
 965                has_name = 1;
 966                anc_data[0] = msg_nametype(msg);
 967                anc_data[1] = msg_namelower(msg);
 968                anc_data[2] = msg_nameupper(msg);
 969                break;
 970        case TIPC_CONN_MSG:
 971                has_name = (tport->conn_type != 0);
 972                anc_data[0] = tport->conn_type;
 973                anc_data[1] = tport->conn_instance;
 974                anc_data[2] = tport->conn_instance;
 975                break;
 976        default:
 977                has_name = 0;
 978        }
 979        if (has_name) {
 980                res = put_cmsg(m, SOL_TIPC, TIPC_DESTNAME, 12, anc_data);
 981                if (res)
 982                        return res;
 983        }
 984
 985        return 0;
 986}
 987
 988static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
 989{
 990        struct sock *sk = sock->sk;
 991        DEFINE_WAIT(wait);
 992        long timeo = *timeop;
 993        int err;
 994
 995        for (;;) {
 996                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
 997                if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
 998                        if (sock->state == SS_DISCONNECTING) {
 999                                err = -ENOTCONN;
1000                                break;
1001                        }
1002                        release_sock(sk);
1003                        timeo = schedule_timeout(timeo);
1004                        lock_sock(sk);
1005                }
1006                err = 0;
1007                if (!skb_queue_empty(&sk->sk_receive_queue))
1008                        break;
1009                err = sock_intr_errno(timeo);
1010                if (signal_pending(current))
1011                        break;
1012                err = -EAGAIN;
1013                if (!timeo)
1014                        break;
1015        }
1016        finish_wait(sk_sleep(sk), &wait);
1017        *timeop = timeo;
1018        return err;
1019}
1020
1021/**
1022 * tipc_recvmsg - receive packet-oriented message
1023 * @iocb: (unused)
1024 * @m: descriptor for message info
1025 * @buf_len: total size of user buffer area
1026 * @flags: receive flags
1027 *
1028 * Used for SOCK_DGRAM, SOCK_RDM, and SOCK_SEQPACKET messages.
1029 * If the complete message doesn't fit in user area, truncate it.
1030 *
1031 * Returns size of returned message data, errno otherwise
1032 */
1033static int tipc_recvmsg(struct kiocb *iocb, struct socket *sock,
1034                        struct msghdr *m, size_t buf_len, int flags)
1035{
1036        struct sock *sk = sock->sk;
1037        struct tipc_sock *tsk = tipc_sk(sk);
1038        struct tipc_port *port = &tsk->port;
1039        struct sk_buff *buf;
1040        struct tipc_msg *msg;
1041        long timeo;
1042        unsigned int sz;
1043        u32 err;
1044        int res;
1045
1046        /* Catch invalid receive requests */
1047        if (unlikely(!buf_len))
1048                return -EINVAL;
1049
1050        lock_sock(sk);
1051
1052        if (unlikely(sock->state == SS_UNCONNECTED)) {
1053                res = -ENOTCONN;
1054                goto exit;
1055        }
1056
1057        timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1058restart:
1059
1060        /* Look for a message in receive queue; wait if necessary */
1061        res = tipc_wait_for_rcvmsg(sock, &timeo);
1062        if (res)
1063                goto exit;
1064
1065        /* Look at first message in receive queue */
1066        buf = skb_peek(&sk->sk_receive_queue);
1067        msg = buf_msg(buf);
1068        sz = msg_data_sz(msg);
1069        err = msg_errcode(msg);
1070
1071        /* Discard an empty non-errored message & try again */
1072        if ((!sz) && (!err)) {
1073                advance_rx_queue(sk);
1074                goto restart;
1075        }
1076
1077        /* Capture sender's address (optional) */
1078        set_orig_addr(m, msg);
1079
1080        /* Capture ancillary data (optional) */
1081        res = anc_data_recv(m, msg, port);
1082        if (res)
1083                goto exit;
1084
1085        /* Capture message data (if valid) & compute return value (always) */
1086        if (!err) {
1087                if (unlikely(buf_len < sz)) {
1088                        sz = buf_len;
1089                        m->msg_flags |= MSG_TRUNC;
1090                }
1091                res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg),
1092                                              m->msg_iov, sz);
1093                if (res)
1094                        goto exit;
1095                res = sz;
1096        } else {
1097                if ((sock->state == SS_READY) ||
1098                    ((err == TIPC_CONN_SHUTDOWN) || m->msg_control))
1099                        res = 0;
1100                else
1101                        res = -ECONNRESET;
1102        }
1103
1104        /* Consume received message (optional) */
1105        if (likely(!(flags & MSG_PEEK))) {
1106                if ((sock->state != SS_READY) &&
1107                    (++port->conn_unacked >= TIPC_CONNACK_INTV))
1108                        tipc_acknowledge(port->ref, port->conn_unacked);
1109                advance_rx_queue(sk);
1110        }
1111exit:
1112        release_sock(sk);
1113        return res;
1114}
1115
1116/**
1117 * tipc_recv_stream - receive stream-oriented data
1118 * @iocb: (unused)
1119 * @m: descriptor for message info
1120 * @buf_len: total size of user buffer area
1121 * @flags: receive flags
1122 *
1123 * Used for SOCK_STREAM messages only.  If not enough data is available
1124 * will optionally wait for more; never truncates data.
1125 *
1126 * Returns size of returned message data, errno otherwise
1127 */
1128static int tipc_recv_stream(struct kiocb *iocb, struct socket *sock,
1129                            struct msghdr *m, size_t buf_len, int flags)
1130{
1131        struct sock *sk = sock->sk;
1132        struct tipc_sock *tsk = tipc_sk(sk);
1133        struct tipc_port *port = &tsk->port;
1134        struct sk_buff *buf;
1135        struct tipc_msg *msg;
1136        long timeo;
1137        unsigned int sz;
1138        int sz_to_copy, target, needed;
1139        int sz_copied = 0;
1140        u32 err;
1141        int res = 0;
1142
1143        /* Catch invalid receive attempts */
1144        if (unlikely(!buf_len))
1145                return -EINVAL;
1146
1147        lock_sock(sk);
1148
1149        if (unlikely(sock->state == SS_UNCONNECTED)) {
1150                res = -ENOTCONN;
1151                goto exit;
1152        }
1153
1154        target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1155        timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1156
1157restart:
1158        /* Look for a message in receive queue; wait if necessary */
1159        res = tipc_wait_for_rcvmsg(sock, &timeo);
1160        if (res)
1161                goto exit;
1162
1163        /* Look at first message in receive queue */
1164        buf = skb_peek(&sk->sk_receive_queue);
1165        msg = buf_msg(buf);
1166        sz = msg_data_sz(msg);
1167        err = msg_errcode(msg);
1168
1169        /* Discard an empty non-errored message & try again */
1170        if ((!sz) && (!err)) {
1171                advance_rx_queue(sk);
1172                goto restart;
1173        }
1174
1175        /* Optionally capture sender's address & ancillary data of first msg */
1176        if (sz_copied == 0) {
1177                set_orig_addr(m, msg);
1178                res = anc_data_recv(m, msg, port);
1179                if (res)
1180                        goto exit;
1181        }
1182
1183        /* Capture message data (if valid) & compute return value (always) */
1184        if (!err) {
1185                u32 offset = (u32)(unsigned long)(TIPC_SKB_CB(buf)->handle);
1186
1187                sz -= offset;
1188                needed = (buf_len - sz_copied);
1189                sz_to_copy = (sz <= needed) ? sz : needed;
1190
1191                res = skb_copy_datagram_iovec(buf, msg_hdr_sz(msg) + offset,
1192                                              m->msg_iov, sz_to_copy);
1193                if (res)
1194                        goto exit;
1195
1196                sz_copied += sz_to_copy;
1197
1198                if (sz_to_copy < sz) {
1199                        if (!(flags & MSG_PEEK))
1200                                TIPC_SKB_CB(buf)->handle =
1201                                (void *)(unsigned long)(offset + sz_to_copy);
1202                        goto exit;
1203                }
1204        } else {
1205                if (sz_copied != 0)
1206                        goto exit; /* can't add error msg to valid data */
1207
1208                if ((err == TIPC_CONN_SHUTDOWN) || m->msg_control)
1209                        res = 0;
1210                else
1211                        res = -ECONNRESET;
1212        }
1213
1214        /* Consume received message (optional) */
1215        if (likely(!(flags & MSG_PEEK))) {
1216                if (unlikely(++port->conn_unacked >= TIPC_CONNACK_INTV))
1217                        tipc_acknowledge(port->ref, port->conn_unacked);
1218                advance_rx_queue(sk);
1219        }
1220
1221        /* Loop around if more data is required */
1222        if ((sz_copied < buf_len) &&    /* didn't get all requested data */
1223            (!skb_queue_empty(&sk->sk_receive_queue) ||
1224            (sz_copied < target)) &&    /* and more is ready or required */
1225            (!(flags & MSG_PEEK)) &&    /* and aren't just peeking at data */
1226            (!err))                     /* and haven't reached a FIN */
1227                goto restart;
1228
1229exit:
1230        release_sock(sk);
1231        return sz_copied ? sz_copied : res;
1232}
1233
1234/**
1235 * tipc_write_space - wake up thread if port congestion is released
1236 * @sk: socket
1237 */
1238static void tipc_write_space(struct sock *sk)
1239{
1240        struct socket_wq *wq;
1241
1242        rcu_read_lock();
1243        wq = rcu_dereference(sk->sk_wq);
1244        if (wq_has_sleeper(wq))
1245                wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1246                                                POLLWRNORM | POLLWRBAND);
1247        rcu_read_unlock();
1248}
1249
1250/**
1251 * tipc_data_ready - wake up threads to indicate messages have been received
1252 * @sk: socket
1253 * @len: the length of messages
1254 */
1255static void tipc_data_ready(struct sock *sk)
1256{
1257        struct socket_wq *wq;
1258
1259        rcu_read_lock();
1260        wq = rcu_dereference(sk->sk_wq);
1261        if (wq_has_sleeper(wq))
1262                wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1263                                                POLLRDNORM | POLLRDBAND);
1264        rcu_read_unlock();
1265}
1266
1267/**
1268 * filter_connect - Handle all incoming messages for a connection-based socket
1269 * @tsk: TIPC socket
1270 * @msg: message
1271 *
1272 * Returns TIPC error status code and socket error status code
1273 * once it encounters some errors
1274 */
1275static u32 filter_connect(struct tipc_sock *tsk, struct sk_buff **buf)
1276{
1277        struct sock *sk = &tsk->sk;
1278        struct tipc_port *port = &tsk->port;
1279        struct socket *sock = sk->sk_socket;
1280        struct tipc_msg *msg = buf_msg(*buf);
1281
1282        u32 retval = TIPC_ERR_NO_PORT;
1283        int res;
1284
1285        if (msg_mcast(msg))
1286                return retval;
1287
1288        switch ((int)sock->state) {
1289        case SS_CONNECTED:
1290                /* Accept only connection-based messages sent by peer */
1291                if (msg_connected(msg) && tipc_port_peer_msg(port, msg)) {
1292                        if (unlikely(msg_errcode(msg))) {
1293                                sock->state = SS_DISCONNECTING;
1294                                __tipc_port_disconnect(port);
1295                        }
1296                        retval = TIPC_OK;
1297                }
1298                break;
1299        case SS_CONNECTING:
1300                /* Accept only ACK or NACK message */
1301                if (unlikely(msg_errcode(msg))) {
1302                        sock->state = SS_DISCONNECTING;
1303                        sk->sk_err = ECONNREFUSED;
1304                        retval = TIPC_OK;
1305                        break;
1306                }
1307
1308                if (unlikely(!msg_connected(msg)))
1309                        break;
1310
1311                res = auto_connect(tsk, msg);
1312                if (res) {
1313                        sock->state = SS_DISCONNECTING;
1314                        sk->sk_err = -res;
1315                        retval = TIPC_OK;
1316                        break;
1317                }
1318
1319                /* If an incoming message is an 'ACK-', it should be
1320                 * discarded here because it doesn't contain useful
1321                 * data. In addition, we should try to wake up
1322                 * connect() routine if sleeping.
1323                 */
1324                if (msg_data_sz(msg) == 0) {
1325                        kfree_skb(*buf);
1326                        *buf = NULL;
1327                        if (waitqueue_active(sk_sleep(sk)))
1328                                wake_up_interruptible(sk_sleep(sk));
1329                }
1330                retval = TIPC_OK;
1331                break;
1332        case SS_LISTENING:
1333        case SS_UNCONNECTED:
1334                /* Accept only SYN message */
1335                if (!msg_connected(msg) && !(msg_errcode(msg)))
1336                        retval = TIPC_OK;
1337                break;
1338        case SS_DISCONNECTING:
1339                break;
1340        default:
1341                pr_err("Unknown socket state %u\n", sock->state);
1342        }
1343        return retval;
1344}
1345
1346/**
1347 * rcvbuf_limit - get proper overload limit of socket receive queue
1348 * @sk: socket
1349 * @buf: message
1350 *
1351 * For all connection oriented messages, irrespective of importance,
1352 * the default overload value (i.e. 67MB) is set as limit.
1353 *
1354 * For all connectionless messages, by default new queue limits are
1355 * as belows:
1356 *
1357 * TIPC_LOW_IMPORTANCE       (4 MB)
1358 * TIPC_MEDIUM_IMPORTANCE    (8 MB)
1359 * TIPC_HIGH_IMPORTANCE      (16 MB)
1360 * TIPC_CRITICAL_IMPORTANCE  (32 MB)
1361 *
1362 * Returns overload limit according to corresponding message importance
1363 */
1364static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1365{
1366        struct tipc_msg *msg = buf_msg(buf);
1367
1368        if (msg_connected(msg))
1369                return sysctl_tipc_rmem[2];
1370
1371        return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
1372                msg_importance(msg);
1373}
1374
1375/**
1376 * filter_rcv - validate incoming message
1377 * @sk: socket
1378 * @buf: message
1379 *
1380 * Enqueues message on receive queue if acceptable; optionally handles
1381 * disconnect indication for a connected socket.
1382 *
1383 * Called with socket lock already taken; port lock may also be taken.
1384 *
1385 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1386 */
1387static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1388{
1389        struct socket *sock = sk->sk_socket;
1390        struct tipc_sock *tsk = tipc_sk(sk);
1391        struct tipc_msg *msg = buf_msg(buf);
1392        unsigned int limit = rcvbuf_limit(sk, buf);
1393        u32 res = TIPC_OK;
1394
1395        /* Reject message if it is wrong sort of message for socket */
1396        if (msg_type(msg) > TIPC_DIRECT_MSG)
1397                return TIPC_ERR_NO_PORT;
1398
1399        if (sock->state == SS_READY) {
1400                if (msg_connected(msg))
1401                        return TIPC_ERR_NO_PORT;
1402        } else {
1403                res = filter_connect(tsk, &buf);
1404                if (res != TIPC_OK || buf == NULL)
1405                        return res;
1406        }
1407
1408        /* Reject message if there isn't room to queue it */
1409        if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1410                return TIPC_ERR_OVERLOAD;
1411
1412        /* Enqueue message */
1413        TIPC_SKB_CB(buf)->handle = NULL;
1414        __skb_queue_tail(&sk->sk_receive_queue, buf);
1415        skb_set_owner_r(buf, sk);
1416
1417        sk->sk_data_ready(sk);
1418        return TIPC_OK;
1419}
1420
1421/**
1422 * tipc_backlog_rcv - handle incoming message from backlog queue
1423 * @sk: socket
1424 * @buf: message
1425 *
1426 * Caller must hold socket lock, but not port lock.
1427 *
1428 * Returns 0
1429 */
1430static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1431{
1432        u32 res;
1433        struct tipc_sock *tsk = tipc_sk(sk);
1434        uint truesize = buf->truesize;
1435
1436        res = filter_rcv(sk, buf);
1437        if (unlikely(res))
1438                tipc_reject_msg(buf, res);
1439
1440        if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
1441                atomic_add(truesize, &tsk->dupl_rcvcnt);
1442
1443        return 0;
1444}
1445
1446/**
1447 * tipc_sk_rcv - handle incoming message
1448 * @buf: buffer containing arriving message
1449 * Consumes buffer
1450 * Returns 0 if success, or errno: -EHOSTUNREACH
1451 */
1452int tipc_sk_rcv(struct sk_buff *buf)
1453{
1454        struct tipc_sock *tsk;
1455        struct tipc_port *port;
1456        struct sock *sk;
1457        u32 dport = msg_destport(buf_msg(buf));
1458        int err = TIPC_OK;
1459        uint limit;
1460
1461        /* Forward unresolved named message */
1462        if (unlikely(!dport)) {
1463                tipc_net_route_msg(buf);
1464                return 0;
1465        }
1466
1467        /* Validate destination */
1468        port = tipc_port_lock(dport);
1469        if (unlikely(!port)) {
1470                err = TIPC_ERR_NO_PORT;
1471                goto exit;
1472        }
1473
1474        tsk = tipc_port_to_sock(port);
1475        sk = &tsk->sk;
1476
1477        /* Queue message */
1478        bh_lock_sock(sk);
1479
1480        if (!sock_owned_by_user(sk)) {
1481                err = filter_rcv(sk, buf);
1482        } else {
1483                if (sk->sk_backlog.len == 0)
1484                        atomic_set(&tsk->dupl_rcvcnt, 0);
1485                limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
1486                if (sk_add_backlog(sk, buf, limit))
1487                        err = TIPC_ERR_OVERLOAD;
1488        }
1489
1490        bh_unlock_sock(sk);
1491        tipc_port_unlock(port);
1492
1493        if (likely(!err))
1494                return 0;
1495exit:
1496        tipc_reject_msg(buf, err);
1497        return -EHOSTUNREACH;
1498}
1499
1500static int tipc_wait_for_connect(struct socket *sock, long *timeo_p)
1501{
1502        struct sock *sk = sock->sk;
1503        DEFINE_WAIT(wait);
1504        int done;
1505
1506        do {
1507                int err = sock_error(sk);
1508                if (err)
1509                        return err;
1510                if (!*timeo_p)
1511                        return -ETIMEDOUT;
1512                if (signal_pending(current))
1513                        return sock_intr_errno(*timeo_p);
1514
1515                prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
1516                done = sk_wait_event(sk, timeo_p, sock->state != SS_CONNECTING);
1517                finish_wait(sk_sleep(sk), &wait);
1518        } while (!done);
1519        return 0;
1520}
1521
1522/**
1523 * tipc_connect - establish a connection to another TIPC port
1524 * @sock: socket structure
1525 * @dest: socket address for destination port
1526 * @destlen: size of socket address data structure
1527 * @flags: file-related flags associated with socket
1528 *
1529 * Returns 0 on success, errno otherwise
1530 */
1531static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1532                        int destlen, int flags)
1533{
1534        struct sock *sk = sock->sk;
1535        struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1536        struct msghdr m = {NULL,};
1537        long timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1538        socket_state previous;
1539        int res;
1540
1541        lock_sock(sk);
1542
1543        /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1544        if (sock->state == SS_READY) {
1545                res = -EOPNOTSUPP;
1546                goto exit;
1547        }
1548
1549        /*
1550         * Reject connection attempt using multicast address
1551         *
1552         * Note: send_msg() validates the rest of the address fields,
1553         *       so there's no need to do it here
1554         */
1555        if (dst->addrtype == TIPC_ADDR_MCAST) {
1556                res = -EINVAL;
1557                goto exit;
1558        }
1559
1560        previous = sock->state;
1561        switch (sock->state) {
1562        case SS_UNCONNECTED:
1563                /* Send a 'SYN-' to destination */
1564                m.msg_name = dest;
1565                m.msg_namelen = destlen;
1566
1567                /* If connect is in non-blocking case, set MSG_DONTWAIT to
1568                 * indicate send_msg() is never blocked.
1569                 */
1570                if (!timeout)
1571                        m.msg_flags = MSG_DONTWAIT;
1572
1573                res = tipc_sendmsg(NULL, sock, &m, 0);
1574                if ((res < 0) && (res != -EWOULDBLOCK))
1575                        goto exit;
1576
1577                /* Just entered SS_CONNECTING state; the only
1578                 * difference is that return value in non-blocking
1579                 * case is EINPROGRESS, rather than EALREADY.
1580                 */
1581                res = -EINPROGRESS;
1582        case SS_CONNECTING:
1583                if (previous == SS_CONNECTING)
1584                        res = -EALREADY;
1585                if (!timeout)
1586                        goto exit;
1587                timeout = msecs_to_jiffies(timeout);
1588                /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1589                res = tipc_wait_for_connect(sock, &timeout);
1590                break;
1591        case SS_CONNECTED:
1592                res = -EISCONN;
1593                break;
1594        default:
1595                res = -EINVAL;
1596                break;
1597        }
1598exit:
1599        release_sock(sk);
1600        return res;
1601}
1602
1603/**
1604 * tipc_listen - allow socket to listen for incoming connections
1605 * @sock: socket structure
1606 * @len: (unused)
1607 *
1608 * Returns 0 on success, errno otherwise
1609 */
1610static int tipc_listen(struct socket *sock, int len)
1611{
1612        struct sock *sk = sock->sk;
1613        int res;
1614
1615        lock_sock(sk);
1616
1617        if (sock->state != SS_UNCONNECTED)
1618                res = -EINVAL;
1619        else {
1620                sock->state = SS_LISTENING;
1621                res = 0;
1622        }
1623
1624        release_sock(sk);
1625        return res;
1626}
1627
1628static int tipc_wait_for_accept(struct socket *sock, long timeo)
1629{
1630        struct sock *sk = sock->sk;
1631        DEFINE_WAIT(wait);
1632        int err;
1633
1634        /* True wake-one mechanism for incoming connections: only
1635         * one process gets woken up, not the 'whole herd'.
1636         * Since we do not 'race & poll' for established sockets
1637         * anymore, the common case will execute the loop only once.
1638        */
1639        for (;;) {
1640                prepare_to_wait_exclusive(sk_sleep(sk), &wait,
1641                                          TASK_INTERRUPTIBLE);
1642                if (timeo && skb_queue_empty(&sk->sk_receive_queue)) {
1643                        release_sock(sk);
1644                        timeo = schedule_timeout(timeo);
1645                        lock_sock(sk);
1646                }
1647                err = 0;
1648                if (!skb_queue_empty(&sk->sk_receive_queue))
1649                        break;
1650                err = -EINVAL;
1651                if (sock->state != SS_LISTENING)
1652                        break;
1653                err = sock_intr_errno(timeo);
1654                if (signal_pending(current))
1655                        break;
1656                err = -EAGAIN;
1657                if (!timeo)
1658                        break;
1659        }
1660        finish_wait(sk_sleep(sk), &wait);
1661        return err;
1662}
1663
1664/**
1665 * tipc_accept - wait for connection request
1666 * @sock: listening socket
1667 * @newsock: new socket that is to be connected
1668 * @flags: file-related flags associated with socket
1669 *
1670 * Returns 0 on success, errno otherwise
1671 */
1672static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
1673{
1674        struct sock *new_sk, *sk = sock->sk;
1675        struct sk_buff *buf;
1676        struct tipc_port *new_port;
1677        struct tipc_msg *msg;
1678        struct tipc_portid peer;
1679        u32 new_ref;
1680        long timeo;
1681        int res;
1682
1683        lock_sock(sk);
1684
1685        if (sock->state != SS_LISTENING) {
1686                res = -EINVAL;
1687                goto exit;
1688        }
1689        timeo = sock_rcvtimeo(sk, flags & O_NONBLOCK);
1690        res = tipc_wait_for_accept(sock, timeo);
1691        if (res)
1692                goto exit;
1693
1694        buf = skb_peek(&sk->sk_receive_queue);
1695
1696        res = tipc_sk_create(sock_net(sock->sk), new_sock, 0, 1);
1697        if (res)
1698                goto exit;
1699
1700        new_sk = new_sock->sk;
1701        new_port = &tipc_sk(new_sk)->port;
1702        new_ref = new_port->ref;
1703        msg = buf_msg(buf);
1704
1705        /* we lock on new_sk; but lockdep sees the lock on sk */
1706        lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1707
1708        /*
1709         * Reject any stray messages received by new socket
1710         * before the socket lock was taken (very, very unlikely)
1711         */
1712        reject_rx_queue(new_sk);
1713
1714        /* Connect new socket to it's peer */
1715        peer.ref = msg_origport(msg);
1716        peer.node = msg_orignode(msg);
1717        tipc_port_connect(new_ref, &peer);
1718        new_sock->state = SS_CONNECTED;
1719
1720        tipc_port_set_importance(new_port, msg_importance(msg));
1721        if (msg_named(msg)) {
1722                new_port->conn_type = msg_nametype(msg);
1723                new_port->conn_instance = msg_nameinst(msg);
1724        }
1725
1726        /*
1727         * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1728         * Respond to 'SYN+' by queuing it on new socket.
1729         */
1730        if (!msg_data_sz(msg)) {
1731                struct msghdr m = {NULL,};
1732
1733                advance_rx_queue(sk);
1734                tipc_send_packet(NULL, new_sock, &m, 0);
1735        } else {
1736                __skb_dequeue(&sk->sk_receive_queue);
1737                __skb_queue_head(&new_sk->sk_receive_queue, buf);
1738                skb_set_owner_r(buf, new_sk);
1739        }
1740        release_sock(new_sk);
1741exit:
1742        release_sock(sk);
1743        return res;
1744}
1745
1746/**
1747 * tipc_shutdown - shutdown socket connection
1748 * @sock: socket structure
1749 * @how: direction to close (must be SHUT_RDWR)
1750 *
1751 * Terminates connection (if necessary), then purges socket's receive queue.
1752 *
1753 * Returns 0 on success, errno otherwise
1754 */
1755static int tipc_shutdown(struct socket *sock, int how)
1756{
1757        struct sock *sk = sock->sk;
1758        struct tipc_sock *tsk = tipc_sk(sk);
1759        struct tipc_port *port = &tsk->port;
1760        struct sk_buff *buf;
1761        int res;
1762
1763        if (how != SHUT_RDWR)
1764                return -EINVAL;
1765
1766        lock_sock(sk);
1767
1768        switch (sock->state) {
1769        case SS_CONNECTING:
1770        case SS_CONNECTED:
1771
1772restart:
1773                /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1774                buf = __skb_dequeue(&sk->sk_receive_queue);
1775                if (buf) {
1776                        if (TIPC_SKB_CB(buf)->handle != NULL) {
1777                                kfree_skb(buf);
1778                                goto restart;
1779                        }
1780                        tipc_port_disconnect(port->ref);
1781                        tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1782                } else {
1783                        tipc_port_shutdown(port->ref);
1784                }
1785
1786                sock->state = SS_DISCONNECTING;
1787
1788                /* fall through */
1789
1790        case SS_DISCONNECTING:
1791
1792                /* Discard any unreceived messages */
1793                __skb_queue_purge(&sk->sk_receive_queue);
1794
1795                /* Wake up anyone sleeping in poll */
1796                sk->sk_state_change(sk);
1797                res = 0;
1798                break;
1799
1800        default:
1801                res = -ENOTCONN;
1802        }
1803
1804        release_sock(sk);
1805        return res;
1806}
1807
1808/**
1809 * tipc_setsockopt - set socket option
1810 * @sock: socket structure
1811 * @lvl: option level
1812 * @opt: option identifier
1813 * @ov: pointer to new option value
1814 * @ol: length of option value
1815 *
1816 * For stream sockets only, accepts and ignores all IPPROTO_TCP options
1817 * (to ease compatibility).
1818 *
1819 * Returns 0 on success, errno otherwise
1820 */
1821static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
1822                           char __user *ov, unsigned int ol)
1823{
1824        struct sock *sk = sock->sk;
1825        struct tipc_sock *tsk = tipc_sk(sk);
1826        struct tipc_port *port = &tsk->port;
1827        u32 value;
1828        int res;
1829
1830        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1831                return 0;
1832        if (lvl != SOL_TIPC)
1833                return -ENOPROTOOPT;
1834        if (ol < sizeof(value))
1835                return -EINVAL;
1836        res = get_user(value, (u32 __user *)ov);
1837        if (res)
1838                return res;
1839
1840        lock_sock(sk);
1841
1842        switch (opt) {
1843        case TIPC_IMPORTANCE:
1844                tipc_port_set_importance(port, value);
1845                break;
1846        case TIPC_SRC_DROPPABLE:
1847                if (sock->type != SOCK_STREAM)
1848                        tipc_port_set_unreliable(port, value);
1849                else
1850                        res = -ENOPROTOOPT;
1851                break;
1852        case TIPC_DEST_DROPPABLE:
1853                tipc_port_set_unreturnable(port, value);
1854                break;
1855        case TIPC_CONN_TIMEOUT:
1856                tipc_sk(sk)->conn_timeout = value;
1857                /* no need to set "res", since already 0 at this point */
1858                break;
1859        default:
1860                res = -EINVAL;
1861        }
1862
1863        release_sock(sk);
1864
1865        return res;
1866}
1867
1868/**
1869 * tipc_getsockopt - get socket option
1870 * @sock: socket structure
1871 * @lvl: option level
1872 * @opt: option identifier
1873 * @ov: receptacle for option value
1874 * @ol: receptacle for length of option value
1875 *
1876 * For stream sockets only, returns 0 length result for all IPPROTO_TCP options
1877 * (to ease compatibility).
1878 *
1879 * Returns 0 on success, errno otherwise
1880 */
1881static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
1882                           char __user *ov, int __user *ol)
1883{
1884        struct sock *sk = sock->sk;
1885        struct tipc_sock *tsk = tipc_sk(sk);
1886        struct tipc_port *port = &tsk->port;
1887        int len;
1888        u32 value;
1889        int res;
1890
1891        if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
1892                return put_user(0, ol);
1893        if (lvl != SOL_TIPC)
1894                return -ENOPROTOOPT;
1895        res = get_user(len, ol);
1896        if (res)
1897                return res;
1898
1899        lock_sock(sk);
1900
1901        switch (opt) {
1902        case TIPC_IMPORTANCE:
1903                value = tipc_port_importance(port);
1904                break;
1905        case TIPC_SRC_DROPPABLE:
1906                value = tipc_port_unreliable(port);
1907                break;
1908        case TIPC_DEST_DROPPABLE:
1909                value = tipc_port_unreturnable(port);
1910                break;
1911        case TIPC_CONN_TIMEOUT:
1912                value = tipc_sk(sk)->conn_timeout;
1913                /* no need to set "res", since already 0 at this point */
1914                break;
1915        case TIPC_NODE_RECVQ_DEPTH:
1916                value = 0; /* was tipc_queue_size, now obsolete */
1917                break;
1918        case TIPC_SOCK_RECVQ_DEPTH:
1919                value = skb_queue_len(&sk->sk_receive_queue);
1920                break;
1921        default:
1922                res = -EINVAL;
1923        }
1924
1925        release_sock(sk);
1926
1927        if (res)
1928                return res;     /* "get" failed */
1929
1930        if (len < sizeof(value))
1931                return -EINVAL;
1932
1933        if (copy_to_user(ov, &value, sizeof(value)))
1934                return -EFAULT;
1935
1936        return put_user(sizeof(value), ol);
1937}
1938
1939int tipc_ioctl(struct socket *sk, unsigned int cmd, unsigned long arg)
1940{
1941        struct tipc_sioc_ln_req lnr;
1942        void __user *argp = (void __user *)arg;
1943
1944        switch (cmd) {
1945        case SIOCGETLINKNAME:
1946                if (copy_from_user(&lnr, argp, sizeof(lnr)))
1947                        return -EFAULT;
1948                if (!tipc_node_get_linkname(lnr.bearer_id, lnr.peer,
1949                                            lnr.linkname, TIPC_MAX_LINK_NAME)) {
1950                        if (copy_to_user(argp, &lnr, sizeof(lnr)))
1951                                return -EFAULT;
1952                        return 0;
1953                }
1954                return -EADDRNOTAVAIL;
1955                break;
1956        default:
1957                return -ENOIOCTLCMD;
1958        }
1959}
1960
1961/* Protocol switches for the various types of TIPC sockets */
1962
1963static const struct proto_ops msg_ops = {
1964        .owner          = THIS_MODULE,
1965        .family         = AF_TIPC,
1966        .release        = tipc_release,
1967        .bind           = tipc_bind,
1968        .connect        = tipc_connect,
1969        .socketpair     = sock_no_socketpair,
1970        .accept         = sock_no_accept,
1971        .getname        = tipc_getname,
1972        .poll           = tipc_poll,
1973        .ioctl          = tipc_ioctl,
1974        .listen         = sock_no_listen,
1975        .shutdown       = tipc_shutdown,
1976        .setsockopt     = tipc_setsockopt,
1977        .getsockopt     = tipc_getsockopt,
1978        .sendmsg        = tipc_sendmsg,
1979        .recvmsg        = tipc_recvmsg,
1980        .mmap           = sock_no_mmap,
1981        .sendpage       = sock_no_sendpage
1982};
1983
1984static const struct proto_ops packet_ops = {
1985        .owner          = THIS_MODULE,
1986        .family         = AF_TIPC,
1987        .release        = tipc_release,
1988        .bind           = tipc_bind,
1989        .connect        = tipc_connect,
1990        .socketpair     = sock_no_socketpair,
1991        .accept         = tipc_accept,
1992        .getname        = tipc_getname,
1993        .poll           = tipc_poll,
1994        .ioctl          = tipc_ioctl,
1995        .listen         = tipc_listen,
1996        .shutdown       = tipc_shutdown,
1997        .setsockopt     = tipc_setsockopt,
1998        .getsockopt     = tipc_getsockopt,
1999        .sendmsg        = tipc_send_packet,
2000        .recvmsg        = tipc_recvmsg,
2001        .mmap           = sock_no_mmap,
2002        .sendpage       = sock_no_sendpage
2003};
2004
2005static const struct proto_ops stream_ops = {
2006        .owner          = THIS_MODULE,
2007        .family         = AF_TIPC,
2008        .release        = tipc_release,
2009        .bind           = tipc_bind,
2010        .connect        = tipc_connect,
2011        .socketpair     = sock_no_socketpair,
2012        .accept         = tipc_accept,
2013        .getname        = tipc_getname,
2014        .poll           = tipc_poll,
2015        .ioctl          = tipc_ioctl,
2016        .listen         = tipc_listen,
2017        .shutdown       = tipc_shutdown,
2018        .setsockopt     = tipc_setsockopt,
2019        .getsockopt     = tipc_getsockopt,
2020        .sendmsg        = tipc_send_stream,
2021        .recvmsg        = tipc_recv_stream,
2022        .mmap           = sock_no_mmap,
2023        .sendpage       = sock_no_sendpage
2024};
2025
2026static const struct net_proto_family tipc_family_ops = {
2027        .owner          = THIS_MODULE,
2028        .family         = AF_TIPC,
2029        .create         = tipc_sk_create
2030};
2031
2032static struct proto tipc_proto = {
2033        .name           = "TIPC",
2034        .owner          = THIS_MODULE,
2035        .obj_size       = sizeof(struct tipc_sock),
2036        .sysctl_rmem    = sysctl_tipc_rmem
2037};
2038
2039static struct proto tipc_proto_kern = {
2040        .name           = "TIPC",
2041        .obj_size       = sizeof(struct tipc_sock),
2042        .sysctl_rmem    = sysctl_tipc_rmem
2043};
2044
2045/**
2046 * tipc_socket_init - initialize TIPC socket interface
2047 *
2048 * Returns 0 on success, errno otherwise
2049 */
2050int tipc_socket_init(void)
2051{
2052        int res;
2053
2054        res = proto_register(&tipc_proto, 1);
2055        if (res) {
2056                pr_err("Failed to register TIPC protocol type\n");
2057                goto out;
2058        }
2059
2060        res = sock_register(&tipc_family_ops);
2061        if (res) {
2062                pr_err("Failed to register TIPC socket type\n");
2063                proto_unregister(&tipc_proto);
2064                goto out;
2065        }
2066 out:
2067        return res;
2068}
2069
2070/**
2071 * tipc_socket_stop - stop TIPC socket interface
2072 */
2073void tipc_socket_stop(void)
2074{
2075        sock_unregister(tipc_family_ops.family);
2076        proto_unregister(&tipc_proto);
2077}
2078