linux/net/tipc/port.c
<<
>>
Prefs
   1/*
   2 * net/tipc/port.c: TIPC port code
   3 *
   4 * Copyright (c) 1992-2007, Ericsson AB
   5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "config.h"
  39#include "port.h"
  40#include "name_table.h"
  41
  42/* Connection management: */
  43#define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
  44#define CONFIRMED 0
  45#define PROBING 1
  46
  47#define MAX_REJECT_SIZE 1024
  48
  49DEFINE_SPINLOCK(tipc_port_list_lock);
  50
  51static LIST_HEAD(ports);
  52static void port_handle_node_down(unsigned long ref);
  53static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
  54static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
  55static void port_timeout(unsigned long ref);
  56
  57
  58static u32 port_peernode(struct tipc_port *p_ptr)
  59{
  60        return msg_destnode(&p_ptr->phdr);
  61}
  62
  63static u32 port_peerport(struct tipc_port *p_ptr)
  64{
  65        return msg_destport(&p_ptr->phdr);
  66}
  67
  68/**
  69 * tipc_port_peer_msg - verify message was sent by connected port's peer
  70 *
  71 * Handles cases where the node's network address has changed from
  72 * the default of <0.0.0> to its configured setting.
  73 */
  74int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
  75{
  76        u32 peernode;
  77        u32 orignode;
  78
  79        if (msg_origport(msg) != port_peerport(p_ptr))
  80                return 0;
  81
  82        orignode = msg_orignode(msg);
  83        peernode = port_peernode(p_ptr);
  84        return (orignode == peernode) ||
  85                (!orignode && (peernode == tipc_own_addr)) ||
  86                (!peernode && (orignode == tipc_own_addr));
  87}
  88
  89/**
  90 * tipc_multicast - send a multicast message to local and remote destinations
  91 */
  92int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
  93                   u32 num_sect, struct iovec const *msg_sect,
  94                   unsigned int total_len)
  95{
  96        struct tipc_msg *hdr;
  97        struct sk_buff *buf;
  98        struct sk_buff *ibuf = NULL;
  99        struct tipc_port_list dports = {0, NULL, };
 100        struct tipc_port *oport = tipc_port_deref(ref);
 101        int ext_targets;
 102        int res;
 103
 104        if (unlikely(!oport))
 105                return -EINVAL;
 106
 107        /* Create multicast message */
 108        hdr = &oport->phdr;
 109        msg_set_type(hdr, TIPC_MCAST_MSG);
 110        msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
 111        msg_set_destport(hdr, 0);
 112        msg_set_destnode(hdr, 0);
 113        msg_set_nametype(hdr, seq->type);
 114        msg_set_namelower(hdr, seq->lower);
 115        msg_set_nameupper(hdr, seq->upper);
 116        msg_set_hdr_sz(hdr, MCAST_H_SIZE);
 117        res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
 118                             &buf);
 119        if (unlikely(!buf))
 120                return res;
 121
 122        /* Figure out where to send multicast message */
 123        ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
 124                                                TIPC_NODE_SCOPE, &dports);
 125
 126        /* Send message to destinations (duplicate it only if necessary) */
 127        if (ext_targets) {
 128                if (dports.count != 0) {
 129                        ibuf = skb_copy(buf, GFP_ATOMIC);
 130                        if (ibuf == NULL) {
 131                                tipc_port_list_free(&dports);
 132                                kfree_skb(buf);
 133                                return -ENOMEM;
 134                        }
 135                }
 136                res = tipc_bclink_send_msg(buf);
 137                if ((res < 0) && (dports.count != 0))
 138                        kfree_skb(ibuf);
 139        } else {
 140                ibuf = buf;
 141        }
 142
 143        if (res >= 0) {
 144                if (ibuf)
 145                        tipc_port_recv_mcast(ibuf, &dports);
 146        } else {
 147                tipc_port_list_free(&dports);
 148        }
 149        return res;
 150}
 151
 152/**
 153 * tipc_port_recv_mcast - deliver multicast message to all destination ports
 154 *
 155 * If there is no port list, perform a lookup to create one
 156 */
 157void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
 158{
 159        struct tipc_msg *msg;
 160        struct tipc_port_list dports = {0, NULL, };
 161        struct tipc_port_list *item = dp;
 162        int cnt = 0;
 163
 164        msg = buf_msg(buf);
 165
 166        /* Create destination port list, if one wasn't supplied */
 167        if (dp == NULL) {
 168                tipc_nametbl_mc_translate(msg_nametype(msg),
 169                                     msg_namelower(msg),
 170                                     msg_nameupper(msg),
 171                                     TIPC_CLUSTER_SCOPE,
 172                                     &dports);
 173                item = dp = &dports;
 174        }
 175
 176        /* Deliver a copy of message to each destination port */
 177        if (dp->count != 0) {
 178                msg_set_destnode(msg, tipc_own_addr);
 179                if (dp->count == 1) {
 180                        msg_set_destport(msg, dp->ports[0]);
 181                        tipc_port_recv_msg(buf);
 182                        tipc_port_list_free(dp);
 183                        return;
 184                }
 185                for (; cnt < dp->count; cnt++) {
 186                        int index = cnt % PLSIZE;
 187                        struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
 188
 189                        if (b == NULL) {
 190                                pr_warn("Unable to deliver multicast message(s)\n");
 191                                goto exit;
 192                        }
 193                        if ((index == 0) && (cnt != 0))
 194                                item = item->next;
 195                        msg_set_destport(buf_msg(b), item->ports[index]);
 196                        tipc_port_recv_msg(b);
 197                }
 198        }
 199exit:
 200        kfree_skb(buf);
 201        tipc_port_list_free(dp);
 202}
 203
 204/**
 205 * tipc_createport - create a generic TIPC port
 206 *
 207 * Returns pointer to (locked) TIPC port, or NULL if unable to create it
 208 */
 209struct tipc_port *tipc_createport(struct sock *sk,
 210                                  u32 (*dispatcher)(struct tipc_port *,
 211                                  struct sk_buff *),
 212                                  void (*wakeup)(struct tipc_port *),
 213                                  const u32 importance)
 214{
 215        struct tipc_port *p_ptr;
 216        struct tipc_msg *msg;
 217        u32 ref;
 218
 219        p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
 220        if (!p_ptr) {
 221                pr_warn("Port creation failed, no memory\n");
 222                return NULL;
 223        }
 224        ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
 225        if (!ref) {
 226                pr_warn("Port creation failed, ref. table exhausted\n");
 227                kfree(p_ptr);
 228                return NULL;
 229        }
 230
 231        p_ptr->sk = sk;
 232        p_ptr->max_pkt = MAX_PKT_DEFAULT;
 233        p_ptr->ref = ref;
 234        INIT_LIST_HEAD(&p_ptr->wait_list);
 235        INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
 236        p_ptr->dispatcher = dispatcher;
 237        p_ptr->wakeup = wakeup;
 238        k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
 239        INIT_LIST_HEAD(&p_ptr->publications);
 240        INIT_LIST_HEAD(&p_ptr->port_list);
 241
 242        /*
 243         * Must hold port list lock while initializing message header template
 244         * to ensure a change to node's own network address doesn't result
 245         * in template containing out-dated network address information
 246         */
 247        spin_lock_bh(&tipc_port_list_lock);
 248        msg = &p_ptr->phdr;
 249        tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
 250        msg_set_origport(msg, ref);
 251        list_add_tail(&p_ptr->port_list, &ports);
 252        spin_unlock_bh(&tipc_port_list_lock);
 253        return p_ptr;
 254}
 255
 256int tipc_deleteport(u32 ref)
 257{
 258        struct tipc_port *p_ptr;
 259        struct sk_buff *buf = NULL;
 260
 261        tipc_withdraw(ref, 0, NULL);
 262        p_ptr = tipc_port_lock(ref);
 263        if (!p_ptr)
 264                return -EINVAL;
 265
 266        tipc_ref_discard(ref);
 267        tipc_port_unlock(p_ptr);
 268
 269        k_cancel_timer(&p_ptr->timer);
 270        if (p_ptr->connected) {
 271                buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
 272                tipc_nodesub_unsubscribe(&p_ptr->subscription);
 273        }
 274
 275        spin_lock_bh(&tipc_port_list_lock);
 276        list_del(&p_ptr->port_list);
 277        list_del(&p_ptr->wait_list);
 278        spin_unlock_bh(&tipc_port_list_lock);
 279        k_term_timer(&p_ptr->timer);
 280        kfree(p_ptr);
 281        tipc_net_route_msg(buf);
 282        return 0;
 283}
 284
 285static int port_unreliable(struct tipc_port *p_ptr)
 286{
 287        return msg_src_droppable(&p_ptr->phdr);
 288}
 289
 290int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
 291{
 292        struct tipc_port *p_ptr;
 293
 294        p_ptr = tipc_port_lock(ref);
 295        if (!p_ptr)
 296                return -EINVAL;
 297        *isunreliable = port_unreliable(p_ptr);
 298        tipc_port_unlock(p_ptr);
 299        return 0;
 300}
 301
 302int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
 303{
 304        struct tipc_port *p_ptr;
 305
 306        p_ptr = tipc_port_lock(ref);
 307        if (!p_ptr)
 308                return -EINVAL;
 309        msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
 310        tipc_port_unlock(p_ptr);
 311        return 0;
 312}
 313
 314static int port_unreturnable(struct tipc_port *p_ptr)
 315{
 316        return msg_dest_droppable(&p_ptr->phdr);
 317}
 318
 319int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
 320{
 321        struct tipc_port *p_ptr;
 322
 323        p_ptr = tipc_port_lock(ref);
 324        if (!p_ptr)
 325                return -EINVAL;
 326        *isunrejectable = port_unreturnable(p_ptr);
 327        tipc_port_unlock(p_ptr);
 328        return 0;
 329}
 330
 331int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
 332{
 333        struct tipc_port *p_ptr;
 334
 335        p_ptr = tipc_port_lock(ref);
 336        if (!p_ptr)
 337                return -EINVAL;
 338        msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
 339        tipc_port_unlock(p_ptr);
 340        return 0;
 341}
 342
 343/*
 344 * port_build_proto_msg(): create connection protocol message for port
 345 *
 346 * On entry the port must be locked and connected.
 347 */
 348static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
 349                                            u32 type, u32 ack)
 350{
 351        struct sk_buff *buf;
 352        struct tipc_msg *msg;
 353
 354        buf = tipc_buf_acquire(INT_H_SIZE);
 355        if (buf) {
 356                msg = buf_msg(buf);
 357                tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE,
 358                              port_peernode(p_ptr));
 359                msg_set_destport(msg, port_peerport(p_ptr));
 360                msg_set_origport(msg, p_ptr->ref);
 361                msg_set_msgcnt(msg, ack);
 362        }
 363        return buf;
 364}
 365
 366int tipc_reject_msg(struct sk_buff *buf, u32 err)
 367{
 368        struct tipc_msg *msg = buf_msg(buf);
 369        struct sk_buff *rbuf;
 370        struct tipc_msg *rmsg;
 371        int hdr_sz;
 372        u32 imp;
 373        u32 data_sz = msg_data_sz(msg);
 374        u32 src_node;
 375        u32 rmsg_sz;
 376
 377        /* discard rejected message if it shouldn't be returned to sender */
 378        if (WARN(!msg_isdata(msg),
 379                 "attempt to reject message with user=%u", msg_user(msg))) {
 380                dump_stack();
 381                goto exit;
 382        }
 383        if (msg_errcode(msg) || msg_dest_droppable(msg))
 384                goto exit;
 385
 386        /*
 387         * construct returned message by copying rejected message header and
 388         * data (or subset), then updating header fields that need adjusting
 389         */
 390        hdr_sz = msg_hdr_sz(msg);
 391        rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
 392
 393        rbuf = tipc_buf_acquire(rmsg_sz);
 394        if (rbuf == NULL)
 395                goto exit;
 396
 397        rmsg = buf_msg(rbuf);
 398        skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
 399
 400        if (msg_connected(rmsg)) {
 401                imp = msg_importance(rmsg);
 402                if (imp < TIPC_CRITICAL_IMPORTANCE)
 403                        msg_set_importance(rmsg, ++imp);
 404        }
 405        msg_set_non_seq(rmsg, 0);
 406        msg_set_size(rmsg, rmsg_sz);
 407        msg_set_errcode(rmsg, err);
 408        msg_set_prevnode(rmsg, tipc_own_addr);
 409        msg_swap_words(rmsg, 4, 5);
 410        if (!msg_short(rmsg))
 411                msg_swap_words(rmsg, 6, 7);
 412
 413        /* send self-abort message when rejecting on a connected port */
 414        if (msg_connected(msg)) {
 415                struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
 416
 417                if (p_ptr) {
 418                        struct sk_buff *abuf = NULL;
 419
 420                        if (p_ptr->connected)
 421                                abuf = port_build_self_abort_msg(p_ptr, err);
 422                        tipc_port_unlock(p_ptr);
 423                        tipc_net_route_msg(abuf);
 424                }
 425        }
 426
 427        /* send returned message & dispose of rejected message */
 428        src_node = msg_prevnode(msg);
 429        if (in_own_node(src_node))
 430                tipc_port_recv_msg(rbuf);
 431        else
 432                tipc_link_send(rbuf, src_node, msg_link_selector(rmsg));
 433exit:
 434        kfree_skb(buf);
 435        return data_sz;
 436}
 437
 438int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
 439                              struct iovec const *msg_sect, u32 num_sect,
 440                              unsigned int total_len, int err)
 441{
 442        struct sk_buff *buf;
 443        int res;
 444
 445        res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
 446                             &buf);
 447        if (!buf)
 448                return res;
 449
 450        return tipc_reject_msg(buf, err);
 451}
 452
 453static void port_timeout(unsigned long ref)
 454{
 455        struct tipc_port *p_ptr = tipc_port_lock(ref);
 456        struct sk_buff *buf = NULL;
 457
 458        if (!p_ptr)
 459                return;
 460
 461        if (!p_ptr->connected) {
 462                tipc_port_unlock(p_ptr);
 463                return;
 464        }
 465
 466        /* Last probe answered ? */
 467        if (p_ptr->probing_state == PROBING) {
 468                buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
 469        } else {
 470                buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
 471                p_ptr->probing_state = PROBING;
 472                k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
 473        }
 474        tipc_port_unlock(p_ptr);
 475        tipc_net_route_msg(buf);
 476}
 477
 478
 479static void port_handle_node_down(unsigned long ref)
 480{
 481        struct tipc_port *p_ptr = tipc_port_lock(ref);
 482        struct sk_buff *buf = NULL;
 483
 484        if (!p_ptr)
 485                return;
 486        buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
 487        tipc_port_unlock(p_ptr);
 488        tipc_net_route_msg(buf);
 489}
 490
 491
 492static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
 493{
 494        struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
 495
 496        if (buf) {
 497                struct tipc_msg *msg = buf_msg(buf);
 498                msg_swap_words(msg, 4, 5);
 499                msg_swap_words(msg, 6, 7);
 500        }
 501        return buf;
 502}
 503
 504
 505static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
 506{
 507        struct sk_buff *buf;
 508        struct tipc_msg *msg;
 509        u32 imp;
 510
 511        if (!p_ptr->connected)
 512                return NULL;
 513
 514        buf = tipc_buf_acquire(BASIC_H_SIZE);
 515        if (buf) {
 516                msg = buf_msg(buf);
 517                memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE);
 518                msg_set_hdr_sz(msg, BASIC_H_SIZE);
 519                msg_set_size(msg, BASIC_H_SIZE);
 520                imp = msg_importance(msg);
 521                if (imp < TIPC_CRITICAL_IMPORTANCE)
 522                        msg_set_importance(msg, ++imp);
 523                msg_set_errcode(msg, err);
 524        }
 525        return buf;
 526}
 527
 528void tipc_port_recv_proto_msg(struct sk_buff *buf)
 529{
 530        struct tipc_msg *msg = buf_msg(buf);
 531        struct tipc_port *p_ptr;
 532        struct sk_buff *r_buf = NULL;
 533        u32 destport = msg_destport(msg);
 534        int wakeable;
 535
 536        /* Validate connection */
 537        p_ptr = tipc_port_lock(destport);
 538        if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
 539                r_buf = tipc_buf_acquire(BASIC_H_SIZE);
 540                if (r_buf) {
 541                        msg = buf_msg(r_buf);
 542                        tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
 543                                      BASIC_H_SIZE, msg_orignode(msg));
 544                        msg_set_errcode(msg, TIPC_ERR_NO_PORT);
 545                        msg_set_origport(msg, destport);
 546                        msg_set_destport(msg, msg_origport(msg));
 547                }
 548                if (p_ptr)
 549                        tipc_port_unlock(p_ptr);
 550                goto exit;
 551        }
 552
 553        /* Process protocol message sent by peer */
 554        switch (msg_type(msg)) {
 555        case CONN_ACK:
 556                wakeable = tipc_port_congested(p_ptr) && p_ptr->congested &&
 557                        p_ptr->wakeup;
 558                p_ptr->acked += msg_msgcnt(msg);
 559                if (!tipc_port_congested(p_ptr)) {
 560                        p_ptr->congested = 0;
 561                        if (wakeable)
 562                                p_ptr->wakeup(p_ptr);
 563                }
 564                break;
 565        case CONN_PROBE:
 566                r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
 567                break;
 568        default:
 569                /* CONN_PROBE_REPLY or unrecognized - no action required */
 570                break;
 571        }
 572        p_ptr->probing_state = CONFIRMED;
 573        tipc_port_unlock(p_ptr);
 574exit:
 575        tipc_net_route_msg(r_buf);
 576        kfree_skb(buf);
 577}
 578
 579static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)
 580{
 581        struct publication *publ;
 582        int ret;
 583
 584        if (full_id)
 585                ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
 586                                    tipc_zone(tipc_own_addr),
 587                                    tipc_cluster(tipc_own_addr),
 588                                    tipc_node(tipc_own_addr), p_ptr->ref);
 589        else
 590                ret = tipc_snprintf(buf, len, "%-10u:", p_ptr->ref);
 591
 592        if (p_ptr->connected) {
 593                u32 dport = port_peerport(p_ptr);
 594                u32 destnode = port_peernode(p_ptr);
 595
 596                ret += tipc_snprintf(buf + ret, len - ret,
 597                                     " connected to <%u.%u.%u:%u>",
 598                                     tipc_zone(destnode),
 599                                     tipc_cluster(destnode),
 600                                     tipc_node(destnode), dport);
 601                if (p_ptr->conn_type != 0)
 602                        ret += tipc_snprintf(buf + ret, len - ret,
 603                                             " via {%u,%u}", p_ptr->conn_type,
 604                                             p_ptr->conn_instance);
 605        } else if (p_ptr->published) {
 606                ret += tipc_snprintf(buf + ret, len - ret, " bound to");
 607                list_for_each_entry(publ, &p_ptr->publications, pport_list) {
 608                        if (publ->lower == publ->upper)
 609                                ret += tipc_snprintf(buf + ret, len - ret,
 610                                                     " {%u,%u}", publ->type,
 611                                                     publ->lower);
 612                        else
 613                                ret += tipc_snprintf(buf + ret, len - ret,
 614                                                     " {%u,%u,%u}", publ->type,
 615                                                     publ->lower, publ->upper);
 616                }
 617        }
 618        ret += tipc_snprintf(buf + ret, len - ret, "\n");
 619        return ret;
 620}
 621
 622struct sk_buff *tipc_port_get_ports(void)
 623{
 624        struct sk_buff *buf;
 625        struct tlv_desc *rep_tlv;
 626        char *pb;
 627        int pb_len;
 628        struct tipc_port *p_ptr;
 629        int str_len = 0;
 630
 631        buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
 632        if (!buf)
 633                return NULL;
 634        rep_tlv = (struct tlv_desc *)buf->data;
 635        pb = TLV_DATA(rep_tlv);
 636        pb_len = ULTRA_STRING_MAX_LEN;
 637
 638        spin_lock_bh(&tipc_port_list_lock);
 639        list_for_each_entry(p_ptr, &ports, port_list) {
 640                spin_lock_bh(p_ptr->lock);
 641                str_len += port_print(p_ptr, pb, pb_len, 0);
 642                spin_unlock_bh(p_ptr->lock);
 643        }
 644        spin_unlock_bh(&tipc_port_list_lock);
 645        str_len += 1;   /* for "\0" */
 646        skb_put(buf, TLV_SPACE(str_len));
 647        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
 648
 649        return buf;
 650}
 651
 652void tipc_port_reinit(void)
 653{
 654        struct tipc_port *p_ptr;
 655        struct tipc_msg *msg;
 656
 657        spin_lock_bh(&tipc_port_list_lock);
 658        list_for_each_entry(p_ptr, &ports, port_list) {
 659                msg = &p_ptr->phdr;
 660                msg_set_prevnode(msg, tipc_own_addr);
 661                msg_set_orignode(msg, tipc_own_addr);
 662        }
 663        spin_unlock_bh(&tipc_port_list_lock);
 664}
 665
 666void tipc_acknowledge(u32 ref, u32 ack)
 667{
 668        struct tipc_port *p_ptr;
 669        struct sk_buff *buf = NULL;
 670
 671        p_ptr = tipc_port_lock(ref);
 672        if (!p_ptr)
 673                return;
 674        if (p_ptr->connected) {
 675                p_ptr->conn_unacked -= ack;
 676                buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
 677        }
 678        tipc_port_unlock(p_ptr);
 679        tipc_net_route_msg(buf);
 680}
 681
 682int tipc_portimportance(u32 ref, unsigned int *importance)
 683{
 684        struct tipc_port *p_ptr;
 685
 686        p_ptr = tipc_port_lock(ref);
 687        if (!p_ptr)
 688                return -EINVAL;
 689        *importance = (unsigned int)msg_importance(&p_ptr->phdr);
 690        tipc_port_unlock(p_ptr);
 691        return 0;
 692}
 693
 694int tipc_set_portimportance(u32 ref, unsigned int imp)
 695{
 696        struct tipc_port *p_ptr;
 697
 698        if (imp > TIPC_CRITICAL_IMPORTANCE)
 699                return -EINVAL;
 700
 701        p_ptr = tipc_port_lock(ref);
 702        if (!p_ptr)
 703                return -EINVAL;
 704        msg_set_importance(&p_ptr->phdr, (u32)imp);
 705        tipc_port_unlock(p_ptr);
 706        return 0;
 707}
 708
 709
 710int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
 711{
 712        struct tipc_port *p_ptr;
 713        struct publication *publ;
 714        u32 key;
 715        int res = -EINVAL;
 716
 717        p_ptr = tipc_port_lock(ref);
 718        if (!p_ptr)
 719                return -EINVAL;
 720
 721        if (p_ptr->connected)
 722                goto exit;
 723        key = ref + p_ptr->pub_count + 1;
 724        if (key == ref) {
 725                res = -EADDRINUSE;
 726                goto exit;
 727        }
 728        publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
 729                                    scope, p_ptr->ref, key);
 730        if (publ) {
 731                list_add(&publ->pport_list, &p_ptr->publications);
 732                p_ptr->pub_count++;
 733                p_ptr->published = 1;
 734                res = 0;
 735        }
 736exit:
 737        tipc_port_unlock(p_ptr);
 738        return res;
 739}
 740
 741int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
 742{
 743        struct tipc_port *p_ptr;
 744        struct publication *publ;
 745        struct publication *tpubl;
 746        int res = -EINVAL;
 747
 748        p_ptr = tipc_port_lock(ref);
 749        if (!p_ptr)
 750                return -EINVAL;
 751        if (!seq) {
 752                list_for_each_entry_safe(publ, tpubl,
 753                                         &p_ptr->publications, pport_list) {
 754                        tipc_nametbl_withdraw(publ->type, publ->lower,
 755                                              publ->ref, publ->key);
 756                }
 757                res = 0;
 758        } else {
 759                list_for_each_entry_safe(publ, tpubl,
 760                                         &p_ptr->publications, pport_list) {
 761                        if (publ->scope != scope)
 762                                continue;
 763                        if (publ->type != seq->type)
 764                                continue;
 765                        if (publ->lower != seq->lower)
 766                                continue;
 767                        if (publ->upper != seq->upper)
 768                                break;
 769                        tipc_nametbl_withdraw(publ->type, publ->lower,
 770                                              publ->ref, publ->key);
 771                        res = 0;
 772                        break;
 773                }
 774        }
 775        if (list_empty(&p_ptr->publications))
 776                p_ptr->published = 0;
 777        tipc_port_unlock(p_ptr);
 778        return res;
 779}
 780
 781int tipc_connect(u32 ref, struct tipc_portid const *peer)
 782{
 783        struct tipc_port *p_ptr;
 784        int res;
 785
 786        p_ptr = tipc_port_lock(ref);
 787        if (!p_ptr)
 788                return -EINVAL;
 789        res = __tipc_connect(ref, p_ptr, peer);
 790        tipc_port_unlock(p_ptr);
 791        return res;
 792}
 793
 794/*
 795 * __tipc_connect - connect to a remote peer
 796 *
 797 * Port must be locked.
 798 */
 799int __tipc_connect(u32 ref, struct tipc_port *p_ptr,
 800                        struct tipc_portid const *peer)
 801{
 802        struct tipc_msg *msg;
 803        int res = -EINVAL;
 804
 805        if (p_ptr->published || p_ptr->connected)
 806                goto exit;
 807        if (!peer->ref)
 808                goto exit;
 809
 810        msg = &p_ptr->phdr;
 811        msg_set_destnode(msg, peer->node);
 812        msg_set_destport(msg, peer->ref);
 813        msg_set_type(msg, TIPC_CONN_MSG);
 814        msg_set_lookup_scope(msg, 0);
 815        msg_set_hdr_sz(msg, SHORT_H_SIZE);
 816
 817        p_ptr->probing_interval = PROBING_INTERVAL;
 818        p_ptr->probing_state = CONFIRMED;
 819        p_ptr->connected = 1;
 820        k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
 821
 822        tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
 823                          (void *)(unsigned long)ref,
 824                          (net_ev_handler)port_handle_node_down);
 825        res = 0;
 826exit:
 827        p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
 828        return res;
 829}
 830
 831/*
 832 * __tipc_disconnect - disconnect port from peer
 833 *
 834 * Port must be locked.
 835 */
 836int __tipc_disconnect(struct tipc_port *tp_ptr)
 837{
 838        int res;
 839
 840        if (tp_ptr->connected) {
 841                tp_ptr->connected = 0;
 842                /* let timer expire on it's own to avoid deadlock! */
 843                tipc_nodesub_unsubscribe(&tp_ptr->subscription);
 844                res = 0;
 845        } else {
 846                res = -ENOTCONN;
 847        }
 848        return res;
 849}
 850
 851/*
 852 * tipc_disconnect(): Disconnect port form peer.
 853 *                    This is a node local operation.
 854 */
 855int tipc_disconnect(u32 ref)
 856{
 857        struct tipc_port *p_ptr;
 858        int res;
 859
 860        p_ptr = tipc_port_lock(ref);
 861        if (!p_ptr)
 862                return -EINVAL;
 863        res = __tipc_disconnect(p_ptr);
 864        tipc_port_unlock(p_ptr);
 865        return res;
 866}
 867
 868/*
 869 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
 870 */
 871int tipc_shutdown(u32 ref)
 872{
 873        struct tipc_port *p_ptr;
 874        struct sk_buff *buf = NULL;
 875
 876        p_ptr = tipc_port_lock(ref);
 877        if (!p_ptr)
 878                return -EINVAL;
 879
 880        buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
 881        tipc_port_unlock(p_ptr);
 882        tipc_net_route_msg(buf);
 883        return tipc_disconnect(ref);
 884}
 885
 886/**
 887 * tipc_port_recv_msg - receive message from lower layer and deliver to port user
 888 */
 889int tipc_port_recv_msg(struct sk_buff *buf)
 890{
 891        struct tipc_port *p_ptr;
 892        struct tipc_msg *msg = buf_msg(buf);
 893        u32 destport = msg_destport(msg);
 894        u32 dsz = msg_data_sz(msg);
 895        u32 err;
 896
 897        /* forward unresolved named message */
 898        if (unlikely(!destport)) {
 899                tipc_net_route_msg(buf);
 900                return dsz;
 901        }
 902
 903        /* validate destination & pass to port, otherwise reject message */
 904        p_ptr = tipc_port_lock(destport);
 905        if (likely(p_ptr)) {
 906                err = p_ptr->dispatcher(p_ptr, buf);
 907                tipc_port_unlock(p_ptr);
 908                if (likely(!err))
 909                        return dsz;
 910        } else {
 911                err = TIPC_ERR_NO_PORT;
 912        }
 913
 914        return tipc_reject_msg(buf, err);
 915}
 916
 917/*
 918 *  tipc_port_recv_sections(): Concatenate and deliver sectioned
 919 *                        message for this node.
 920 */
 921static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
 922                                   struct iovec const *msg_sect,
 923                                   unsigned int total_len)
 924{
 925        struct sk_buff *buf;
 926        int res;
 927
 928        res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
 929                             MAX_MSG_SIZE, &buf);
 930        if (likely(buf))
 931                tipc_port_recv_msg(buf);
 932        return res;
 933}
 934
 935/**
 936 * tipc_send - send message sections on connection
 937 */
 938int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
 939              unsigned int total_len)
 940{
 941        struct tipc_port *p_ptr;
 942        u32 destnode;
 943        int res;
 944
 945        p_ptr = tipc_port_deref(ref);
 946        if (!p_ptr || !p_ptr->connected)
 947                return -EINVAL;
 948
 949        p_ptr->congested = 1;
 950        if (!tipc_port_congested(p_ptr)) {
 951                destnode = port_peernode(p_ptr);
 952                if (likely(!in_own_node(destnode)))
 953                        res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
 954                                                           total_len, destnode);
 955                else
 956                        res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
 957                                                      total_len);
 958
 959                if (likely(res != -ELINKCONG)) {
 960                        p_ptr->congested = 0;
 961                        if (res > 0)
 962                                p_ptr->sent++;
 963                        return res;
 964                }
 965        }
 966        if (port_unreliable(p_ptr)) {
 967                p_ptr->congested = 0;
 968                return total_len;
 969        }
 970        return -ELINKCONG;
 971}
 972
 973/**
 974 * tipc_send2name - send message sections to port name
 975 */
 976int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
 977                   unsigned int num_sect, struct iovec const *msg_sect,
 978                   unsigned int total_len)
 979{
 980        struct tipc_port *p_ptr;
 981        struct tipc_msg *msg;
 982        u32 destnode = domain;
 983        u32 destport;
 984        int res;
 985
 986        p_ptr = tipc_port_deref(ref);
 987        if (!p_ptr || p_ptr->connected)
 988                return -EINVAL;
 989
 990        msg = &p_ptr->phdr;
 991        msg_set_type(msg, TIPC_NAMED_MSG);
 992        msg_set_hdr_sz(msg, NAMED_H_SIZE);
 993        msg_set_nametype(msg, name->type);
 994        msg_set_nameinst(msg, name->instance);
 995        msg_set_lookup_scope(msg, tipc_addr_scope(domain));
 996        destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
 997        msg_set_destnode(msg, destnode);
 998        msg_set_destport(msg, destport);
 999
1000        if (likely(destport || destnode)) {
1001                if (likely(in_own_node(destnode)))
1002                        res = tipc_port_recv_sections(p_ptr, num_sect,
1003                                                      msg_sect, total_len);
1004                else if (tipc_own_addr)
1005                        res = tipc_link_send_sections_fast(p_ptr, msg_sect,
1006                                                           num_sect, total_len,
1007                                                           destnode);
1008                else
1009                        res = tipc_port_reject_sections(p_ptr, msg, msg_sect,
1010                                                        num_sect, total_len,
1011                                                        TIPC_ERR_NO_NODE);
1012                if (likely(res != -ELINKCONG)) {
1013                        if (res > 0)
1014                                p_ptr->sent++;
1015                        return res;
1016                }
1017                if (port_unreliable(p_ptr)) {
1018                        return total_len;
1019                }
1020                return -ELINKCONG;
1021        }
1022        return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1023                                         total_len, TIPC_ERR_NO_NAME);
1024}
1025
1026/**
1027 * tipc_send2port - send message sections to port identity
1028 */
1029int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1030                   unsigned int num_sect, struct iovec const *msg_sect,
1031                   unsigned int total_len)
1032{
1033        struct tipc_port *p_ptr;
1034        struct tipc_msg *msg;
1035        int res;
1036
1037        p_ptr = tipc_port_deref(ref);
1038        if (!p_ptr || p_ptr->connected)
1039                return -EINVAL;
1040
1041        msg = &p_ptr->phdr;
1042        msg_set_type(msg, TIPC_DIRECT_MSG);
1043        msg_set_lookup_scope(msg, 0);
1044        msg_set_destnode(msg, dest->node);
1045        msg_set_destport(msg, dest->ref);
1046        msg_set_hdr_sz(msg, BASIC_H_SIZE);
1047
1048        if (in_own_node(dest->node))
1049                res =  tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1050                                               total_len);
1051        else if (tipc_own_addr)
1052                res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1053                                                   total_len, dest->node);
1054        else
1055                res = tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1056                                                total_len, TIPC_ERR_NO_NODE);
1057        if (likely(res != -ELINKCONG)) {
1058                if (res > 0)
1059                        p_ptr->sent++;
1060                return res;
1061        }
1062        if (port_unreliable(p_ptr)) {
1063                return total_len;
1064        }
1065        return -ELINKCONG;
1066}
1067