linux/net/tipc/name_table.c
<<
>>
Prefs
   1/*
   2 * net/tipc/name_table.c: TIPC name table code
   3 *
   4 * Copyright (c) 2000-2006, 2014-2018, Ericsson AB
   5 * Copyright (c) 2004-2008, 2010-2014, Wind River Systems
   6 * Copyright (c) 2020-2021, Red Hat Inc
   7 * All rights reserved.
   8 *
   9 * Redistribution and use in source and binary forms, with or without
  10 * modification, are permitted provided that the following conditions are met:
  11 *
  12 * 1. Redistributions of source code must retain the above copyright
  13 *    notice, this list of conditions and the following disclaimer.
  14 * 2. Redistributions in binary form must reproduce the above copyright
  15 *    notice, this list of conditions and the following disclaimer in the
  16 *    documentation and/or other materials provided with the distribution.
  17 * 3. Neither the names of the copyright holders nor the names of its
  18 *    contributors may be used to endorse or promote products derived from
  19 *    this software without specific prior written permission.
  20 *
  21 * Alternatively, this software may be distributed under the terms of the
  22 * GNU General Public License ("GPL") version 2 as published by the Free
  23 * Software Foundation.
  24 *
  25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  26 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  27 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  28 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  29 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  30 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  31 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  32 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  33 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  34 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  35 * POSSIBILITY OF SUCH DAMAGE.
  36 */
  37
  38#include <net/sock.h>
  39#include <linux/list_sort.h>
  40#include <linux/rbtree_augmented.h>
  41#include "core.h"
  42#include "netlink.h"
  43#include "name_table.h"
  44#include "name_distr.h"
  45#include "subscr.h"
  46#include "bcast.h"
  47#include "addr.h"
  48#include "node.h"
  49#include "group.h"
  50
  51/**
  52 * struct service_range - container for all bindings of a service range
  53 * @lower: service range lower bound
  54 * @upper: service range upper bound
  55 * @tree_node: member of service range RB tree
  56 * @max: largest 'upper' in this node subtree
  57 * @local_publ: list of identical publications made from this node
  58 *   Used by closest_first lookup and multicast lookup algorithm
  59 * @all_publ: all publications identical to this one, whatever node and scope
  60 *   Used by round-robin lookup algorithm
  61 */
  62struct service_range {
  63        u32 lower;
  64        u32 upper;
  65        struct rb_node tree_node;
  66        u32 max;
  67        struct list_head local_publ;
  68        struct list_head all_publ;
  69};
  70
  71/**
  72 * struct tipc_service - container for all published instances of a service type
  73 * @type: 32 bit 'type' value for service
  74 * @publ_cnt: increasing counter for publications in this service
  75 * @ranges: rb tree containing all service ranges for this service
  76 * @service_list: links to adjacent name ranges in hash chain
  77 * @subscriptions: list of subscriptions for this service type
  78 * @lock: spinlock controlling access to pertaining service ranges/publications
  79 * @rcu: RCU callback head used for deferred freeing
  80 */
  81struct tipc_service {
  82        u32 type;
  83        u32 publ_cnt;
  84        struct rb_root ranges;
  85        struct hlist_node service_list;
  86        struct list_head subscriptions;
  87        spinlock_t lock; /* Covers service range list */
  88        struct rcu_head rcu;
  89};
  90
  91#define service_range_upper(sr) ((sr)->upper)
  92RB_DECLARE_CALLBACKS_MAX(static, sr_callbacks,
  93                         struct service_range, tree_node, u32, max,
  94                         service_range_upper)
  95
  96#define service_range_entry(rbtree_node)                                \
  97        (container_of(rbtree_node, struct service_range, tree_node))
  98
  99#define service_range_overlap(sr, start, end)                           \
 100        ((sr)->lower <= (end) && (sr)->upper >= (start))
 101
 102/**
 103 * service_range_foreach_match - iterate over tipc service rbtree for each
 104 *                               range match
 105 * @sr: the service range pointer as a loop cursor
 106 * @sc: the pointer to tipc service which holds the service range rbtree
 107 * @start: beginning of the search range (end >= start) for matching
 108 * @end: end of the search range (end >= start) for matching
 109 */
 110#define service_range_foreach_match(sr, sc, start, end)                 \
 111        for (sr = service_range_match_first((sc)->ranges.rb_node,       \
 112                                            start,                      \
 113                                            end);                       \
 114             sr;                                                        \
 115             sr = service_range_match_next(&(sr)->tree_node,            \
 116                                           start,                       \
 117                                           end))
 118
 119/**
 120 * service_range_match_first - find first service range matching a range
 121 * @n: the root node of service range rbtree for searching
 122 * @start: beginning of the search range (end >= start) for matching
 123 * @end: end of the search range (end >= start) for matching
 124 *
 125 * Return: the leftmost service range node in the rbtree that overlaps the
 126 * specific range if any. Otherwise, returns NULL.
 127 */
 128static struct service_range *service_range_match_first(struct rb_node *n,
 129                                                       u32 start, u32 end)
 130{
 131        struct service_range *sr;
 132        struct rb_node *l, *r;
 133
 134        /* Non overlaps in tree at all? */
 135        if (!n || service_range_entry(n)->max < start)
 136                return NULL;
 137
 138        while (n) {
 139                l = n->rb_left;
 140                if (l && service_range_entry(l)->max >= start) {
 141                        /* A leftmost overlap range node must be one in the left
 142                         * subtree. If not, it has lower > end, then nodes on
 143                         * the right side cannot satisfy the condition either.
 144                         */
 145                        n = l;
 146                        continue;
 147                }
 148
 149                /* No one in the left subtree can match, return if this node is
 150                 * an overlap i.e. leftmost.
 151                 */
 152                sr = service_range_entry(n);
 153                if (service_range_overlap(sr, start, end))
 154                        return sr;
 155
 156                /* Ok, try to lookup on the right side */
 157                r = n->rb_right;
 158                if (sr->lower <= end &&
 159                    r && service_range_entry(r)->max >= start) {
 160                        n = r;
 161                        continue;
 162                }
 163                break;
 164        }
 165
 166        return NULL;
 167}
 168
 169/**
 170 * service_range_match_next - find next service range matching a range
 171 * @n: a node in service range rbtree from which the searching starts
 172 * @start: beginning of the search range (end >= start) for matching
 173 * @end: end of the search range (end >= start) for matching
 174 *
 175 * Return: the next service range node to the given node in the rbtree that
 176 * overlaps the specific range if any. Otherwise, returns NULL.
 177 */
 178static struct service_range *service_range_match_next(struct rb_node *n,
 179                                                      u32 start, u32 end)
 180{
 181        struct service_range *sr;
 182        struct rb_node *p, *r;
 183
 184        while (n) {
 185                r = n->rb_right;
 186                if (r && service_range_entry(r)->max >= start)
 187                        /* A next overlap range node must be one in the right
 188                         * subtree. If not, it has lower > end, then any next
 189                         * successor (- an ancestor) of this node cannot
 190                         * satisfy the condition either.
 191                         */
 192                        return service_range_match_first(r, start, end);
 193
 194                /* No one in the right subtree can match, go up to find an
 195                 * ancestor of this node which is parent of a left-hand child.
 196                 */
 197                while ((p = rb_parent(n)) && n == p->rb_right)
 198                        n = p;
 199                if (!p)
 200                        break;
 201
 202                /* Return if this ancestor is an overlap */
 203                sr = service_range_entry(p);
 204                if (service_range_overlap(sr, start, end))
 205                        return sr;
 206
 207                /* Ok, try to lookup more from this ancestor */
 208                if (sr->lower <= end) {
 209                        n = p;
 210                        continue;
 211                }
 212                break;
 213        }
 214
 215        return NULL;
 216}
 217
 218static int hash(int x)
 219{
 220        return x & (TIPC_NAMETBL_SIZE - 1);
 221}
 222
 223/**
 224 * tipc_publ_create - create a publication structure
 225 * @ua: the service range the user is binding to
 226 * @sk: the address of the socket that is bound
 227 * @key: publication key
 228 */
 229static struct publication *tipc_publ_create(struct tipc_uaddr *ua,
 230                                            struct tipc_socket_addr *sk,
 231                                            u32 key)
 232{
 233        struct publication *p = kzalloc(sizeof(*p), GFP_ATOMIC);
 234
 235        if (!p)
 236                return NULL;
 237
 238        p->sr = ua->sr;
 239        p->sk = *sk;
 240        p->scope = ua->scope;
 241        p->key = key;
 242        INIT_LIST_HEAD(&p->binding_sock);
 243        INIT_LIST_HEAD(&p->binding_node);
 244        INIT_LIST_HEAD(&p->local_publ);
 245        INIT_LIST_HEAD(&p->all_publ);
 246        INIT_LIST_HEAD(&p->list);
 247        return p;
 248}
 249
 250/**
 251 * tipc_service_create - create a service structure for the specified 'type'
 252 * @net: network namespace
 253 * @ua: address representing the service to be bound
 254 *
 255 * Allocates a single range structure and sets it to all 0's.
 256 */
 257static struct tipc_service *tipc_service_create(struct net *net,
 258                                                struct tipc_uaddr *ua)
 259{
 260        struct name_table *nt = tipc_name_table(net);
 261        struct tipc_service *service;
 262        struct hlist_head *hd;
 263
 264        service = kzalloc(sizeof(*service), GFP_ATOMIC);
 265        if (!service) {
 266                pr_warn("Service creation failed, no memory\n");
 267                return NULL;
 268        }
 269
 270        spin_lock_init(&service->lock);
 271        service->type = ua->sr.type;
 272        service->ranges = RB_ROOT;
 273        INIT_HLIST_NODE(&service->service_list);
 274        INIT_LIST_HEAD(&service->subscriptions);
 275        hd = &nt->services[hash(ua->sr.type)];
 276        hlist_add_head_rcu(&service->service_list, hd);
 277        return service;
 278}
 279
 280/*  tipc_service_find_range - find service range matching publication parameters
 281 */
 282static struct service_range *tipc_service_find_range(struct tipc_service *sc,
 283                                                     struct tipc_uaddr *ua)
 284{
 285        struct service_range *sr;
 286
 287        service_range_foreach_match(sr, sc, ua->sr.lower, ua->sr.upper) {
 288                /* Look for exact match */
 289                if (sr->lower == ua->sr.lower && sr->upper == ua->sr.upper)
 290                        return sr;
 291        }
 292
 293        return NULL;
 294}
 295
 296static struct service_range *tipc_service_create_range(struct tipc_service *sc,
 297                                                       struct publication *p)
 298{
 299        struct rb_node **n, *parent = NULL;
 300        struct service_range *sr;
 301        u32 lower = p->sr.lower;
 302        u32 upper = p->sr.upper;
 303
 304        n = &sc->ranges.rb_node;
 305        while (*n) {
 306                parent = *n;
 307                sr = service_range_entry(parent);
 308                if (lower == sr->lower && upper == sr->upper)
 309                        return sr;
 310                if (sr->max < upper)
 311                        sr->max = upper;
 312                if (lower <= sr->lower)
 313                        n = &parent->rb_left;
 314                else
 315                        n = &parent->rb_right;
 316        }
 317        sr = kzalloc(sizeof(*sr), GFP_ATOMIC);
 318        if (!sr)
 319                return NULL;
 320        sr->lower = lower;
 321        sr->upper = upper;
 322        sr->max = upper;
 323        INIT_LIST_HEAD(&sr->local_publ);
 324        INIT_LIST_HEAD(&sr->all_publ);
 325        rb_link_node(&sr->tree_node, parent, n);
 326        rb_insert_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
 327        return sr;
 328}
 329
 330static bool tipc_service_insert_publ(struct net *net,
 331                                     struct tipc_service *sc,
 332                                     struct publication *p)
 333{
 334        struct tipc_subscription *sub, *tmp;
 335        struct service_range *sr;
 336        struct publication *_p;
 337        u32 node = p->sk.node;
 338        bool first = false;
 339        bool res = false;
 340        u32 key = p->key;
 341
 342        spin_lock_bh(&sc->lock);
 343        sr = tipc_service_create_range(sc, p);
 344        if (!sr)
 345                goto  exit;
 346
 347        first = list_empty(&sr->all_publ);
 348
 349        /* Return if the publication already exists */
 350        list_for_each_entry(_p, &sr->all_publ, all_publ) {
 351                if (_p->key == key && (!_p->sk.node || _p->sk.node == node)) {
 352                        pr_debug("Failed to bind duplicate %u,%u,%u/%u:%u/%u\n",
 353                                 p->sr.type, p->sr.lower, p->sr.upper,
 354                                 node, p->sk.ref, key);
 355                        goto exit;
 356                }
 357        }
 358
 359        if (in_own_node(net, p->sk.node))
 360                list_add(&p->local_publ, &sr->local_publ);
 361        list_add(&p->all_publ, &sr->all_publ);
 362        p->id = sc->publ_cnt++;
 363
 364        /* Any subscriptions waiting for notification?  */
 365        list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) {
 366                tipc_sub_report_overlap(sub, p, TIPC_PUBLISHED, first);
 367        }
 368        res = true;
 369exit:
 370        if (!res)
 371                pr_warn("Failed to bind to %u,%u,%u\n",
 372                        p->sr.type, p->sr.lower, p->sr.upper);
 373        spin_unlock_bh(&sc->lock);
 374        return res;
 375}
 376
 377/**
 378 * tipc_service_remove_publ - remove a publication from a service
 379 * @r: service_range to remove publication from
 380 * @sk: address publishing socket
 381 * @key: target publication key
 382 */
 383static struct publication *tipc_service_remove_publ(struct service_range *r,
 384                                                    struct tipc_socket_addr *sk,
 385                                                    u32 key)
 386{
 387        struct publication *p;
 388        u32 node = sk->node;
 389
 390        list_for_each_entry(p, &r->all_publ, all_publ) {
 391                if (p->key != key || (node && node != p->sk.node))
 392                        continue;
 393                list_del(&p->all_publ);
 394                list_del(&p->local_publ);
 395                return p;
 396        }
 397        return NULL;
 398}
 399
 400/*
 401 * Code reused: time_after32() for the same purpose
 402 */
 403#define publication_after(pa, pb) time_after32((pa)->id, (pb)->id)
 404static int tipc_publ_sort(void *priv, const struct list_head *a,
 405                          const struct list_head *b)
 406{
 407        struct publication *pa, *pb;
 408
 409        pa = container_of(a, struct publication, list);
 410        pb = container_of(b, struct publication, list);
 411        return publication_after(pa, pb);
 412}
 413
 414/**
 415 * tipc_service_subscribe - attach a subscription, and optionally
 416 * issue the prescribed number of events if there is any service
 417 * range overlapping with the requested range
 418 * @service: the tipc_service to attach the @sub to
 419 * @sub: the subscription to attach
 420 */
 421static void tipc_service_subscribe(struct tipc_service *service,
 422                                   struct tipc_subscription *sub)
 423{
 424        struct publication *p, *first, *tmp;
 425        struct list_head publ_list;
 426        struct service_range *sr;
 427        u32 filter, lower, upper;
 428
 429        filter = sub->s.filter;
 430        lower = sub->s.seq.lower;
 431        upper = sub->s.seq.upper;
 432
 433        tipc_sub_get(sub);
 434        list_add(&sub->service_list, &service->subscriptions);
 435
 436        if (filter & TIPC_SUB_NO_STATUS)
 437                return;
 438
 439        INIT_LIST_HEAD(&publ_list);
 440        service_range_foreach_match(sr, service, lower, upper) {
 441                first = NULL;
 442                list_for_each_entry(p, &sr->all_publ, all_publ) {
 443                        if (filter & TIPC_SUB_PORTS)
 444                                list_add_tail(&p->list, &publ_list);
 445                        else if (!first || publication_after(first, p))
 446                                /* Pick this range's *first* publication */
 447                                first = p;
 448                }
 449                if (first)
 450                        list_add_tail(&first->list, &publ_list);
 451        }
 452
 453        /* Sort the publications before reporting */
 454        list_sort(NULL, &publ_list, tipc_publ_sort);
 455        list_for_each_entry_safe(p, tmp, &publ_list, list) {
 456                tipc_sub_report_overlap(sub, p, TIPC_PUBLISHED, true);
 457                list_del_init(&p->list);
 458        }
 459}
 460
 461static struct tipc_service *tipc_service_find(struct net *net,
 462                                              struct tipc_uaddr *ua)
 463{
 464        struct name_table *nt = tipc_name_table(net);
 465        struct hlist_head *service_head;
 466        struct tipc_service *service;
 467
 468        service_head = &nt->services[hash(ua->sr.type)];
 469        hlist_for_each_entry_rcu(service, service_head, service_list) {
 470                if (service->type == ua->sr.type)
 471                        return service;
 472        }
 473        return NULL;
 474};
 475
 476struct publication *tipc_nametbl_insert_publ(struct net *net,
 477                                             struct tipc_uaddr *ua,
 478                                             struct tipc_socket_addr *sk,
 479                                             u32 key)
 480{
 481        struct tipc_service *sc;
 482        struct publication *p;
 483
 484        p = tipc_publ_create(ua, sk, key);
 485        if (!p)
 486                return NULL;
 487
 488        sc = tipc_service_find(net, ua);
 489        if (!sc)
 490                sc = tipc_service_create(net, ua);
 491        if (sc && tipc_service_insert_publ(net, sc, p))
 492                return p;
 493        kfree(p);
 494        return NULL;
 495}
 496
 497struct publication *tipc_nametbl_remove_publ(struct net *net,
 498                                             struct tipc_uaddr *ua,
 499                                             struct tipc_socket_addr *sk,
 500                                             u32 key)
 501{
 502        struct tipc_subscription *sub, *tmp;
 503        struct publication *p = NULL;
 504        struct service_range *sr;
 505        struct tipc_service *sc;
 506        bool last;
 507
 508        sc = tipc_service_find(net, ua);
 509        if (!sc)
 510                goto exit;
 511
 512        spin_lock_bh(&sc->lock);
 513        sr = tipc_service_find_range(sc, ua);
 514        if (!sr)
 515                goto unlock;
 516        p = tipc_service_remove_publ(sr, sk, key);
 517        if (!p)
 518                goto unlock;
 519
 520        /* Notify any waiting subscriptions */
 521        last = list_empty(&sr->all_publ);
 522        list_for_each_entry_safe(sub, tmp, &sc->subscriptions, service_list) {
 523                tipc_sub_report_overlap(sub, p, TIPC_WITHDRAWN, last);
 524        }
 525
 526        /* Remove service range item if this was its last publication */
 527        if (list_empty(&sr->all_publ)) {
 528                rb_erase_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
 529                kfree(sr);
 530        }
 531
 532        /* Delete service item if no more publications and subscriptions */
 533        if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
 534                hlist_del_init_rcu(&sc->service_list);
 535                kfree_rcu(sc, rcu);
 536        }
 537unlock:
 538        spin_unlock_bh(&sc->lock);
 539exit:
 540        if (!p) {
 541                pr_err("Failed to remove unknown binding: %u,%u,%u/%u:%u/%u\n",
 542                       ua->sr.type, ua->sr.lower, ua->sr.upper,
 543                       sk->node, sk->ref, key);
 544        }
 545        return p;
 546}
 547
 548/**
 549 * tipc_nametbl_lookup_anycast - perform service instance to socket translation
 550 * @net: network namespace
 551 * @ua: service address to look up
 552 * @sk: address to socket we want to find
 553 *
 554 * On entry, a non-zero 'sk->node' indicates the node where we want lookup to be
 555 * performed, which may not be this one.
 556 *
 557 * On exit:
 558 *
 559 * - If lookup is deferred to another node, leave 'sk->node' unchanged and
 560 *   return 'true'.
 561 * - If lookup is successful, set the 'sk->node' and 'sk->ref' (== portid) which
 562 *   represent the bound socket and return 'true'.
 563 * - If lookup fails, return 'false'
 564 *
 565 * Note that for legacy users (node configured with Z.C.N address format) the
 566 * 'closest-first' lookup algorithm must be maintained, i.e., if sk.node is 0
 567 * we must look in the local binding list first
 568 */
 569bool tipc_nametbl_lookup_anycast(struct net *net,
 570                                 struct tipc_uaddr *ua,
 571                                 struct tipc_socket_addr *sk)
 572{
 573        struct tipc_net *tn = tipc_net(net);
 574        bool legacy = tn->legacy_addr_format;
 575        u32 self = tipc_own_addr(net);
 576        u32 inst = ua->sa.instance;
 577        struct service_range *r;
 578        struct tipc_service *sc;
 579        struct publication *p;
 580        struct list_head *l;
 581        bool res = false;
 582
 583        if (!tipc_in_scope(legacy, sk->node, self))
 584                return true;
 585
 586        rcu_read_lock();
 587        sc = tipc_service_find(net, ua);
 588        if (unlikely(!sc))
 589                goto exit;
 590
 591        spin_lock_bh(&sc->lock);
 592        service_range_foreach_match(r, sc, inst, inst) {
 593                /* Select lookup algo: local, closest-first or round-robin */
 594                if (sk->node == self) {
 595                        l = &r->local_publ;
 596                        if (list_empty(l))
 597                                continue;
 598                        p = list_first_entry(l, struct publication, local_publ);
 599                        list_move_tail(&p->local_publ, &r->local_publ);
 600                } else if (legacy && !sk->node && !list_empty(&r->local_publ)) {
 601                        l = &r->local_publ;
 602                        p = list_first_entry(l, struct publication, local_publ);
 603                        list_move_tail(&p->local_publ, &r->local_publ);
 604                } else {
 605                        l = &r->all_publ;
 606                        p = list_first_entry(l, struct publication, all_publ);
 607                        list_move_tail(&p->all_publ, &r->all_publ);
 608                }
 609                *sk = p->sk;
 610                res = true;
 611                /* Todo: as for legacy, pick the first matching range only, a
 612                 * "true" round-robin will be performed as needed.
 613                 */
 614                break;
 615        }
 616        spin_unlock_bh(&sc->lock);
 617
 618exit:
 619        rcu_read_unlock();
 620        return res;
 621}
 622
 623/* tipc_nametbl_lookup_group(): lookup destinaton(s) in a communication group
 624 * Returns a list of one (== group anycast) or more (== group multicast)
 625 * destination socket/node pairs matching the given address.
 626 * The requester may or may not want to exclude himself from the list.
 627 */
 628bool tipc_nametbl_lookup_group(struct net *net, struct tipc_uaddr *ua,
 629                               struct list_head *dsts, int *dstcnt,
 630                               u32 exclude, bool mcast)
 631{
 632        u32 self = tipc_own_addr(net);
 633        u32 inst = ua->sa.instance;
 634        struct service_range *sr;
 635        struct tipc_service *sc;
 636        struct publication *p;
 637
 638        *dstcnt = 0;
 639        rcu_read_lock();
 640        sc = tipc_service_find(net, ua);
 641        if (unlikely(!sc))
 642                goto exit;
 643
 644        spin_lock_bh(&sc->lock);
 645
 646        /* Todo: a full search i.e. service_range_foreach_match() instead? */
 647        sr = service_range_match_first(sc->ranges.rb_node, inst, inst);
 648        if (!sr)
 649                goto no_match;
 650
 651        list_for_each_entry(p, &sr->all_publ, all_publ) {
 652                if (p->scope != ua->scope)
 653                        continue;
 654                if (p->sk.ref == exclude && p->sk.node == self)
 655                        continue;
 656                tipc_dest_push(dsts, p->sk.node, p->sk.ref);
 657                (*dstcnt)++;
 658                if (mcast)
 659                        continue;
 660                list_move_tail(&p->all_publ, &sr->all_publ);
 661                break;
 662        }
 663no_match:
 664        spin_unlock_bh(&sc->lock);
 665exit:
 666        rcu_read_unlock();
 667        return !list_empty(dsts);
 668}
 669
 670/* tipc_nametbl_lookup_mcast_sockets(): look up node local destinaton sockets
 671 *                                      matching the given address
 672 * Used on nodes which have received a multicast/broadcast message
 673 * Returns a list of local sockets
 674 */
 675void tipc_nametbl_lookup_mcast_sockets(struct net *net, struct tipc_uaddr *ua,
 676                                       struct list_head *dports)
 677{
 678        struct service_range *sr;
 679        struct tipc_service *sc;
 680        struct publication *p;
 681        u8 scope = ua->scope;
 682
 683        rcu_read_lock();
 684        sc = tipc_service_find(net, ua);
 685        if (!sc)
 686                goto exit;
 687
 688        spin_lock_bh(&sc->lock);
 689        service_range_foreach_match(sr, sc, ua->sr.lower, ua->sr.upper) {
 690                list_for_each_entry(p, &sr->local_publ, local_publ) {
 691                        if (scope == p->scope || scope == TIPC_ANY_SCOPE)
 692                                tipc_dest_push(dports, 0, p->sk.ref);
 693                }
 694        }
 695        spin_unlock_bh(&sc->lock);
 696exit:
 697        rcu_read_unlock();
 698}
 699
 700/* tipc_nametbl_lookup_mcast_nodes(): look up all destination nodes matching
 701 *                                    the given address. Used in sending node.
 702 * Used on nodes which are sending out a multicast/broadcast message
 703 * Returns a list of nodes, including own node if applicable
 704 */
 705void tipc_nametbl_lookup_mcast_nodes(struct net *net, struct tipc_uaddr *ua,
 706                                     struct tipc_nlist *nodes)
 707{
 708        struct service_range *sr;
 709        struct tipc_service *sc;
 710        struct publication *p;
 711
 712        rcu_read_lock();
 713        sc = tipc_service_find(net, ua);
 714        if (!sc)
 715                goto exit;
 716
 717        spin_lock_bh(&sc->lock);
 718        service_range_foreach_match(sr, sc, ua->sr.lower, ua->sr.upper) {
 719                list_for_each_entry(p, &sr->all_publ, all_publ) {
 720                        tipc_nlist_add(nodes, p->sk.node);
 721                }
 722        }
 723        spin_unlock_bh(&sc->lock);
 724exit:
 725        rcu_read_unlock();
 726}
 727
 728/* tipc_nametbl_build_group - build list of communication group members
 729 */
 730void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
 731                              struct tipc_uaddr *ua)
 732{
 733        struct service_range *sr;
 734        struct tipc_service *sc;
 735        struct publication *p;
 736        struct rb_node *n;
 737
 738        rcu_read_lock();
 739        sc = tipc_service_find(net, ua);
 740        if (!sc)
 741                goto exit;
 742
 743        spin_lock_bh(&sc->lock);
 744        for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
 745                sr = container_of(n, struct service_range, tree_node);
 746                list_for_each_entry(p, &sr->all_publ, all_publ) {
 747                        if (p->scope != ua->scope)
 748                                continue;
 749                        tipc_group_add_member(grp, p->sk.node, p->sk.ref,
 750                                              p->sr.lower);
 751                }
 752        }
 753        spin_unlock_bh(&sc->lock);
 754exit:
 755        rcu_read_unlock();
 756}
 757
 758/* tipc_nametbl_publish - add service binding to name table
 759 */
 760struct publication *tipc_nametbl_publish(struct net *net, struct tipc_uaddr *ua,
 761                                         struct tipc_socket_addr *sk, u32 key)
 762{
 763        struct name_table *nt = tipc_name_table(net);
 764        struct tipc_net *tn = tipc_net(net);
 765        struct publication *p = NULL;
 766        struct sk_buff *skb = NULL;
 767        u32 rc_dests;
 768
 769        spin_lock_bh(&tn->nametbl_lock);
 770
 771        if (nt->local_publ_count >= TIPC_MAX_PUBL) {
 772                pr_warn("Bind failed, max limit %u reached\n", TIPC_MAX_PUBL);
 773                goto exit;
 774        }
 775
 776        p = tipc_nametbl_insert_publ(net, ua, sk, key);
 777        if (p) {
 778                nt->local_publ_count++;
 779                skb = tipc_named_publish(net, p);
 780        }
 781        rc_dests = nt->rc_dests;
 782exit:
 783        spin_unlock_bh(&tn->nametbl_lock);
 784
 785        if (skb)
 786                tipc_node_broadcast(net, skb, rc_dests);
 787        return p;
 788
 789}
 790
 791/**
 792 * tipc_nametbl_withdraw - withdraw a service binding
 793 * @net: network namespace
 794 * @ua: service address/range being unbound
 795 * @sk: address of the socket being unbound from
 796 * @key: target publication key
 797 */
 798void tipc_nametbl_withdraw(struct net *net, struct tipc_uaddr *ua,
 799                           struct tipc_socket_addr *sk, u32 key)
 800{
 801        struct name_table *nt = tipc_name_table(net);
 802        struct tipc_net *tn = tipc_net(net);
 803        struct sk_buff *skb = NULL;
 804        struct publication *p;
 805        u32 rc_dests;
 806
 807        spin_lock_bh(&tn->nametbl_lock);
 808
 809        p = tipc_nametbl_remove_publ(net, ua, sk, key);
 810        if (p) {
 811                nt->local_publ_count--;
 812                skb = tipc_named_withdraw(net, p);
 813                list_del_init(&p->binding_sock);
 814                kfree_rcu(p, rcu);
 815        }
 816        rc_dests = nt->rc_dests;
 817        spin_unlock_bh(&tn->nametbl_lock);
 818
 819        if (skb)
 820                tipc_node_broadcast(net, skb, rc_dests);
 821}
 822
 823/**
 824 * tipc_nametbl_subscribe - add a subscription object to the name table
 825 * @sub: subscription to add
 826 */
 827bool tipc_nametbl_subscribe(struct tipc_subscription *sub)
 828{
 829        struct tipc_net *tn = tipc_net(sub->net);
 830        u32 type = sub->s.seq.type;
 831        struct tipc_service *sc;
 832        struct tipc_uaddr ua;
 833        bool res = true;
 834
 835        tipc_uaddr(&ua, TIPC_SERVICE_RANGE, TIPC_NODE_SCOPE, type,
 836                   sub->s.seq.lower, sub->s.seq.upper);
 837        spin_lock_bh(&tn->nametbl_lock);
 838        sc = tipc_service_find(sub->net, &ua);
 839        if (!sc)
 840                sc = tipc_service_create(sub->net, &ua);
 841        if (sc) {
 842                spin_lock_bh(&sc->lock);
 843                tipc_service_subscribe(sc, sub);
 844                spin_unlock_bh(&sc->lock);
 845        } else {
 846                pr_warn("Failed to subscribe for {%u,%u,%u}\n",
 847                        type, sub->s.seq.lower, sub->s.seq.upper);
 848                res = false;
 849        }
 850        spin_unlock_bh(&tn->nametbl_lock);
 851        return res;
 852}
 853
 854/**
 855 * tipc_nametbl_unsubscribe - remove a subscription object from name table
 856 * @sub: subscription to remove
 857 */
 858void tipc_nametbl_unsubscribe(struct tipc_subscription *sub)
 859{
 860        struct tipc_net *tn = tipc_net(sub->net);
 861        struct tipc_service *sc;
 862        struct tipc_uaddr ua;
 863
 864        tipc_uaddr(&ua, TIPC_SERVICE_RANGE, TIPC_NODE_SCOPE,
 865                   sub->s.seq.type, sub->s.seq.lower, sub->s.seq.upper);
 866        spin_lock_bh(&tn->nametbl_lock);
 867        sc = tipc_service_find(sub->net, &ua);
 868        if (!sc)
 869                goto exit;
 870
 871        spin_lock_bh(&sc->lock);
 872        list_del_init(&sub->service_list);
 873        tipc_sub_put(sub);
 874
 875        /* Delete service item if no more publications and subscriptions */
 876        if (RB_EMPTY_ROOT(&sc->ranges) && list_empty(&sc->subscriptions)) {
 877                hlist_del_init_rcu(&sc->service_list);
 878                kfree_rcu(sc, rcu);
 879        }
 880        spin_unlock_bh(&sc->lock);
 881exit:
 882        spin_unlock_bh(&tn->nametbl_lock);
 883}
 884
 885int tipc_nametbl_init(struct net *net)
 886{
 887        struct tipc_net *tn = tipc_net(net);
 888        struct name_table *nt;
 889        int i;
 890
 891        nt = kzalloc(sizeof(*nt), GFP_KERNEL);
 892        if (!nt)
 893                return -ENOMEM;
 894
 895        for (i = 0; i < TIPC_NAMETBL_SIZE; i++)
 896                INIT_HLIST_HEAD(&nt->services[i]);
 897
 898        INIT_LIST_HEAD(&nt->node_scope);
 899        INIT_LIST_HEAD(&nt->cluster_scope);
 900        rwlock_init(&nt->cluster_scope_lock);
 901        tn->nametbl = nt;
 902        spin_lock_init(&tn->nametbl_lock);
 903        return 0;
 904}
 905
 906/**
 907 * tipc_service_delete - purge all publications for a service and delete it
 908 * @net: the associated network namespace
 909 * @sc: tipc_service to delete
 910 */
 911static void tipc_service_delete(struct net *net, struct tipc_service *sc)
 912{
 913        struct service_range *sr, *tmpr;
 914        struct publication *p, *tmp;
 915
 916        spin_lock_bh(&sc->lock);
 917        rbtree_postorder_for_each_entry_safe(sr, tmpr, &sc->ranges, tree_node) {
 918                list_for_each_entry_safe(p, tmp, &sr->all_publ, all_publ) {
 919                        tipc_service_remove_publ(sr, &p->sk, p->key);
 920                        kfree_rcu(p, rcu);
 921                }
 922                rb_erase_augmented(&sr->tree_node, &sc->ranges, &sr_callbacks);
 923                kfree(sr);
 924        }
 925        hlist_del_init_rcu(&sc->service_list);
 926        spin_unlock_bh(&sc->lock);
 927        kfree_rcu(sc, rcu);
 928}
 929
 930void tipc_nametbl_stop(struct net *net)
 931{
 932        struct name_table *nt = tipc_name_table(net);
 933        struct tipc_net *tn = tipc_net(net);
 934        struct hlist_head *service_head;
 935        struct tipc_service *service;
 936        u32 i;
 937
 938        /* Verify name table is empty and purge any lingering
 939         * publications, then release the name table
 940         */
 941        spin_lock_bh(&tn->nametbl_lock);
 942        for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
 943                if (hlist_empty(&nt->services[i]))
 944                        continue;
 945                service_head = &nt->services[i];
 946                hlist_for_each_entry_rcu(service, service_head, service_list) {
 947                        tipc_service_delete(net, service);
 948                }
 949        }
 950        spin_unlock_bh(&tn->nametbl_lock);
 951
 952        synchronize_net();
 953        kfree(nt);
 954}
 955
 956static int __tipc_nl_add_nametable_publ(struct tipc_nl_msg *msg,
 957                                        struct tipc_service *service,
 958                                        struct service_range *sr,
 959                                        u32 *last_key)
 960{
 961        struct publication *p;
 962        struct nlattr *attrs;
 963        struct nlattr *b;
 964        void *hdr;
 965
 966        if (*last_key) {
 967                list_for_each_entry(p, &sr->all_publ, all_publ)
 968                        if (p->key == *last_key)
 969                                break;
 970                if (p->key != *last_key)
 971                        return -EPIPE;
 972        } else {
 973                p = list_first_entry(&sr->all_publ,
 974                                     struct publication,
 975                                     all_publ);
 976        }
 977
 978        list_for_each_entry_from(p, &sr->all_publ, all_publ) {
 979                *last_key = p->key;
 980
 981                hdr = genlmsg_put(msg->skb, msg->portid, msg->seq,
 982                                  &tipc_genl_family, NLM_F_MULTI,
 983                                  TIPC_NL_NAME_TABLE_GET);
 984                if (!hdr)
 985                        return -EMSGSIZE;
 986
 987                attrs = nla_nest_start_noflag(msg->skb, TIPC_NLA_NAME_TABLE);
 988                if (!attrs)
 989                        goto msg_full;
 990
 991                b = nla_nest_start_noflag(msg->skb, TIPC_NLA_NAME_TABLE_PUBL);
 992                if (!b)
 993                        goto attr_msg_full;
 994
 995                if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_TYPE, service->type))
 996                        goto publ_msg_full;
 997                if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_LOWER, sr->lower))
 998                        goto publ_msg_full;
 999                if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_UPPER, sr->upper))
1000                        goto publ_msg_full;
1001                if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_SCOPE, p->scope))
1002                        goto publ_msg_full;
1003                if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_NODE, p->sk.node))
1004                        goto publ_msg_full;
1005                if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_REF, p->sk.ref))
1006                        goto publ_msg_full;
1007                if (nla_put_u32(msg->skb, TIPC_NLA_PUBL_KEY, p->key))
1008                        goto publ_msg_full;
1009
1010                nla_nest_end(msg->skb, b);
1011                nla_nest_end(msg->skb, attrs);
1012                genlmsg_end(msg->skb, hdr);
1013        }
1014        *last_key = 0;
1015
1016        return 0;
1017
1018publ_msg_full:
1019        nla_nest_cancel(msg->skb, b);
1020attr_msg_full:
1021        nla_nest_cancel(msg->skb, attrs);
1022msg_full:
1023        genlmsg_cancel(msg->skb, hdr);
1024
1025        return -EMSGSIZE;
1026}
1027
1028static int __tipc_nl_service_range_list(struct tipc_nl_msg *msg,
1029                                        struct tipc_service *sc,
1030                                        u32 *last_lower, u32 *last_key)
1031{
1032        struct service_range *sr;
1033        struct rb_node *n;
1034        int err;
1035
1036        for (n = rb_first(&sc->ranges); n; n = rb_next(n)) {
1037                sr = container_of(n, struct service_range, tree_node);
1038                if (sr->lower < *last_lower)
1039                        continue;
1040                err = __tipc_nl_add_nametable_publ(msg, sc, sr, last_key);
1041                if (err) {
1042                        *last_lower = sr->lower;
1043                        return err;
1044                }
1045        }
1046        *last_lower = 0;
1047        return 0;
1048}
1049
1050static int tipc_nl_service_list(struct net *net, struct tipc_nl_msg *msg,
1051                                u32 *last_type, u32 *last_lower, u32 *last_key)
1052{
1053        struct tipc_net *tn = tipc_net(net);
1054        struct tipc_service *service = NULL;
1055        struct hlist_head *head;
1056        struct tipc_uaddr ua;
1057        int err;
1058        int i;
1059
1060        if (*last_type)
1061                i = hash(*last_type);
1062        else
1063                i = 0;
1064
1065        for (; i < TIPC_NAMETBL_SIZE; i++) {
1066                head = &tn->nametbl->services[i];
1067
1068                if (*last_type ||
1069                    (!i && *last_key && (*last_lower == *last_key))) {
1070                        tipc_uaddr(&ua, TIPC_SERVICE_RANGE, TIPC_NODE_SCOPE,
1071                                   *last_type, *last_lower, *last_lower);
1072                        service = tipc_service_find(net, &ua);
1073                        if (!service)
1074                                return -EPIPE;
1075                } else {
1076                        hlist_for_each_entry_rcu(service, head, service_list)
1077                                break;
1078                        if (!service)
1079                                continue;
1080                }
1081
1082                hlist_for_each_entry_from_rcu(service, service_list) {
1083                        spin_lock_bh(&service->lock);
1084                        err = __tipc_nl_service_range_list(msg, service,
1085                                                           last_lower,
1086                                                           last_key);
1087
1088                        if (err) {
1089                                *last_type = service->type;
1090                                spin_unlock_bh(&service->lock);
1091                                return err;
1092                        }
1093                        spin_unlock_bh(&service->lock);
1094                }
1095                *last_type = 0;
1096        }
1097        return 0;
1098}
1099
1100int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb)
1101{
1102        struct net *net = sock_net(skb->sk);
1103        u32 last_type = cb->args[0];
1104        u32 last_lower = cb->args[1];
1105        u32 last_key = cb->args[2];
1106        int done = cb->args[3];
1107        struct tipc_nl_msg msg;
1108        int err;
1109
1110        if (done)
1111                return 0;
1112
1113        msg.skb = skb;
1114        msg.portid = NETLINK_CB(cb->skb).portid;
1115        msg.seq = cb->nlh->nlmsg_seq;
1116
1117        rcu_read_lock();
1118        err = tipc_nl_service_list(net, &msg, &last_type,
1119                                   &last_lower, &last_key);
1120        if (!err) {
1121                done = 1;
1122        } else if (err != -EMSGSIZE) {
1123                /* We never set seq or call nl_dump_check_consistent() this
1124                 * means that setting prev_seq here will cause the consistence
1125                 * check to fail in the netlink callback handler. Resulting in
1126                 * the NLMSG_DONE message having the NLM_F_DUMP_INTR flag set if
1127                 * we got an error.
1128                 */
1129                cb->prev_seq = 1;
1130        }
1131        rcu_read_unlock();
1132
1133        cb->args[0] = last_type;
1134        cb->args[1] = last_lower;
1135        cb->args[2] = last_key;
1136        cb->args[3] = done;
1137
1138        return skb->len;
1139}
1140
1141struct tipc_dest *tipc_dest_find(struct list_head *l, u32 node, u32 port)
1142{
1143        struct tipc_dest *dst;
1144
1145        list_for_each_entry(dst, l, list) {
1146                if (dst->node == node && dst->port == port)
1147                        return dst;
1148        }
1149        return NULL;
1150}
1151
1152bool tipc_dest_push(struct list_head *l, u32 node, u32 port)
1153{
1154        struct tipc_dest *dst;
1155
1156        if (tipc_dest_find(l, node, port))
1157                return false;
1158
1159        dst = kmalloc(sizeof(*dst), GFP_ATOMIC);
1160        if (unlikely(!dst))
1161                return false;
1162        dst->node = node;
1163        dst->port = port;
1164        list_add(&dst->list, l);
1165        return true;
1166}
1167
1168bool tipc_dest_pop(struct list_head *l, u32 *node, u32 *port)
1169{
1170        struct tipc_dest *dst;
1171
1172        if (list_empty(l))
1173                return false;
1174        dst = list_first_entry(l, typeof(*dst), list);
1175        if (port)
1176                *port = dst->port;
1177        if (node)
1178                *node = dst->node;
1179        list_del(&dst->list);
1180        kfree(dst);
1181        return true;
1182}
1183
1184bool tipc_dest_del(struct list_head *l, u32 node, u32 port)
1185{
1186        struct tipc_dest *dst;
1187
1188        dst = tipc_dest_find(l, node, port);
1189        if (!dst)
1190                return false;
1191        list_del(&dst->list);
1192        kfree(dst);
1193        return true;
1194}
1195
1196void tipc_dest_list_purge(struct list_head *l)
1197{
1198        struct tipc_dest *dst, *tmp;
1199
1200        list_for_each_entry_safe(dst, tmp, l, list) {
1201                list_del(&dst->list);
1202                kfree(dst);
1203        }
1204}
1205
1206int tipc_dest_list_len(struct list_head *l)
1207{
1208        struct tipc_dest *dst;
1209        int i = 0;
1210
1211        list_for_each_entry(dst, l, list) {
1212                i++;
1213        }
1214        return i;
1215}
1216