linux/net/tipc/subscr.c
<<
>>
Prefs
   1/*
   2 * net/tipc/subscr.c: TIPC network topology service
   3 *
   4 * Copyright (c) 2000-2006, Ericsson AB
   5 * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "name_table.h"
  39#include "port.h"
  40#include "subscr.h"
  41
  42/**
  43 * struct tipc_subscriber - TIPC network topology subscriber
  44 * @conid: connection identifier to server connecting to subscriber
  45 * @lock: control access to subscriber
  46 * @subscription_list: list of subscription objects for this subscriber
  47 */
  48struct tipc_subscriber {
  49        int conid;
  50        spinlock_t lock;
  51        struct list_head subscription_list;
  52};
  53
  54static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
  55                                  void *usr_data, void *buf, size_t len);
  56static void *subscr_named_msg_event(int conid);
  57static void subscr_conn_shutdown_event(int conid, void *usr_data);
  58
  59static atomic_t subscription_count = ATOMIC_INIT(0);
  60
  61static struct sockaddr_tipc topsrv_addr __read_mostly = {
  62        .family                 = AF_TIPC,
  63        .addrtype               = TIPC_ADDR_NAMESEQ,
  64        .addr.nameseq.type      = TIPC_TOP_SRV,
  65        .addr.nameseq.lower     = TIPC_TOP_SRV,
  66        .addr.nameseq.upper     = TIPC_TOP_SRV,
  67        .scope                  = TIPC_NODE_SCOPE
  68};
  69
  70static struct tipc_server topsrv __read_mostly = {
  71        .saddr                  = &topsrv_addr,
  72        .imp                    = TIPC_CRITICAL_IMPORTANCE,
  73        .type                   = SOCK_SEQPACKET,
  74        .max_rcvbuf_size        = sizeof(struct tipc_subscr),
  75        .name                   = "topology_server",
  76        .tipc_conn_recvmsg      = subscr_conn_msg_event,
  77        .tipc_conn_new          = subscr_named_msg_event,
  78        .tipc_conn_shutdown     = subscr_conn_shutdown_event,
  79};
  80
  81/**
  82 * htohl - convert value to endianness used by destination
  83 * @in: value to convert
  84 * @swap: non-zero if endianness must be reversed
  85 *
  86 * Returns converted value
  87 */
  88static u32 htohl(u32 in, int swap)
  89{
  90        return swap ? swab32(in) : in;
  91}
  92
  93static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
  94                              u32 found_upper, u32 event, u32 port_ref,
  95                              u32 node)
  96{
  97        struct tipc_subscriber *subscriber = sub->subscriber;
  98        struct kvec msg_sect;
  99
 100        msg_sect.iov_base = (void *)&sub->evt;
 101        msg_sect.iov_len = sizeof(struct tipc_event);
 102        sub->evt.event = htohl(event, sub->swap);
 103        sub->evt.found_lower = htohl(found_lower, sub->swap);
 104        sub->evt.found_upper = htohl(found_upper, sub->swap);
 105        sub->evt.port.ref = htohl(port_ref, sub->swap);
 106        sub->evt.port.node = htohl(node, sub->swap);
 107        tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL, msg_sect.iov_base,
 108                          msg_sect.iov_len);
 109}
 110
 111/**
 112 * tipc_subscr_overlap - test for subscription overlap with the given values
 113 *
 114 * Returns 1 if there is overlap, otherwise 0.
 115 */
 116int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower,
 117                        u32 found_upper)
 118{
 119        if (found_lower < sub->seq.lower)
 120                found_lower = sub->seq.lower;
 121        if (found_upper > sub->seq.upper)
 122                found_upper = sub->seq.upper;
 123        if (found_lower > found_upper)
 124                return 0;
 125        return 1;
 126}
 127
 128/**
 129 * tipc_subscr_report_overlap - issue event if there is subscription overlap
 130 *
 131 * Protected by nameseq.lock in name_table.c
 132 */
 133void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower,
 134                                u32 found_upper, u32 event, u32 port_ref,
 135                                u32 node, int must)
 136{
 137        if (!tipc_subscr_overlap(sub, found_lower, found_upper))
 138                return;
 139        if (!must && !(sub->filter & TIPC_SUB_PORTS))
 140                return;
 141
 142        subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
 143}
 144
 145static void subscr_timeout(struct tipc_subscription *sub)
 146{
 147        struct tipc_subscriber *subscriber = sub->subscriber;
 148
 149        /* The spin lock per subscriber is used to protect its members */
 150        spin_lock_bh(&subscriber->lock);
 151
 152        /* Validate timeout (in case subscription is being cancelled) */
 153        if (sub->timeout == TIPC_WAIT_FOREVER) {
 154                spin_unlock_bh(&subscriber->lock);
 155                return;
 156        }
 157
 158        /* Unlink subscription from name table */
 159        tipc_nametbl_unsubscribe(sub);
 160
 161        /* Unlink subscription from subscriber */
 162        list_del(&sub->subscription_list);
 163
 164        spin_unlock_bh(&subscriber->lock);
 165
 166        /* Notify subscriber of timeout */
 167        subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
 168                          TIPC_SUBSCR_TIMEOUT, 0, 0);
 169
 170        /* Now destroy subscription */
 171        k_term_timer(&sub->timer);
 172        kfree(sub);
 173        atomic_dec(&subscription_count);
 174}
 175
 176/**
 177 * subscr_del - delete a subscription within a subscription list
 178 *
 179 * Called with subscriber lock held.
 180 */
 181static void subscr_del(struct tipc_subscription *sub)
 182{
 183        tipc_nametbl_unsubscribe(sub);
 184        list_del(&sub->subscription_list);
 185        kfree(sub);
 186        atomic_dec(&subscription_count);
 187}
 188
 189/**
 190 * subscr_terminate - terminate communication with a subscriber
 191 *
 192 * Note: Must call it in process context since it might sleep.
 193 */
 194static void subscr_terminate(struct tipc_subscriber *subscriber)
 195{
 196        tipc_conn_terminate(&topsrv, subscriber->conid);
 197}
 198
 199static void subscr_release(struct tipc_subscriber *subscriber)
 200{
 201        struct tipc_subscription *sub;
 202        struct tipc_subscription *sub_temp;
 203
 204        spin_lock_bh(&subscriber->lock);
 205
 206        /* Destroy any existing subscriptions for subscriber */
 207        list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
 208                                 subscription_list) {
 209                if (sub->timeout != TIPC_WAIT_FOREVER) {
 210                        spin_unlock_bh(&subscriber->lock);
 211                        k_cancel_timer(&sub->timer);
 212                        k_term_timer(&sub->timer);
 213                        spin_lock_bh(&subscriber->lock);
 214                }
 215                subscr_del(sub);
 216        }
 217        spin_unlock_bh(&subscriber->lock);
 218
 219        /* Now destroy subscriber */
 220        kfree(subscriber);
 221}
 222
 223/**
 224 * subscr_cancel - handle subscription cancellation request
 225 *
 226 * Called with subscriber lock held. Routine must temporarily release lock
 227 * to enable the subscription timeout routine to finish without deadlocking;
 228 * the lock is then reclaimed to allow caller to release it upon return.
 229 *
 230 * Note that fields of 's' use subscriber's endianness!
 231 */
 232static void subscr_cancel(struct tipc_subscr *s,
 233                          struct tipc_subscriber *subscriber)
 234{
 235        struct tipc_subscription *sub;
 236        struct tipc_subscription *sub_temp;
 237        int found = 0;
 238
 239        /* Find first matching subscription, exit if not found */
 240        list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
 241                                 subscription_list) {
 242                if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
 243                        found = 1;
 244                        break;
 245                }
 246        }
 247        if (!found)
 248                return;
 249
 250        /* Cancel subscription timer (if used), then delete subscription */
 251        if (sub->timeout != TIPC_WAIT_FOREVER) {
 252                sub->timeout = TIPC_WAIT_FOREVER;
 253                spin_unlock_bh(&subscriber->lock);
 254                k_cancel_timer(&sub->timer);
 255                k_term_timer(&sub->timer);
 256                spin_lock_bh(&subscriber->lock);
 257        }
 258        subscr_del(sub);
 259}
 260
 261/**
 262 * subscr_subscribe - create subscription for subscriber
 263 *
 264 * Called with subscriber lock held.
 265 */
 266static int subscr_subscribe(struct tipc_subscr *s,
 267                            struct tipc_subscriber *subscriber,
 268                            struct tipc_subscription **sub_p) {
 269        struct tipc_subscription *sub;
 270        int swap;
 271
 272        /* Determine subscriber's endianness */
 273        swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE));
 274
 275        /* Detect & process a subscription cancellation request */
 276        if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
 277                s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
 278                subscr_cancel(s, subscriber);
 279                return 0;
 280        }
 281
 282        /* Refuse subscription if global limit exceeded */
 283        if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
 284                pr_warn("Subscription rejected, limit reached (%u)\n",
 285                        TIPC_MAX_SUBSCRIPTIONS);
 286                return -EINVAL;
 287        }
 288
 289        /* Allocate subscription object */
 290        sub = kmalloc(sizeof(*sub), GFP_ATOMIC);
 291        if (!sub) {
 292                pr_warn("Subscription rejected, no memory\n");
 293                return -ENOMEM;
 294        }
 295
 296        /* Initialize subscription object */
 297        sub->seq.type = htohl(s->seq.type, swap);
 298        sub->seq.lower = htohl(s->seq.lower, swap);
 299        sub->seq.upper = htohl(s->seq.upper, swap);
 300        sub->timeout = htohl(s->timeout, swap);
 301        sub->filter = htohl(s->filter, swap);
 302        if ((!(sub->filter & TIPC_SUB_PORTS) ==
 303             !(sub->filter & TIPC_SUB_SERVICE)) ||
 304            (sub->seq.lower > sub->seq.upper)) {
 305                pr_warn("Subscription rejected, illegal request\n");
 306                kfree(sub);
 307                return -EINVAL;
 308        }
 309        INIT_LIST_HEAD(&sub->nameseq_list);
 310        list_add(&sub->subscription_list, &subscriber->subscription_list);
 311        sub->subscriber = subscriber;
 312        sub->swap = swap;
 313        memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
 314        atomic_inc(&subscription_count);
 315        if (sub->timeout != TIPC_WAIT_FOREVER) {
 316                k_init_timer(&sub->timer,
 317                             (Handler)subscr_timeout, (unsigned long)sub);
 318                k_start_timer(&sub->timer, sub->timeout);
 319        }
 320        *sub_p = sub;
 321        return 0;
 322}
 323
 324/* Handle one termination request for the subscriber */
 325static void subscr_conn_shutdown_event(int conid, void *usr_data)
 326{
 327        subscr_release((struct tipc_subscriber *)usr_data);
 328}
 329
 330/* Handle one request to create a new subscription for the subscriber */
 331static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
 332                                  void *usr_data, void *buf, size_t len)
 333{
 334        struct tipc_subscriber *subscriber = usr_data;
 335        struct tipc_subscription *sub = NULL;
 336
 337        spin_lock_bh(&subscriber->lock);
 338        if (subscr_subscribe((struct tipc_subscr *)buf, subscriber, &sub) < 0) {
 339                spin_unlock_bh(&subscriber->lock);
 340                subscr_terminate(subscriber);
 341                return;
 342        }
 343        if (sub)
 344                tipc_nametbl_subscribe(sub);
 345        spin_unlock_bh(&subscriber->lock);
 346}
 347
 348
 349/* Handle one request to establish a new subscriber */
 350static void *subscr_named_msg_event(int conid)
 351{
 352        struct tipc_subscriber *subscriber;
 353
 354        /* Create subscriber object */
 355        subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
 356        if (subscriber == NULL) {
 357                pr_warn("Subscriber rejected, no memory\n");
 358                return NULL;
 359        }
 360        INIT_LIST_HEAD(&subscriber->subscription_list);
 361        subscriber->conid = conid;
 362        spin_lock_init(&subscriber->lock);
 363
 364        return (void *)subscriber;
 365}
 366
 367int tipc_subscr_start(void)
 368{
 369        return tipc_server_start(&topsrv);
 370}
 371
 372void tipc_subscr_stop(void)
 373{
 374        tipc_server_stop(&topsrv);
 375}
 376