linux/net/tipc/server.c
<<
>>
Prefs
   1/*
   2 * net/tipc/server.c: TIPC server infrastructure
   3 *
   4 * Copyright (c) 2012-2013, Wind River Systems
   5 * All rights reserved.
   6 *
   7 * Redistribution and use in source and binary forms, with or without
   8 * modification, are permitted provided that the following conditions are met:
   9 *
  10 * 1. Redistributions of source code must retain the above copyright
  11 *    notice, this list of conditions and the following disclaimer.
  12 * 2. Redistributions in binary form must reproduce the above copyright
  13 *    notice, this list of conditions and the following disclaimer in the
  14 *    documentation and/or other materials provided with the distribution.
  15 * 3. Neither the names of the copyright holders nor the names of its
  16 *    contributors may be used to endorse or promote products derived from
  17 *    this software without specific prior written permission.
  18 *
  19 * Alternatively, this software may be distributed under the terms of the
  20 * GNU General Public License ("GPL") version 2 as published by the Free
  21 * Software Foundation.
  22 *
  23 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  24 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  25 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  26 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  27 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  28 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  29 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  30 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  31 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  32 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  33 * POSSIBILITY OF SUCH DAMAGE.
  34 */
  35
  36#include "server.h"
  37#include "core.h"
  38#include <net/sock.h>
  39
  40/* Number of messages to send before rescheduling */
  41#define MAX_SEND_MSG_COUNT      25
  42#define MAX_RECV_MSG_COUNT      25
  43#define CF_CONNECTED            1
  44
  45#define sock2con(x) ((struct tipc_conn *)(x)->sk_user_data)
  46
  47/**
  48 * struct tipc_conn - TIPC connection structure
  49 * @kref: reference counter to connection object
  50 * @conid: connection identifier
  51 * @sock: socket handler associated with connection
  52 * @flags: indicates connection state
  53 * @server: pointer to connected server
  54 * @rwork: receive work item
  55 * @usr_data: user-specified field
  56 * @rx_action: what to do when connection socket is active
  57 * @outqueue: pointer to first outbound message in queue
  58 * @outqueue_lock: controll access to the outqueue
  59 * @outqueue: list of connection objects for its server
  60 * @swork: send work item
  61 */
  62struct tipc_conn {
  63        struct kref kref;
  64        int conid;
  65        struct socket *sock;
  66        unsigned long flags;
  67        struct tipc_server *server;
  68        struct work_struct rwork;
  69        int (*rx_action) (struct tipc_conn *con);
  70        void *usr_data;
  71        struct list_head outqueue;
  72        spinlock_t outqueue_lock;
  73        struct work_struct swork;
  74};
  75
  76/* An entry waiting to be sent */
  77struct outqueue_entry {
  78        struct list_head list;
  79        struct kvec iov;
  80        struct sockaddr_tipc dest;
  81};
  82
  83static void tipc_recv_work(struct work_struct *work);
  84static void tipc_send_work(struct work_struct *work);
  85static void tipc_clean_outqueues(struct tipc_conn *con);
  86
  87static void tipc_conn_kref_release(struct kref *kref)
  88{
  89        struct tipc_conn *con = container_of(kref, struct tipc_conn, kref);
  90        struct tipc_server *s = con->server;
  91
  92        if (con->sock) {
  93                tipc_sock_release_local(con->sock);
  94                con->sock = NULL;
  95        }
  96
  97        tipc_clean_outqueues(con);
  98
  99        if (con->conid)
 100                s->tipc_conn_shutdown(con->conid, con->usr_data);
 101
 102        kfree(con);
 103}
 104
 105static void conn_put(struct tipc_conn *con)
 106{
 107        kref_put(&con->kref, tipc_conn_kref_release);
 108}
 109
 110static void conn_get(struct tipc_conn *con)
 111{
 112        kref_get(&con->kref);
 113}
 114
 115static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
 116{
 117        struct tipc_conn *con;
 118
 119        spin_lock_bh(&s->idr_lock);
 120        con = idr_find(&s->conn_idr, conid);
 121        if (con)
 122                conn_get(con);
 123        spin_unlock_bh(&s->idr_lock);
 124        return con;
 125}
 126
 127static void sock_data_ready(struct sock *sk, int unused)
 128{
 129        struct tipc_conn *con;
 130
 131        read_lock(&sk->sk_callback_lock);
 132        con = sock2con(sk);
 133        if (con && test_bit(CF_CONNECTED, &con->flags)) {
 134                conn_get(con);
 135                if (!queue_work(con->server->rcv_wq, &con->rwork))
 136                        conn_put(con);
 137        }
 138        read_unlock(&sk->sk_callback_lock);
 139}
 140
 141static void sock_write_space(struct sock *sk)
 142{
 143        struct tipc_conn *con;
 144
 145        read_lock(&sk->sk_callback_lock);
 146        con = sock2con(sk);
 147        if (con && test_bit(CF_CONNECTED, &con->flags)) {
 148                conn_get(con);
 149                if (!queue_work(con->server->send_wq, &con->swork))
 150                        conn_put(con);
 151        }
 152        read_unlock(&sk->sk_callback_lock);
 153}
 154
 155static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
 156{
 157        struct sock *sk = sock->sk;
 158
 159        write_lock_bh(&sk->sk_callback_lock);
 160
 161        sk->sk_data_ready = sock_data_ready;
 162        sk->sk_write_space = sock_write_space;
 163        sk->sk_user_data = con;
 164
 165        con->sock = sock;
 166
 167        write_unlock_bh(&sk->sk_callback_lock);
 168}
 169
 170static void tipc_unregister_callbacks(struct tipc_conn *con)
 171{
 172        struct sock *sk = con->sock->sk;
 173
 174        write_lock_bh(&sk->sk_callback_lock);
 175        sk->sk_user_data = NULL;
 176        write_unlock_bh(&sk->sk_callback_lock);
 177}
 178
 179static void tipc_close_conn(struct tipc_conn *con)
 180{
 181        struct tipc_server *s = con->server;
 182
 183        if (test_and_clear_bit(CF_CONNECTED, &con->flags)) {
 184                spin_lock_bh(&s->idr_lock);
 185                idr_remove(&s->conn_idr, con->conid);
 186                s->idr_in_use--;
 187                spin_unlock_bh(&s->idr_lock);
 188
 189                tipc_unregister_callbacks(con);
 190
 191                /* We shouldn't flush pending works as we may be in the
 192                 * thread. In fact the races with pending rx/tx work structs
 193                 * are harmless for us here as we have already deleted this
 194                 * connection from server connection list and set
 195                 * sk->sk_user_data to 0 before releasing connection object.
 196                 */
 197                kernel_sock_shutdown(con->sock, SHUT_RDWR);
 198
 199                conn_put(con);
 200        }
 201}
 202
 203static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
 204{
 205        struct tipc_conn *con;
 206        int ret;
 207
 208        con = kzalloc(sizeof(struct tipc_conn), GFP_ATOMIC);
 209        if (!con)
 210                return ERR_PTR(-ENOMEM);
 211
 212        kref_init(&con->kref);
 213        INIT_LIST_HEAD(&con->outqueue);
 214        spin_lock_init(&con->outqueue_lock);
 215        INIT_WORK(&con->swork, tipc_send_work);
 216        INIT_WORK(&con->rwork, tipc_recv_work);
 217
 218        spin_lock_bh(&s->idr_lock);
 219        ret = idr_alloc(&s->conn_idr, con, 0, 0, GFP_ATOMIC);
 220        if (ret < 0) {
 221                kfree(con);
 222                spin_unlock_bh(&s->idr_lock);
 223                return ERR_PTR(-ENOMEM);
 224        }
 225        con->conid = ret;
 226        s->idr_in_use++;
 227        spin_unlock_bh(&s->idr_lock);
 228
 229        set_bit(CF_CONNECTED, &con->flags);
 230        con->server = s;
 231
 232        return con;
 233}
 234
 235static int tipc_receive_from_sock(struct tipc_conn *con)
 236{
 237        struct msghdr msg = {};
 238        struct tipc_server *s = con->server;
 239        struct sockaddr_tipc addr;
 240        struct kvec iov;
 241        void *buf;
 242        int ret;
 243
 244        buf = kmem_cache_alloc(s->rcvbuf_cache, GFP_ATOMIC);
 245        if (!buf) {
 246                ret = -ENOMEM;
 247                goto out_close;
 248        }
 249
 250        iov.iov_base = buf;
 251        iov.iov_len = s->max_rcvbuf_size;
 252        msg.msg_name = &addr;
 253        ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
 254                             MSG_DONTWAIT);
 255        if (ret <= 0) {
 256                kmem_cache_free(s->rcvbuf_cache, buf);
 257                goto out_close;
 258        }
 259
 260        s->tipc_conn_recvmsg(con->conid, &addr, con->usr_data, buf, ret);
 261
 262        kmem_cache_free(s->rcvbuf_cache, buf);
 263
 264        return 0;
 265
 266out_close:
 267        if (ret != -EWOULDBLOCK)
 268                tipc_close_conn(con);
 269        else if (ret == 0)
 270                /* Don't return success if we really got EOF */
 271                ret = -EAGAIN;
 272
 273        return ret;
 274}
 275
 276static int tipc_accept_from_sock(struct tipc_conn *con)
 277{
 278        struct tipc_server *s = con->server;
 279        struct socket *sock = con->sock;
 280        struct socket *newsock;
 281        struct tipc_conn *newcon;
 282        int ret;
 283
 284        ret = tipc_sock_accept_local(sock, &newsock, O_NONBLOCK);
 285        if (ret < 0)
 286                return ret;
 287
 288        newcon = tipc_alloc_conn(con->server);
 289        if (IS_ERR(newcon)) {
 290                ret = PTR_ERR(newcon);
 291                sock_release(newsock);
 292                return ret;
 293        }
 294
 295        newcon->rx_action = tipc_receive_from_sock;
 296        tipc_register_callbacks(newsock, newcon);
 297
 298        /* Notify that new connection is incoming */
 299        newcon->usr_data = s->tipc_conn_new(newcon->conid);
 300
 301        /* Wake up receive process in case of 'SYN+' message */
 302        newsock->sk->sk_data_ready(newsock->sk, 0);
 303        return ret;
 304}
 305
 306static struct socket *tipc_create_listen_sock(struct tipc_conn *con)
 307{
 308        struct tipc_server *s = con->server;
 309        struct socket *sock = NULL;
 310        int ret;
 311
 312        ret = tipc_sock_create_local(s->type, &sock);
 313        if (ret < 0)
 314                return NULL;
 315        ret = kernel_setsockopt(sock, SOL_TIPC, TIPC_IMPORTANCE,
 316                                (char *)&s->imp, sizeof(s->imp));
 317        if (ret < 0)
 318                goto create_err;
 319        ret = kernel_bind(sock, (struct sockaddr *)s->saddr, sizeof(*s->saddr));
 320        if (ret < 0)
 321                goto create_err;
 322
 323        switch (s->type) {
 324        case SOCK_STREAM:
 325        case SOCK_SEQPACKET:
 326                con->rx_action = tipc_accept_from_sock;
 327
 328                ret = kernel_listen(sock, 0);
 329                if (ret < 0)
 330                        goto create_err;
 331                break;
 332        case SOCK_DGRAM:
 333        case SOCK_RDM:
 334                con->rx_action = tipc_receive_from_sock;
 335                break;
 336        default:
 337                pr_err("Unknown socket type %d\n", s->type);
 338                goto create_err;
 339        }
 340        return sock;
 341
 342create_err:
 343        sock_release(sock);
 344        con->sock = NULL;
 345        return NULL;
 346}
 347
 348static int tipc_open_listening_sock(struct tipc_server *s)
 349{
 350        struct socket *sock;
 351        struct tipc_conn *con;
 352
 353        con = tipc_alloc_conn(s);
 354        if (IS_ERR(con))
 355                return PTR_ERR(con);
 356
 357        sock = tipc_create_listen_sock(con);
 358        if (!sock) {
 359                idr_remove(&s->conn_idr, con->conid);
 360                s->idr_in_use--;
 361                kfree(con);
 362                return -EINVAL;
 363        }
 364
 365        tipc_register_callbacks(sock, con);
 366        return 0;
 367}
 368
 369static struct outqueue_entry *tipc_alloc_entry(void *data, int len)
 370{
 371        struct outqueue_entry *entry;
 372        void *buf;
 373
 374        entry = kmalloc(sizeof(struct outqueue_entry), GFP_ATOMIC);
 375        if (!entry)
 376                return NULL;
 377
 378        buf = kmalloc(len, GFP_ATOMIC);
 379        if (!buf) {
 380                kfree(entry);
 381                return NULL;
 382        }
 383
 384        memcpy(buf, data, len);
 385        entry->iov.iov_base = buf;
 386        entry->iov.iov_len = len;
 387
 388        return entry;
 389}
 390
 391static void tipc_free_entry(struct outqueue_entry *e)
 392{
 393        kfree(e->iov.iov_base);
 394        kfree(e);
 395}
 396
 397static void tipc_clean_outqueues(struct tipc_conn *con)
 398{
 399        struct outqueue_entry *e, *safe;
 400
 401        spin_lock_bh(&con->outqueue_lock);
 402        list_for_each_entry_safe(e, safe, &con->outqueue, list) {
 403                list_del(&e->list);
 404                tipc_free_entry(e);
 405        }
 406        spin_unlock_bh(&con->outqueue_lock);
 407}
 408
 409int tipc_conn_sendmsg(struct tipc_server *s, int conid,
 410                      struct sockaddr_tipc *addr, void *data, size_t len)
 411{
 412        struct outqueue_entry *e;
 413        struct tipc_conn *con;
 414
 415        con = tipc_conn_lookup(s, conid);
 416        if (!con)
 417                return -EINVAL;
 418
 419        e = tipc_alloc_entry(data, len);
 420        if (!e) {
 421                conn_put(con);
 422                return -ENOMEM;
 423        }
 424
 425        if (addr)
 426                memcpy(&e->dest, addr, sizeof(struct sockaddr_tipc));
 427
 428        spin_lock_bh(&con->outqueue_lock);
 429        list_add_tail(&e->list, &con->outqueue);
 430        spin_unlock_bh(&con->outqueue_lock);
 431
 432        if (test_bit(CF_CONNECTED, &con->flags))
 433                if (!queue_work(s->send_wq, &con->swork))
 434                        conn_put(con);
 435
 436        return 0;
 437}
 438
 439void tipc_conn_terminate(struct tipc_server *s, int conid)
 440{
 441        struct tipc_conn *con;
 442
 443        con = tipc_conn_lookup(s, conid);
 444        if (con) {
 445                tipc_close_conn(con);
 446                conn_put(con);
 447        }
 448}
 449
 450static void tipc_send_to_sock(struct tipc_conn *con)
 451{
 452        int count = 0;
 453        struct tipc_server *s = con->server;
 454        struct outqueue_entry *e;
 455        struct msghdr msg;
 456        int ret;
 457
 458        spin_lock_bh(&con->outqueue_lock);
 459        while (1) {
 460                e = list_entry(con->outqueue.next, struct outqueue_entry,
 461                               list);
 462                if ((struct list_head *) e == &con->outqueue)
 463                        break;
 464                spin_unlock_bh(&con->outqueue_lock);
 465
 466                memset(&msg, 0, sizeof(msg));
 467                msg.msg_flags = MSG_DONTWAIT;
 468
 469                if (s->type == SOCK_DGRAM || s->type == SOCK_RDM) {
 470                        msg.msg_name = &e->dest;
 471                        msg.msg_namelen = sizeof(struct sockaddr_tipc);
 472                }
 473                ret = kernel_sendmsg(con->sock, &msg, &e->iov, 1,
 474                                     e->iov.iov_len);
 475                if (ret == -EWOULDBLOCK || ret == 0) {
 476                        cond_resched();
 477                        goto out;
 478                } else if (ret < 0) {
 479                        goto send_err;
 480                }
 481
 482                /* Don't starve users filling buffers */
 483                if (++count >= MAX_SEND_MSG_COUNT) {
 484                        cond_resched();
 485                        count = 0;
 486                }
 487
 488                spin_lock_bh(&con->outqueue_lock);
 489                list_del(&e->list);
 490                tipc_free_entry(e);
 491        }
 492        spin_unlock_bh(&con->outqueue_lock);
 493out:
 494        return;
 495
 496send_err:
 497        tipc_close_conn(con);
 498}
 499
 500static void tipc_recv_work(struct work_struct *work)
 501{
 502        struct tipc_conn *con = container_of(work, struct tipc_conn, rwork);
 503        int count = 0;
 504
 505        while (test_bit(CF_CONNECTED, &con->flags)) {
 506                if (con->rx_action(con))
 507                        break;
 508
 509                /* Don't flood Rx machine */
 510                if (++count >= MAX_RECV_MSG_COUNT) {
 511                        cond_resched();
 512                        count = 0;
 513                }
 514        }
 515        conn_put(con);
 516}
 517
 518static void tipc_send_work(struct work_struct *work)
 519{
 520        struct tipc_conn *con = container_of(work, struct tipc_conn, swork);
 521
 522        if (test_bit(CF_CONNECTED, &con->flags))
 523                tipc_send_to_sock(con);
 524
 525        conn_put(con);
 526}
 527
 528static void tipc_work_stop(struct tipc_server *s)
 529{
 530        destroy_workqueue(s->rcv_wq);
 531        destroy_workqueue(s->send_wq);
 532}
 533
 534static int tipc_work_start(struct tipc_server *s)
 535{
 536        s->rcv_wq = alloc_workqueue("tipc_rcv", WQ_UNBOUND, 1);
 537        if (!s->rcv_wq) {
 538                pr_err("can't start tipc receive workqueue\n");
 539                return -ENOMEM;
 540        }
 541
 542        s->send_wq = alloc_workqueue("tipc_send", WQ_UNBOUND, 1);
 543        if (!s->send_wq) {
 544                pr_err("can't start tipc send workqueue\n");
 545                destroy_workqueue(s->rcv_wq);
 546                return -ENOMEM;
 547        }
 548
 549        return 0;
 550}
 551
 552int tipc_server_start(struct tipc_server *s)
 553{
 554        int ret;
 555
 556        spin_lock_init(&s->idr_lock);
 557        idr_init(&s->conn_idr);
 558        s->idr_in_use = 0;
 559
 560        s->rcvbuf_cache = kmem_cache_create(s->name, s->max_rcvbuf_size,
 561                                            0, SLAB_HWCACHE_ALIGN, NULL);
 562        if (!s->rcvbuf_cache)
 563                return -ENOMEM;
 564
 565        ret = tipc_work_start(s);
 566        if (ret < 0) {
 567                kmem_cache_destroy(s->rcvbuf_cache);
 568                return ret;
 569        }
 570        ret = tipc_open_listening_sock(s);
 571        if (ret < 0) {
 572                tipc_work_stop(s);
 573                kmem_cache_destroy(s->rcvbuf_cache);
 574                return ret;
 575        }
 576        s->enabled = 1;
 577        return ret;
 578}
 579
 580void tipc_server_stop(struct tipc_server *s)
 581{
 582        struct tipc_conn *con;
 583        int total = 0;
 584        int id;
 585
 586        if (!s->enabled)
 587                return;
 588
 589        s->enabled = 0;
 590        spin_lock_bh(&s->idr_lock);
 591        for (id = 0; total < s->idr_in_use; id++) {
 592                con = idr_find(&s->conn_idr, id);
 593                if (con) {
 594                        total++;
 595                        spin_unlock_bh(&s->idr_lock);
 596                        tipc_close_conn(con);
 597                        spin_lock_bh(&s->idr_lock);
 598                }
 599        }
 600        spin_unlock_bh(&s->idr_lock);
 601
 602        tipc_work_stop(s);
 603        kmem_cache_destroy(s->rcvbuf_cache);
 604        idr_destroy(&s->conn_idr);
 605}
 606