linux/net/tipc/subscr.c
<<
>>
Prefs
   1/*
   2 * net/tipc/subscr.c: TIPC network topology service
   3 *
   4 * Copyright (c) 2000-2006, Ericsson AB
   5 * Copyright (c) 2005-2007, 2010-2013, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "name_table.h"
  39#include "port.h"
  40#include "subscr.h"
  41
  42/**
  43 * struct tipc_subscriber - TIPC network topology subscriber
  44 * @conid: connection identifier to server connecting to subscriber
  45 * @lock: controll access to subscriber
  46 * @subscription_list: list of subscription objects for this subscriber
  47 */
  48struct tipc_subscriber {
  49        int conid;
  50        spinlock_t lock;
  51        struct list_head subscription_list;
  52};
  53
  54static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
  55                                  void *usr_data, void *buf, size_t len);
  56static void *subscr_named_msg_event(int conid);
  57static void subscr_conn_shutdown_event(int conid, void *usr_data);
  58
  59static atomic_t subscription_count = ATOMIC_INIT(0);
  60
  61static struct sockaddr_tipc topsrv_addr __read_mostly = {
  62        .family                 = AF_TIPC,
  63        .addrtype               = TIPC_ADDR_NAMESEQ,
  64        .addr.nameseq.type      = TIPC_TOP_SRV,
  65        .addr.nameseq.lower     = TIPC_TOP_SRV,
  66        .addr.nameseq.upper     = TIPC_TOP_SRV,
  67        .scope                  = TIPC_NODE_SCOPE
  68};
  69
  70static struct tipc_server topsrv __read_mostly = {
  71        .saddr                  = &topsrv_addr,
  72        .imp                    = TIPC_CRITICAL_IMPORTANCE,
  73        .type                   = SOCK_SEQPACKET,
  74        .max_rcvbuf_size        = sizeof(struct tipc_subscr),
  75        .name                   = "topology_server",
  76        .tipc_conn_recvmsg      = subscr_conn_msg_event,
  77        .tipc_conn_new          = subscr_named_msg_event,
  78        .tipc_conn_shutdown     = subscr_conn_shutdown_event,
  79};
  80
  81/**
  82 * htohl - convert value to endianness used by destination
  83 * @in: value to convert
  84 * @swap: non-zero if endianness must be reversed
  85 *
  86 * Returns converted value
  87 */
  88static u32 htohl(u32 in, int swap)
  89{
  90        return swap ? swab32(in) : in;
  91}
  92
  93static void subscr_send_event(struct tipc_subscription *sub, u32 found_lower,
  94                              u32 found_upper, u32 event, u32 port_ref,
  95                              u32 node)
  96{
  97        struct tipc_subscriber *subscriber = sub->subscriber;
  98        struct kvec msg_sect;
  99        int ret;
 100
 101        msg_sect.iov_base = (void *)&sub->evt;
 102        msg_sect.iov_len = sizeof(struct tipc_event);
 103
 104        sub->evt.event = htohl(event, sub->swap);
 105        sub->evt.found_lower = htohl(found_lower, sub->swap);
 106        sub->evt.found_upper = htohl(found_upper, sub->swap);
 107        sub->evt.port.ref = htohl(port_ref, sub->swap);
 108        sub->evt.port.node = htohl(node, sub->swap);
 109        ret = tipc_conn_sendmsg(&topsrv, subscriber->conid, NULL,
 110                                msg_sect.iov_base, msg_sect.iov_len);
 111        if (ret < 0)
 112                pr_err("Sending subscription event failed, no memory\n");
 113}
 114
 115/**
 116 * tipc_subscr_overlap - test for subscription overlap with the given values
 117 *
 118 * Returns 1 if there is overlap, otherwise 0.
 119 */
 120int tipc_subscr_overlap(struct tipc_subscription *sub, u32 found_lower,
 121                        u32 found_upper)
 122{
 123        if (found_lower < sub->seq.lower)
 124                found_lower = sub->seq.lower;
 125        if (found_upper > sub->seq.upper)
 126                found_upper = sub->seq.upper;
 127        if (found_lower > found_upper)
 128                return 0;
 129        return 1;
 130}
 131
 132/**
 133 * tipc_subscr_report_overlap - issue event if there is subscription overlap
 134 *
 135 * Protected by nameseq.lock in name_table.c
 136 */
 137void tipc_subscr_report_overlap(struct tipc_subscription *sub, u32 found_lower,
 138                                u32 found_upper, u32 event, u32 port_ref,
 139                                u32 node, int must)
 140{
 141        if (!tipc_subscr_overlap(sub, found_lower, found_upper))
 142                return;
 143        if (!must && !(sub->filter & TIPC_SUB_PORTS))
 144                return;
 145
 146        subscr_send_event(sub, found_lower, found_upper, event, port_ref, node);
 147}
 148
 149static void subscr_timeout(struct tipc_subscription *sub)
 150{
 151        struct tipc_subscriber *subscriber = sub->subscriber;
 152
 153        /* The spin lock per subscriber is used to protect its members */
 154        spin_lock_bh(&subscriber->lock);
 155
 156        /* Validate if the connection related to the subscriber is
 157         * closed (in case subscriber is terminating)
 158         */
 159        if (subscriber->conid == 0) {
 160                spin_unlock_bh(&subscriber->lock);
 161                return;
 162        }
 163
 164        /* Validate timeout (in case subscription is being cancelled) */
 165        if (sub->timeout == TIPC_WAIT_FOREVER) {
 166                spin_unlock_bh(&subscriber->lock);
 167                return;
 168        }
 169
 170        /* Unlink subscription from name table */
 171        tipc_nametbl_unsubscribe(sub);
 172
 173        /* Unlink subscription from subscriber */
 174        list_del(&sub->subscription_list);
 175
 176        spin_unlock_bh(&subscriber->lock);
 177
 178        /* Notify subscriber of timeout */
 179        subscr_send_event(sub, sub->evt.s.seq.lower, sub->evt.s.seq.upper,
 180                          TIPC_SUBSCR_TIMEOUT, 0, 0);
 181
 182        /* Now destroy subscription */
 183        k_term_timer(&sub->timer);
 184        kfree(sub);
 185        atomic_dec(&subscription_count);
 186}
 187
 188/**
 189 * subscr_del - delete a subscription within a subscription list
 190 *
 191 * Called with subscriber lock held.
 192 */
 193static void subscr_del(struct tipc_subscription *sub)
 194{
 195        tipc_nametbl_unsubscribe(sub);
 196        list_del(&sub->subscription_list);
 197        kfree(sub);
 198        atomic_dec(&subscription_count);
 199}
 200
 201/**
 202 * subscr_terminate - terminate communication with a subscriber
 203 *
 204 * Note: Must call it in process context since it might sleep.
 205 */
 206static void subscr_terminate(struct tipc_subscriber *subscriber)
 207{
 208        tipc_conn_terminate(&topsrv, subscriber->conid);
 209}
 210
 211static void subscr_release(struct tipc_subscriber *subscriber)
 212{
 213        struct tipc_subscription *sub;
 214        struct tipc_subscription *sub_temp;
 215
 216        spin_lock_bh(&subscriber->lock);
 217
 218        /* Invalidate subscriber reference */
 219        subscriber->conid = 0;
 220
 221        /* Destroy any existing subscriptions for subscriber */
 222        list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
 223                                 subscription_list) {
 224                if (sub->timeout != TIPC_WAIT_FOREVER) {
 225                        spin_unlock_bh(&subscriber->lock);
 226                        k_cancel_timer(&sub->timer);
 227                        k_term_timer(&sub->timer);
 228                        spin_lock_bh(&subscriber->lock);
 229                }
 230                subscr_del(sub);
 231        }
 232        spin_unlock_bh(&subscriber->lock);
 233
 234        /* Now destroy subscriber */
 235        kfree(subscriber);
 236}
 237
 238/**
 239 * subscr_cancel - handle subscription cancellation request
 240 *
 241 * Called with subscriber lock held. Routine must temporarily release lock
 242 * to enable the subscription timeout routine to finish without deadlocking;
 243 * the lock is then reclaimed to allow caller to release it upon return.
 244 *
 245 * Note that fields of 's' use subscriber's endianness!
 246 */
 247static void subscr_cancel(struct tipc_subscr *s,
 248                          struct tipc_subscriber *subscriber)
 249{
 250        struct tipc_subscription *sub;
 251        struct tipc_subscription *sub_temp;
 252        int found = 0;
 253
 254        /* Find first matching subscription, exit if not found */
 255        list_for_each_entry_safe(sub, sub_temp, &subscriber->subscription_list,
 256                                 subscription_list) {
 257                if (!memcmp(s, &sub->evt.s, sizeof(struct tipc_subscr))) {
 258                        found = 1;
 259                        break;
 260                }
 261        }
 262        if (!found)
 263                return;
 264
 265        /* Cancel subscription timer (if used), then delete subscription */
 266        if (sub->timeout != TIPC_WAIT_FOREVER) {
 267                sub->timeout = TIPC_WAIT_FOREVER;
 268                spin_unlock_bh(&subscriber->lock);
 269                k_cancel_timer(&sub->timer);
 270                k_term_timer(&sub->timer);
 271                spin_lock_bh(&subscriber->lock);
 272        }
 273        subscr_del(sub);
 274}
 275
 276/**
 277 * subscr_subscribe - create subscription for subscriber
 278 *
 279 * Called with subscriber lock held.
 280 */
 281static struct tipc_subscription *subscr_subscribe(struct tipc_subscr *s,
 282                                             struct tipc_subscriber *subscriber)
 283{
 284        struct tipc_subscription *sub;
 285        int swap;
 286
 287        /* Determine subscriber's endianness */
 288        swap = !(s->filter & (TIPC_SUB_PORTS | TIPC_SUB_SERVICE));
 289
 290        /* Detect & process a subscription cancellation request */
 291        if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
 292                s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
 293                subscr_cancel(s, subscriber);
 294                return NULL;
 295        }
 296
 297        /* Refuse subscription if global limit exceeded */
 298        if (atomic_read(&subscription_count) >= TIPC_MAX_SUBSCRIPTIONS) {
 299                pr_warn("Subscription rejected, limit reached (%u)\n",
 300                        TIPC_MAX_SUBSCRIPTIONS);
 301                subscr_terminate(subscriber);
 302                return NULL;
 303        }
 304
 305        /* Allocate subscription object */
 306        sub = kmalloc(sizeof(*sub), GFP_ATOMIC);
 307        if (!sub) {
 308                pr_warn("Subscription rejected, no memory\n");
 309                subscr_terminate(subscriber);
 310                return NULL;
 311        }
 312
 313        /* Initialize subscription object */
 314        sub->seq.type = htohl(s->seq.type, swap);
 315        sub->seq.lower = htohl(s->seq.lower, swap);
 316        sub->seq.upper = htohl(s->seq.upper, swap);
 317        sub->timeout = htohl(s->timeout, swap);
 318        sub->filter = htohl(s->filter, swap);
 319        if ((!(sub->filter & TIPC_SUB_PORTS) ==
 320             !(sub->filter & TIPC_SUB_SERVICE)) ||
 321            (sub->seq.lower > sub->seq.upper)) {
 322                pr_warn("Subscription rejected, illegal request\n");
 323                kfree(sub);
 324                subscr_terminate(subscriber);
 325                return NULL;
 326        }
 327        INIT_LIST_HEAD(&sub->nameseq_list);
 328        list_add(&sub->subscription_list, &subscriber->subscription_list);
 329        sub->subscriber = subscriber;
 330        sub->swap = swap;
 331        memcpy(&sub->evt.s, s, sizeof(struct tipc_subscr));
 332        atomic_inc(&subscription_count);
 333        if (sub->timeout != TIPC_WAIT_FOREVER) {
 334                k_init_timer(&sub->timer,
 335                             (Handler)subscr_timeout, (unsigned long)sub);
 336                k_start_timer(&sub->timer, sub->timeout);
 337        }
 338
 339        return sub;
 340}
 341
 342/* Handle one termination request for the subscriber */
 343static void subscr_conn_shutdown_event(int conid, void *usr_data)
 344{
 345        subscr_release((struct tipc_subscriber *)usr_data);
 346}
 347
 348/* Handle one request to create a new subscription for the subscriber */
 349static void subscr_conn_msg_event(int conid, struct sockaddr_tipc *addr,
 350                                  void *usr_data, void *buf, size_t len)
 351{
 352        struct tipc_subscriber *subscriber = usr_data;
 353        struct tipc_subscription *sub;
 354
 355        spin_lock_bh(&subscriber->lock);
 356        sub = subscr_subscribe((struct tipc_subscr *)buf, subscriber);
 357        if (sub)
 358                tipc_nametbl_subscribe(sub);
 359        spin_unlock_bh(&subscriber->lock);
 360}
 361
 362
 363/* Handle one request to establish a new subscriber */
 364static void *subscr_named_msg_event(int conid)
 365{
 366        struct tipc_subscriber *subscriber;
 367
 368        /* Create subscriber object */
 369        subscriber = kzalloc(sizeof(struct tipc_subscriber), GFP_ATOMIC);
 370        if (subscriber == NULL) {
 371                pr_warn("Subscriber rejected, no memory\n");
 372                return NULL;
 373        }
 374        INIT_LIST_HEAD(&subscriber->subscription_list);
 375        subscriber->conid = conid;
 376        spin_lock_init(&subscriber->lock);
 377
 378        return (void *)subscriber;
 379}
 380
 381int tipc_subscr_start(void)
 382{
 383        return tipc_server_start(&topsrv);
 384}
 385
 386void tipc_subscr_stop(void)
 387{
 388        tipc_server_stop(&topsrv);
 389}
 390