linux/net/tipc/port.c
<<
>>
Prefs
   1/*
   2 * net/tipc/port.c: TIPC port code
   3 *
   4 * Copyright (c) 1992-2007, Ericsson AB
   5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "config.h"
  39#include "port.h"
  40#include "name_table.h"
  41
  42/* Connection management: */
  43#define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
  44#define CONFIRMED 0
  45#define PROBING 1
  46
  47#define MAX_REJECT_SIZE 1024
  48
  49static struct sk_buff *msg_queue_head;
  50static struct sk_buff *msg_queue_tail;
  51
  52DEFINE_SPINLOCK(tipc_port_list_lock);
  53static DEFINE_SPINLOCK(queue_lock);
  54
  55static LIST_HEAD(ports);
  56static void port_handle_node_down(unsigned long ref);
  57static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
  58static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
  59static void port_timeout(unsigned long ref);
  60
  61
  62static u32 port_peernode(struct tipc_port *p_ptr)
  63{
  64        return msg_destnode(&p_ptr->phdr);
  65}
  66
  67static u32 port_peerport(struct tipc_port *p_ptr)
  68{
  69        return msg_destport(&p_ptr->phdr);
  70}
  71
  72/**
  73 * tipc_port_peer_msg - verify message was sent by connected port's peer
  74 *
  75 * Handles cases where the node's network address has changed from
  76 * the default of <0.0.0> to its configured setting.
  77 */
  78int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg)
  79{
  80        u32 peernode;
  81        u32 orignode;
  82
  83        if (msg_origport(msg) != port_peerport(p_ptr))
  84                return 0;
  85
  86        orignode = msg_orignode(msg);
  87        peernode = port_peernode(p_ptr);
  88        return (orignode == peernode) ||
  89                (!orignode && (peernode == tipc_own_addr)) ||
  90                (!peernode && (orignode == tipc_own_addr));
  91}
  92
  93/**
  94 * tipc_multicast - send a multicast message to local and remote destinations
  95 */
  96int tipc_multicast(u32 ref, struct tipc_name_seq const *seq,
  97                   u32 num_sect, struct iovec const *msg_sect,
  98                   unsigned int total_len)
  99{
 100        struct tipc_msg *hdr;
 101        struct sk_buff *buf;
 102        struct sk_buff *ibuf = NULL;
 103        struct tipc_port_list dports = {0, NULL, };
 104        struct tipc_port *oport = tipc_port_deref(ref);
 105        int ext_targets;
 106        int res;
 107
 108        if (unlikely(!oport))
 109                return -EINVAL;
 110
 111        /* Create multicast message */
 112        hdr = &oport->phdr;
 113        msg_set_type(hdr, TIPC_MCAST_MSG);
 114        msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
 115        msg_set_destport(hdr, 0);
 116        msg_set_destnode(hdr, 0);
 117        msg_set_nametype(hdr, seq->type);
 118        msg_set_namelower(hdr, seq->lower);
 119        msg_set_nameupper(hdr, seq->upper);
 120        msg_set_hdr_sz(hdr, MCAST_H_SIZE);
 121        res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
 122                        !oport->user_port, &buf);
 123        if (unlikely(!buf))
 124                return res;
 125
 126        /* Figure out where to send multicast message */
 127        ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
 128                                                TIPC_NODE_SCOPE, &dports);
 129
 130        /* Send message to destinations (duplicate it only if necessary) */
 131        if (ext_targets) {
 132                if (dports.count != 0) {
 133                        ibuf = skb_copy(buf, GFP_ATOMIC);
 134                        if (ibuf == NULL) {
 135                                tipc_port_list_free(&dports);
 136                                kfree_skb(buf);
 137                                return -ENOMEM;
 138                        }
 139                }
 140                res = tipc_bclink_send_msg(buf);
 141                if ((res < 0) && (dports.count != 0))
 142                        kfree_skb(ibuf);
 143        } else {
 144                ibuf = buf;
 145        }
 146
 147        if (res >= 0) {
 148                if (ibuf)
 149                        tipc_port_recv_mcast(ibuf, &dports);
 150        } else {
 151                tipc_port_list_free(&dports);
 152        }
 153        return res;
 154}
 155
 156/**
 157 * tipc_port_recv_mcast - deliver multicast message to all destination ports
 158 *
 159 * If there is no port list, perform a lookup to create one
 160 */
 161void tipc_port_recv_mcast(struct sk_buff *buf, struct tipc_port_list *dp)
 162{
 163        struct tipc_msg *msg;
 164        struct tipc_port_list dports = {0, NULL, };
 165        struct tipc_port_list *item = dp;
 166        int cnt = 0;
 167
 168        msg = buf_msg(buf);
 169
 170        /* Create destination port list, if one wasn't supplied */
 171        if (dp == NULL) {
 172                tipc_nametbl_mc_translate(msg_nametype(msg),
 173                                     msg_namelower(msg),
 174                                     msg_nameupper(msg),
 175                                     TIPC_CLUSTER_SCOPE,
 176                                     &dports);
 177                item = dp = &dports;
 178        }
 179
 180        /* Deliver a copy of message to each destination port */
 181        if (dp->count != 0) {
 182                msg_set_destnode(msg, tipc_own_addr);
 183                if (dp->count == 1) {
 184                        msg_set_destport(msg, dp->ports[0]);
 185                        tipc_port_recv_msg(buf);
 186                        tipc_port_list_free(dp);
 187                        return;
 188                }
 189                for (; cnt < dp->count; cnt++) {
 190                        int index = cnt % PLSIZE;
 191                        struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
 192
 193                        if (b == NULL) {
 194                                pr_warn("Unable to deliver multicast message(s)\n");
 195                                goto exit;
 196                        }
 197                        if ((index == 0) && (cnt != 0))
 198                                item = item->next;
 199                        msg_set_destport(buf_msg(b), item->ports[index]);
 200                        tipc_port_recv_msg(b);
 201                }
 202        }
 203exit:
 204        kfree_skb(buf);
 205        tipc_port_list_free(dp);
 206}
 207
 208/**
 209 * tipc_createport_raw - create a generic TIPC port
 210 *
 211 * Returns pointer to (locked) TIPC port, or NULL if unable to create it
 212 */
 213struct tipc_port *tipc_createport_raw(void *usr_handle,
 214                        u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
 215                        void (*wakeup)(struct tipc_port *),
 216                        const u32 importance)
 217{
 218        struct tipc_port *p_ptr;
 219        struct tipc_msg *msg;
 220        u32 ref;
 221
 222        p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
 223        if (!p_ptr) {
 224                pr_warn("Port creation failed, no memory\n");
 225                return NULL;
 226        }
 227        ref = tipc_ref_acquire(p_ptr, &p_ptr->lock);
 228        if (!ref) {
 229                pr_warn("Port creation failed, ref. table exhausted\n");
 230                kfree(p_ptr);
 231                return NULL;
 232        }
 233
 234        p_ptr->usr_handle = usr_handle;
 235        p_ptr->max_pkt = MAX_PKT_DEFAULT;
 236        p_ptr->ref = ref;
 237        INIT_LIST_HEAD(&p_ptr->wait_list);
 238        INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
 239        p_ptr->dispatcher = dispatcher;
 240        p_ptr->wakeup = wakeup;
 241        p_ptr->user_port = NULL;
 242        k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
 243        INIT_LIST_HEAD(&p_ptr->publications);
 244        INIT_LIST_HEAD(&p_ptr->port_list);
 245
 246        /*
 247         * Must hold port list lock while initializing message header template
 248         * to ensure a change to node's own network address doesn't result
 249         * in template containing out-dated network address information
 250         */
 251        spin_lock_bh(&tipc_port_list_lock);
 252        msg = &p_ptr->phdr;
 253        tipc_msg_init(msg, importance, TIPC_NAMED_MSG, NAMED_H_SIZE, 0);
 254        msg_set_origport(msg, ref);
 255        list_add_tail(&p_ptr->port_list, &ports);
 256        spin_unlock_bh(&tipc_port_list_lock);
 257        return p_ptr;
 258}
 259
 260int tipc_deleteport(u32 ref)
 261{
 262        struct tipc_port *p_ptr;
 263        struct sk_buff *buf = NULL;
 264
 265        tipc_withdraw(ref, 0, NULL);
 266        p_ptr = tipc_port_lock(ref);
 267        if (!p_ptr)
 268                return -EINVAL;
 269
 270        tipc_ref_discard(ref);
 271        tipc_port_unlock(p_ptr);
 272
 273        k_cancel_timer(&p_ptr->timer);
 274        if (p_ptr->connected) {
 275                buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
 276                tipc_nodesub_unsubscribe(&p_ptr->subscription);
 277        }
 278        kfree(p_ptr->user_port);
 279
 280        spin_lock_bh(&tipc_port_list_lock);
 281        list_del(&p_ptr->port_list);
 282        list_del(&p_ptr->wait_list);
 283        spin_unlock_bh(&tipc_port_list_lock);
 284        k_term_timer(&p_ptr->timer);
 285        kfree(p_ptr);
 286        tipc_net_route_msg(buf);
 287        return 0;
 288}
 289
 290static int port_unreliable(struct tipc_port *p_ptr)
 291{
 292        return msg_src_droppable(&p_ptr->phdr);
 293}
 294
 295int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
 296{
 297        struct tipc_port *p_ptr;
 298
 299        p_ptr = tipc_port_lock(ref);
 300        if (!p_ptr)
 301                return -EINVAL;
 302        *isunreliable = port_unreliable(p_ptr);
 303        tipc_port_unlock(p_ptr);
 304        return 0;
 305}
 306
 307int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
 308{
 309        struct tipc_port *p_ptr;
 310
 311        p_ptr = tipc_port_lock(ref);
 312        if (!p_ptr)
 313                return -EINVAL;
 314        msg_set_src_droppable(&p_ptr->phdr, (isunreliable != 0));
 315        tipc_port_unlock(p_ptr);
 316        return 0;
 317}
 318
 319static int port_unreturnable(struct tipc_port *p_ptr)
 320{
 321        return msg_dest_droppable(&p_ptr->phdr);
 322}
 323
 324int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
 325{
 326        struct tipc_port *p_ptr;
 327
 328        p_ptr = tipc_port_lock(ref);
 329        if (!p_ptr)
 330                return -EINVAL;
 331        *isunrejectable = port_unreturnable(p_ptr);
 332        tipc_port_unlock(p_ptr);
 333        return 0;
 334}
 335
 336int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
 337{
 338        struct tipc_port *p_ptr;
 339
 340        p_ptr = tipc_port_lock(ref);
 341        if (!p_ptr)
 342                return -EINVAL;
 343        msg_set_dest_droppable(&p_ptr->phdr, (isunrejectable != 0));
 344        tipc_port_unlock(p_ptr);
 345        return 0;
 346}
 347
 348/*
 349 * port_build_proto_msg(): create connection protocol message for port
 350 *
 351 * On entry the port must be locked and connected.
 352 */
 353static struct sk_buff *port_build_proto_msg(struct tipc_port *p_ptr,
 354                                            u32 type, u32 ack)
 355{
 356        struct sk_buff *buf;
 357        struct tipc_msg *msg;
 358
 359        buf = tipc_buf_acquire(INT_H_SIZE);
 360        if (buf) {
 361                msg = buf_msg(buf);
 362                tipc_msg_init(msg, CONN_MANAGER, type, INT_H_SIZE,
 363                              port_peernode(p_ptr));
 364                msg_set_destport(msg, port_peerport(p_ptr));
 365                msg_set_origport(msg, p_ptr->ref);
 366                msg_set_msgcnt(msg, ack);
 367        }
 368        return buf;
 369}
 370
 371int tipc_reject_msg(struct sk_buff *buf, u32 err)
 372{
 373        struct tipc_msg *msg = buf_msg(buf);
 374        struct sk_buff *rbuf;
 375        struct tipc_msg *rmsg;
 376        int hdr_sz;
 377        u32 imp;
 378        u32 data_sz = msg_data_sz(msg);
 379        u32 src_node;
 380        u32 rmsg_sz;
 381
 382        /* discard rejected message if it shouldn't be returned to sender */
 383        if (WARN(!msg_isdata(msg),
 384                 "attempt to reject message with user=%u", msg_user(msg))) {
 385                dump_stack();
 386                goto exit;
 387        }
 388        if (msg_errcode(msg) || msg_dest_droppable(msg))
 389                goto exit;
 390
 391        /*
 392         * construct returned message by copying rejected message header and
 393         * data (or subset), then updating header fields that need adjusting
 394         */
 395        hdr_sz = msg_hdr_sz(msg);
 396        rmsg_sz = hdr_sz + min_t(u32, data_sz, MAX_REJECT_SIZE);
 397
 398        rbuf = tipc_buf_acquire(rmsg_sz);
 399        if (rbuf == NULL)
 400                goto exit;
 401
 402        rmsg = buf_msg(rbuf);
 403        skb_copy_to_linear_data(rbuf, msg, rmsg_sz);
 404
 405        if (msg_connected(rmsg)) {
 406                imp = msg_importance(rmsg);
 407                if (imp < TIPC_CRITICAL_IMPORTANCE)
 408                        msg_set_importance(rmsg, ++imp);
 409        }
 410        msg_set_non_seq(rmsg, 0);
 411        msg_set_size(rmsg, rmsg_sz);
 412        msg_set_errcode(rmsg, err);
 413        msg_set_prevnode(rmsg, tipc_own_addr);
 414        msg_swap_words(rmsg, 4, 5);
 415        if (!msg_short(rmsg))
 416                msg_swap_words(rmsg, 6, 7);
 417
 418        /* send self-abort message when rejecting on a connected port */
 419        if (msg_connected(msg)) {
 420                struct tipc_port *p_ptr = tipc_port_lock(msg_destport(msg));
 421
 422                if (p_ptr) {
 423                        struct sk_buff *abuf = NULL;
 424
 425                        if (p_ptr->connected)
 426                                abuf = port_build_self_abort_msg(p_ptr, err);
 427                        tipc_port_unlock(p_ptr);
 428                        tipc_net_route_msg(abuf);
 429                }
 430        }
 431
 432        /* send returned message & dispose of rejected message */
 433        src_node = msg_prevnode(msg);
 434        if (in_own_node(src_node))
 435                tipc_port_recv_msg(rbuf);
 436        else
 437                tipc_link_send(rbuf, src_node, msg_link_selector(rmsg));
 438exit:
 439        kfree_skb(buf);
 440        return data_sz;
 441}
 442
 443int tipc_port_reject_sections(struct tipc_port *p_ptr, struct tipc_msg *hdr,
 444                              struct iovec const *msg_sect, u32 num_sect,
 445                              unsigned int total_len, int err)
 446{
 447        struct sk_buff *buf;
 448        int res;
 449
 450        res = tipc_msg_build(hdr, msg_sect, num_sect, total_len, MAX_MSG_SIZE,
 451                        !p_ptr->user_port, &buf);
 452        if (!buf)
 453                return res;
 454
 455        return tipc_reject_msg(buf, err);
 456}
 457
 458static void port_timeout(unsigned long ref)
 459{
 460        struct tipc_port *p_ptr = tipc_port_lock(ref);
 461        struct sk_buff *buf = NULL;
 462
 463        if (!p_ptr)
 464                return;
 465
 466        if (!p_ptr->connected) {
 467                tipc_port_unlock(p_ptr);
 468                return;
 469        }
 470
 471        /* Last probe answered ? */
 472        if (p_ptr->probing_state == PROBING) {
 473                buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
 474        } else {
 475                buf = port_build_proto_msg(p_ptr, CONN_PROBE, 0);
 476                p_ptr->probing_state = PROBING;
 477                k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
 478        }
 479        tipc_port_unlock(p_ptr);
 480        tipc_net_route_msg(buf);
 481}
 482
 483
 484static void port_handle_node_down(unsigned long ref)
 485{
 486        struct tipc_port *p_ptr = tipc_port_lock(ref);
 487        struct sk_buff *buf = NULL;
 488
 489        if (!p_ptr)
 490                return;
 491        buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
 492        tipc_port_unlock(p_ptr);
 493        tipc_net_route_msg(buf);
 494}
 495
 496
 497static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
 498{
 499        struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
 500
 501        if (buf) {
 502                struct tipc_msg *msg = buf_msg(buf);
 503                msg_swap_words(msg, 4, 5);
 504                msg_swap_words(msg, 6, 7);
 505        }
 506        return buf;
 507}
 508
 509
 510static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
 511{
 512        struct sk_buff *buf;
 513        struct tipc_msg *msg;
 514        u32 imp;
 515
 516        if (!p_ptr->connected)
 517                return NULL;
 518
 519        buf = tipc_buf_acquire(BASIC_H_SIZE);
 520        if (buf) {
 521                msg = buf_msg(buf);
 522                memcpy(msg, &p_ptr->phdr, BASIC_H_SIZE);
 523                msg_set_hdr_sz(msg, BASIC_H_SIZE);
 524                msg_set_size(msg, BASIC_H_SIZE);
 525                imp = msg_importance(msg);
 526                if (imp < TIPC_CRITICAL_IMPORTANCE)
 527                        msg_set_importance(msg, ++imp);
 528                msg_set_errcode(msg, err);
 529        }
 530        return buf;
 531}
 532
 533void tipc_port_recv_proto_msg(struct sk_buff *buf)
 534{
 535        struct tipc_msg *msg = buf_msg(buf);
 536        struct tipc_port *p_ptr;
 537        struct sk_buff *r_buf = NULL;
 538        u32 destport = msg_destport(msg);
 539        int wakeable;
 540
 541        /* Validate connection */
 542        p_ptr = tipc_port_lock(destport);
 543        if (!p_ptr || !p_ptr->connected || !tipc_port_peer_msg(p_ptr, msg)) {
 544                r_buf = tipc_buf_acquire(BASIC_H_SIZE);
 545                if (r_buf) {
 546                        msg = buf_msg(r_buf);
 547                        tipc_msg_init(msg, TIPC_HIGH_IMPORTANCE, TIPC_CONN_MSG,
 548                                      BASIC_H_SIZE, msg_orignode(msg));
 549                        msg_set_errcode(msg, TIPC_ERR_NO_PORT);
 550                        msg_set_origport(msg, destport);
 551                        msg_set_destport(msg, msg_origport(msg));
 552                }
 553                if (p_ptr)
 554                        tipc_port_unlock(p_ptr);
 555                goto exit;
 556        }
 557
 558        /* Process protocol message sent by peer */
 559        switch (msg_type(msg)) {
 560        case CONN_ACK:
 561                wakeable = tipc_port_congested(p_ptr) && p_ptr->congested &&
 562                        p_ptr->wakeup;
 563                p_ptr->acked += msg_msgcnt(msg);
 564                if (!tipc_port_congested(p_ptr)) {
 565                        p_ptr->congested = 0;
 566                        if (wakeable)
 567                                p_ptr->wakeup(p_ptr);
 568                }
 569                break;
 570        case CONN_PROBE:
 571                r_buf = port_build_proto_msg(p_ptr, CONN_PROBE_REPLY, 0);
 572                break;
 573        default:
 574                /* CONN_PROBE_REPLY or unrecognized - no action required */
 575                break;
 576        }
 577        p_ptr->probing_state = CONFIRMED;
 578        tipc_port_unlock(p_ptr);
 579exit:
 580        tipc_net_route_msg(r_buf);
 581        kfree_skb(buf);
 582}
 583
 584static int port_print(struct tipc_port *p_ptr, char *buf, int len, int full_id)
 585{
 586        struct publication *publ;
 587        int ret;
 588
 589        if (full_id)
 590                ret = tipc_snprintf(buf, len, "<%u.%u.%u:%u>:",
 591                                    tipc_zone(tipc_own_addr),
 592                                    tipc_cluster(tipc_own_addr),
 593                                    tipc_node(tipc_own_addr), p_ptr->ref);
 594        else
 595                ret = tipc_snprintf(buf, len, "%-10u:", p_ptr->ref);
 596
 597        if (p_ptr->connected) {
 598                u32 dport = port_peerport(p_ptr);
 599                u32 destnode = port_peernode(p_ptr);
 600
 601                ret += tipc_snprintf(buf + ret, len - ret,
 602                                     " connected to <%u.%u.%u:%u>",
 603                                     tipc_zone(destnode),
 604                                     tipc_cluster(destnode),
 605                                     tipc_node(destnode), dport);
 606                if (p_ptr->conn_type != 0)
 607                        ret += tipc_snprintf(buf + ret, len - ret,
 608                                             " via {%u,%u}", p_ptr->conn_type,
 609                                             p_ptr->conn_instance);
 610        } else if (p_ptr->published) {
 611                ret += tipc_snprintf(buf + ret, len - ret, " bound to");
 612                list_for_each_entry(publ, &p_ptr->publications, pport_list) {
 613                        if (publ->lower == publ->upper)
 614                                ret += tipc_snprintf(buf + ret, len - ret,
 615                                                     " {%u,%u}", publ->type,
 616                                                     publ->lower);
 617                        else
 618                                ret += tipc_snprintf(buf + ret, len - ret,
 619                                                     " {%u,%u,%u}", publ->type,
 620                                                     publ->lower, publ->upper);
 621                }
 622        }
 623        ret += tipc_snprintf(buf + ret, len - ret, "\n");
 624        return ret;
 625}
 626
 627struct sk_buff *tipc_port_get_ports(void)
 628{
 629        struct sk_buff *buf;
 630        struct tlv_desc *rep_tlv;
 631        char *pb;
 632        int pb_len;
 633        struct tipc_port *p_ptr;
 634        int str_len = 0;
 635
 636        buf = tipc_cfg_reply_alloc(TLV_SPACE(ULTRA_STRING_MAX_LEN));
 637        if (!buf)
 638                return NULL;
 639        rep_tlv = (struct tlv_desc *)buf->data;
 640        pb = TLV_DATA(rep_tlv);
 641        pb_len = ULTRA_STRING_MAX_LEN;
 642
 643        spin_lock_bh(&tipc_port_list_lock);
 644        list_for_each_entry(p_ptr, &ports, port_list) {
 645                spin_lock_bh(p_ptr->lock);
 646                str_len += port_print(p_ptr, pb, pb_len, 0);
 647                spin_unlock_bh(p_ptr->lock);
 648        }
 649        spin_unlock_bh(&tipc_port_list_lock);
 650        str_len += 1;   /* for "\0" */
 651        skb_put(buf, TLV_SPACE(str_len));
 652        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
 653
 654        return buf;
 655}
 656
 657void tipc_port_reinit(void)
 658{
 659        struct tipc_port *p_ptr;
 660        struct tipc_msg *msg;
 661
 662        spin_lock_bh(&tipc_port_list_lock);
 663        list_for_each_entry(p_ptr, &ports, port_list) {
 664                msg = &p_ptr->phdr;
 665                msg_set_prevnode(msg, tipc_own_addr);
 666                msg_set_orignode(msg, tipc_own_addr);
 667        }
 668        spin_unlock_bh(&tipc_port_list_lock);
 669}
 670
 671
 672/*
 673 *  port_dispatcher_sigh(): Signal handler for messages destinated
 674 *                          to the tipc_port interface.
 675 */
 676static void port_dispatcher_sigh(void *dummy)
 677{
 678        struct sk_buff *buf;
 679
 680        spin_lock_bh(&queue_lock);
 681        buf = msg_queue_head;
 682        msg_queue_head = NULL;
 683        spin_unlock_bh(&queue_lock);
 684
 685        while (buf) {
 686                struct tipc_port *p_ptr;
 687                struct user_port *up_ptr;
 688                struct tipc_portid orig;
 689                struct tipc_name_seq dseq;
 690                void *usr_handle;
 691                int connected;
 692                int peer_invalid;
 693                int published;
 694                u32 message_type;
 695
 696                struct sk_buff *next = buf->next;
 697                struct tipc_msg *msg = buf_msg(buf);
 698                u32 dref = msg_destport(msg);
 699
 700                message_type = msg_type(msg);
 701                if (message_type > TIPC_DIRECT_MSG)
 702                        goto reject;    /* Unsupported message type */
 703
 704                p_ptr = tipc_port_lock(dref);
 705                if (!p_ptr)
 706                        goto reject;    /* Port deleted while msg in queue */
 707
 708                orig.ref = msg_origport(msg);
 709                orig.node = msg_orignode(msg);
 710                up_ptr = p_ptr->user_port;
 711                usr_handle = up_ptr->usr_handle;
 712                connected = p_ptr->connected;
 713                peer_invalid = connected && !tipc_port_peer_msg(p_ptr, msg);
 714                published = p_ptr->published;
 715
 716                if (unlikely(msg_errcode(msg)))
 717                        goto err;
 718
 719                switch (message_type) {
 720
 721                case TIPC_CONN_MSG:{
 722                                tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
 723                                u32 dsz;
 724
 725                                tipc_port_unlock(p_ptr);
 726                                if (unlikely(!cb))
 727                                        goto reject;
 728                                if (unlikely(!connected)) {
 729                                        if (tipc_connect2port(dref, &orig))
 730                                                goto reject;
 731                                } else if (peer_invalid)
 732                                        goto reject;
 733                                dsz = msg_data_sz(msg);
 734                                if (unlikely(dsz &&
 735                                             (++p_ptr->conn_unacked >=
 736                                              TIPC_FLOW_CONTROL_WIN)))
 737                                        tipc_acknowledge(dref,
 738                                                         p_ptr->conn_unacked);
 739                                skb_pull(buf, msg_hdr_sz(msg));
 740                                cb(usr_handle, dref, &buf, msg_data(msg), dsz);
 741                                break;
 742                        }
 743                case TIPC_DIRECT_MSG:{
 744                                tipc_msg_event cb = up_ptr->msg_cb;
 745
 746                                tipc_port_unlock(p_ptr);
 747                                if (unlikely(!cb || connected))
 748                                        goto reject;
 749                                skb_pull(buf, msg_hdr_sz(msg));
 750                                cb(usr_handle, dref, &buf, msg_data(msg),
 751                                   msg_data_sz(msg), msg_importance(msg),
 752                                   &orig);
 753                                break;
 754                        }
 755                case TIPC_MCAST_MSG:
 756                case TIPC_NAMED_MSG:{
 757                                tipc_named_msg_event cb = up_ptr->named_msg_cb;
 758
 759                                tipc_port_unlock(p_ptr);
 760                                if (unlikely(!cb || connected || !published))
 761                                        goto reject;
 762                                dseq.type =  msg_nametype(msg);
 763                                dseq.lower = msg_nameinst(msg);
 764                                dseq.upper = (message_type == TIPC_NAMED_MSG)
 765                                        ? dseq.lower : msg_nameupper(msg);
 766                                skb_pull(buf, msg_hdr_sz(msg));
 767                                cb(usr_handle, dref, &buf, msg_data(msg),
 768                                   msg_data_sz(msg), msg_importance(msg),
 769                                   &orig, &dseq);
 770                                break;
 771                        }
 772                }
 773                if (buf)
 774                        kfree_skb(buf);
 775                buf = next;
 776                continue;
 777err:
 778                switch (message_type) {
 779
 780                case TIPC_CONN_MSG:{
 781                                tipc_conn_shutdown_event cb =
 782                                        up_ptr->conn_err_cb;
 783
 784                                tipc_port_unlock(p_ptr);
 785                                if (!cb || !connected || peer_invalid)
 786                                        break;
 787                                tipc_disconnect(dref);
 788                                skb_pull(buf, msg_hdr_sz(msg));
 789                                cb(usr_handle, dref, &buf, msg_data(msg),
 790                                   msg_data_sz(msg), msg_errcode(msg));
 791                                break;
 792                        }
 793                case TIPC_DIRECT_MSG:{
 794                                tipc_msg_err_event cb = up_ptr->err_cb;
 795
 796                                tipc_port_unlock(p_ptr);
 797                                if (!cb || connected)
 798                                        break;
 799                                skb_pull(buf, msg_hdr_sz(msg));
 800                                cb(usr_handle, dref, &buf, msg_data(msg),
 801                                   msg_data_sz(msg), msg_errcode(msg), &orig);
 802                                break;
 803                        }
 804                case TIPC_MCAST_MSG:
 805                case TIPC_NAMED_MSG:{
 806                                tipc_named_msg_err_event cb =
 807                                        up_ptr->named_err_cb;
 808
 809                                tipc_port_unlock(p_ptr);
 810                                if (!cb || connected)
 811                                        break;
 812                                dseq.type =  msg_nametype(msg);
 813                                dseq.lower = msg_nameinst(msg);
 814                                dseq.upper = (message_type == TIPC_NAMED_MSG)
 815                                        ? dseq.lower : msg_nameupper(msg);
 816                                skb_pull(buf, msg_hdr_sz(msg));
 817                                cb(usr_handle, dref, &buf, msg_data(msg),
 818                                   msg_data_sz(msg), msg_errcode(msg), &dseq);
 819                                break;
 820                        }
 821                }
 822                if (buf)
 823                        kfree_skb(buf);
 824                buf = next;
 825                continue;
 826reject:
 827                tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 828                buf = next;
 829        }
 830}
 831
 832/*
 833 *  port_dispatcher(): Dispatcher for messages destinated
 834 *  to the tipc_port interface. Called with port locked.
 835 */
 836static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
 837{
 838        buf->next = NULL;
 839        spin_lock_bh(&queue_lock);
 840        if (msg_queue_head) {
 841                msg_queue_tail->next = buf;
 842                msg_queue_tail = buf;
 843        } else {
 844                msg_queue_tail = msg_queue_head = buf;
 845                tipc_k_signal((Handler)port_dispatcher_sigh, 0);
 846        }
 847        spin_unlock_bh(&queue_lock);
 848        return 0;
 849}
 850
 851/*
 852 * Wake up port after congestion: Called with port locked
 853 */
 854static void port_wakeup_sh(unsigned long ref)
 855{
 856        struct tipc_port *p_ptr;
 857        struct user_port *up_ptr;
 858        tipc_continue_event cb = NULL;
 859        void *uh = NULL;
 860
 861        p_ptr = tipc_port_lock(ref);
 862        if (p_ptr) {
 863                up_ptr = p_ptr->user_port;
 864                if (up_ptr) {
 865                        cb = up_ptr->continue_event_cb;
 866                        uh = up_ptr->usr_handle;
 867                }
 868                tipc_port_unlock(p_ptr);
 869        }
 870        if (cb)
 871                cb(uh, ref);
 872}
 873
 874
 875static void port_wakeup(struct tipc_port *p_ptr)
 876{
 877        tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
 878}
 879
 880void tipc_acknowledge(u32 ref, u32 ack)
 881{
 882        struct tipc_port *p_ptr;
 883        struct sk_buff *buf = NULL;
 884
 885        p_ptr = tipc_port_lock(ref);
 886        if (!p_ptr)
 887                return;
 888        if (p_ptr->connected) {
 889                p_ptr->conn_unacked -= ack;
 890                buf = port_build_proto_msg(p_ptr, CONN_ACK, ack);
 891        }
 892        tipc_port_unlock(p_ptr);
 893        tipc_net_route_msg(buf);
 894}
 895
 896/*
 897 * tipc_createport(): user level call.
 898 */
 899int tipc_createport(void *usr_handle,
 900                    unsigned int importance,
 901                    tipc_msg_err_event error_cb,
 902                    tipc_named_msg_err_event named_error_cb,
 903                    tipc_conn_shutdown_event conn_error_cb,
 904                    tipc_msg_event msg_cb,
 905                    tipc_named_msg_event named_msg_cb,
 906                    tipc_conn_msg_event conn_msg_cb,
 907                    tipc_continue_event continue_event_cb, /* May be zero */
 908                    u32 *portref)
 909{
 910        struct user_port *up_ptr;
 911        struct tipc_port *p_ptr;
 912
 913        up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
 914        if (!up_ptr) {
 915                pr_warn("Port creation failed, no memory\n");
 916                return -ENOMEM;
 917        }
 918        p_ptr = tipc_createport_raw(NULL, port_dispatcher, port_wakeup,
 919                                    importance);
 920        if (!p_ptr) {
 921                kfree(up_ptr);
 922                return -ENOMEM;
 923        }
 924
 925        p_ptr->user_port = up_ptr;
 926        up_ptr->usr_handle = usr_handle;
 927        up_ptr->ref = p_ptr->ref;
 928        up_ptr->err_cb = error_cb;
 929        up_ptr->named_err_cb = named_error_cb;
 930        up_ptr->conn_err_cb = conn_error_cb;
 931        up_ptr->msg_cb = msg_cb;
 932        up_ptr->named_msg_cb = named_msg_cb;
 933        up_ptr->conn_msg_cb = conn_msg_cb;
 934        up_ptr->continue_event_cb = continue_event_cb;
 935        *portref = p_ptr->ref;
 936        tipc_port_unlock(p_ptr);
 937        return 0;
 938}
 939
 940int tipc_portimportance(u32 ref, unsigned int *importance)
 941{
 942        struct tipc_port *p_ptr;
 943
 944        p_ptr = tipc_port_lock(ref);
 945        if (!p_ptr)
 946                return -EINVAL;
 947        *importance = (unsigned int)msg_importance(&p_ptr->phdr);
 948        tipc_port_unlock(p_ptr);
 949        return 0;
 950}
 951
 952int tipc_set_portimportance(u32 ref, unsigned int imp)
 953{
 954        struct tipc_port *p_ptr;
 955
 956        if (imp > TIPC_CRITICAL_IMPORTANCE)
 957                return -EINVAL;
 958
 959        p_ptr = tipc_port_lock(ref);
 960        if (!p_ptr)
 961                return -EINVAL;
 962        msg_set_importance(&p_ptr->phdr, (u32)imp);
 963        tipc_port_unlock(p_ptr);
 964        return 0;
 965}
 966
 967
 968int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
 969{
 970        struct tipc_port *p_ptr;
 971        struct publication *publ;
 972        u32 key;
 973        int res = -EINVAL;
 974
 975        p_ptr = tipc_port_lock(ref);
 976        if (!p_ptr)
 977                return -EINVAL;
 978
 979        if (p_ptr->connected)
 980                goto exit;
 981        key = ref + p_ptr->pub_count + 1;
 982        if (key == ref) {
 983                res = -EADDRINUSE;
 984                goto exit;
 985        }
 986        publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
 987                                    scope, p_ptr->ref, key);
 988        if (publ) {
 989                list_add(&publ->pport_list, &p_ptr->publications);
 990                p_ptr->pub_count++;
 991                p_ptr->published = 1;
 992                res = 0;
 993        }
 994exit:
 995        tipc_port_unlock(p_ptr);
 996        return res;
 997}
 998
 999int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1000{
1001        struct tipc_port *p_ptr;
1002        struct publication *publ;
1003        struct publication *tpubl;
1004        int res = -EINVAL;
1005
1006        p_ptr = tipc_port_lock(ref);
1007        if (!p_ptr)
1008                return -EINVAL;
1009        if (!seq) {
1010                list_for_each_entry_safe(publ, tpubl,
1011                                         &p_ptr->publications, pport_list) {
1012                        tipc_nametbl_withdraw(publ->type, publ->lower,
1013                                              publ->ref, publ->key);
1014                }
1015                res = 0;
1016        } else {
1017                list_for_each_entry_safe(publ, tpubl,
1018                                         &p_ptr->publications, pport_list) {
1019                        if (publ->scope != scope)
1020                                continue;
1021                        if (publ->type != seq->type)
1022                                continue;
1023                        if (publ->lower != seq->lower)
1024                                continue;
1025                        if (publ->upper != seq->upper)
1026                                break;
1027                        tipc_nametbl_withdraw(publ->type, publ->lower,
1028                                              publ->ref, publ->key);
1029                        res = 0;
1030                        break;
1031                }
1032        }
1033        if (list_empty(&p_ptr->publications))
1034                p_ptr->published = 0;
1035        tipc_port_unlock(p_ptr);
1036        return res;
1037}
1038
1039int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1040{
1041        struct tipc_port *p_ptr;
1042        struct tipc_msg *msg;
1043        int res = -EINVAL;
1044
1045        p_ptr = tipc_port_lock(ref);
1046        if (!p_ptr)
1047                return -EINVAL;
1048        if (p_ptr->published || p_ptr->connected)
1049                goto exit;
1050        if (!peer->ref)
1051                goto exit;
1052
1053        msg = &p_ptr->phdr;
1054        msg_set_destnode(msg, peer->node);
1055        msg_set_destport(msg, peer->ref);
1056        msg_set_type(msg, TIPC_CONN_MSG);
1057        msg_set_lookup_scope(msg, 0);
1058        msg_set_hdr_sz(msg, SHORT_H_SIZE);
1059
1060        p_ptr->probing_interval = PROBING_INTERVAL;
1061        p_ptr->probing_state = CONFIRMED;
1062        p_ptr->connected = 1;
1063        k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1064
1065        tipc_nodesub_subscribe(&p_ptr->subscription, peer->node,
1066                          (void *)(unsigned long)ref,
1067                          (net_ev_handler)port_handle_node_down);
1068        res = 0;
1069exit:
1070        tipc_port_unlock(p_ptr);
1071        p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1072        return res;
1073}
1074
1075/**
1076 * tipc_disconnect_port - disconnect port from peer
1077 *
1078 * Port must be locked.
1079 */
1080int tipc_disconnect_port(struct tipc_port *tp_ptr)
1081{
1082        int res;
1083
1084        if (tp_ptr->connected) {
1085                tp_ptr->connected = 0;
1086                /* let timer expire on it's own to avoid deadlock! */
1087                tipc_nodesub_unsubscribe(&tp_ptr->subscription);
1088                res = 0;
1089        } else {
1090                res = -ENOTCONN;
1091        }
1092        return res;
1093}
1094
1095/*
1096 * tipc_disconnect(): Disconnect port form peer.
1097 *                    This is a node local operation.
1098 */
1099int tipc_disconnect(u32 ref)
1100{
1101        struct tipc_port *p_ptr;
1102        int res;
1103
1104        p_ptr = tipc_port_lock(ref);
1105        if (!p_ptr)
1106                return -EINVAL;
1107        res = tipc_disconnect_port(p_ptr);
1108        tipc_port_unlock(p_ptr);
1109        return res;
1110}
1111
1112/*
1113 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1114 */
1115int tipc_shutdown(u32 ref)
1116{
1117        struct tipc_port *p_ptr;
1118        struct sk_buff *buf = NULL;
1119
1120        p_ptr = tipc_port_lock(ref);
1121        if (!p_ptr)
1122                return -EINVAL;
1123
1124        buf = port_build_peer_abort_msg(p_ptr, TIPC_CONN_SHUTDOWN);
1125        tipc_port_unlock(p_ptr);
1126        tipc_net_route_msg(buf);
1127        return tipc_disconnect(ref);
1128}
1129
1130/**
1131 * tipc_port_recv_msg - receive message from lower layer and deliver to port user
1132 */
1133int tipc_port_recv_msg(struct sk_buff *buf)
1134{
1135        struct tipc_port *p_ptr;
1136        struct tipc_msg *msg = buf_msg(buf);
1137        u32 destport = msg_destport(msg);
1138        u32 dsz = msg_data_sz(msg);
1139        u32 err;
1140
1141        /* forward unresolved named message */
1142        if (unlikely(!destport)) {
1143                tipc_net_route_msg(buf);
1144                return dsz;
1145        }
1146
1147        /* validate destination & pass to port, otherwise reject message */
1148        p_ptr = tipc_port_lock(destport);
1149        if (likely(p_ptr)) {
1150                err = p_ptr->dispatcher(p_ptr, buf);
1151                tipc_port_unlock(p_ptr);
1152                if (likely(!err))
1153                        return dsz;
1154        } else {
1155                err = TIPC_ERR_NO_PORT;
1156        }
1157
1158        return tipc_reject_msg(buf, err);
1159}
1160
1161/*
1162 *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1163 *                        message for this node.
1164 */
1165static int tipc_port_recv_sections(struct tipc_port *sender, unsigned int num_sect,
1166                                   struct iovec const *msg_sect,
1167                                   unsigned int total_len)
1168{
1169        struct sk_buff *buf;
1170        int res;
1171
1172        res = tipc_msg_build(&sender->phdr, msg_sect, num_sect, total_len,
1173                        MAX_MSG_SIZE, !sender->user_port, &buf);
1174        if (likely(buf))
1175                tipc_port_recv_msg(buf);
1176        return res;
1177}
1178
1179/**
1180 * tipc_send - send message sections on connection
1181 */
1182int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect,
1183              unsigned int total_len)
1184{
1185        struct tipc_port *p_ptr;
1186        u32 destnode;
1187        int res;
1188
1189        p_ptr = tipc_port_deref(ref);
1190        if (!p_ptr || !p_ptr->connected)
1191                return -EINVAL;
1192
1193        p_ptr->congested = 1;
1194        if (!tipc_port_congested(p_ptr)) {
1195                destnode = port_peernode(p_ptr);
1196                if (likely(!in_own_node(destnode)))
1197                        res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1198                                                           total_len, destnode);
1199                else
1200                        res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1201                                                      total_len);
1202
1203                if (likely(res != -ELINKCONG)) {
1204                        p_ptr->congested = 0;
1205                        if (res > 0)
1206                                p_ptr->sent++;
1207                        return res;
1208                }
1209        }
1210        if (port_unreliable(p_ptr)) {
1211                p_ptr->congested = 0;
1212                return total_len;
1213        }
1214        return -ELINKCONG;
1215}
1216
1217/**
1218 * tipc_send2name - send message sections to port name
1219 */
1220int tipc_send2name(u32 ref, struct tipc_name const *name, unsigned int domain,
1221                   unsigned int num_sect, struct iovec const *msg_sect,
1222                   unsigned int total_len)
1223{
1224        struct tipc_port *p_ptr;
1225        struct tipc_msg *msg;
1226        u32 destnode = domain;
1227        u32 destport;
1228        int res;
1229
1230        p_ptr = tipc_port_deref(ref);
1231        if (!p_ptr || p_ptr->connected)
1232                return -EINVAL;
1233
1234        msg = &p_ptr->phdr;
1235        msg_set_type(msg, TIPC_NAMED_MSG);
1236        msg_set_hdr_sz(msg, NAMED_H_SIZE);
1237        msg_set_nametype(msg, name->type);
1238        msg_set_nameinst(msg, name->instance);
1239        msg_set_lookup_scope(msg, tipc_addr_scope(domain));
1240        destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1241        msg_set_destnode(msg, destnode);
1242        msg_set_destport(msg, destport);
1243
1244        if (likely(destport || destnode)) {
1245                if (likely(in_own_node(destnode)))
1246                        res = tipc_port_recv_sections(p_ptr, num_sect,
1247                                                      msg_sect, total_len);
1248                else if (tipc_own_addr)
1249                        res = tipc_link_send_sections_fast(p_ptr, msg_sect,
1250                                                           num_sect, total_len,
1251                                                           destnode);
1252                else
1253                        res = tipc_port_reject_sections(p_ptr, msg, msg_sect,
1254                                                        num_sect, total_len,
1255                                                        TIPC_ERR_NO_NODE);
1256                if (likely(res != -ELINKCONG)) {
1257                        if (res > 0)
1258                                p_ptr->sent++;
1259                        return res;
1260                }
1261                if (port_unreliable(p_ptr)) {
1262                        return total_len;
1263                }
1264                return -ELINKCONG;
1265        }
1266        return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1267                                         total_len, TIPC_ERR_NO_NAME);
1268}
1269
1270/**
1271 * tipc_send2port - send message sections to port identity
1272 */
1273int tipc_send2port(u32 ref, struct tipc_portid const *dest,
1274                   unsigned int num_sect, struct iovec const *msg_sect,
1275                   unsigned int total_len)
1276{
1277        struct tipc_port *p_ptr;
1278        struct tipc_msg *msg;
1279        int res;
1280
1281        p_ptr = tipc_port_deref(ref);
1282        if (!p_ptr || p_ptr->connected)
1283                return -EINVAL;
1284
1285        msg = &p_ptr->phdr;
1286        msg_set_type(msg, TIPC_DIRECT_MSG);
1287        msg_set_lookup_scope(msg, 0);
1288        msg_set_destnode(msg, dest->node);
1289        msg_set_destport(msg, dest->ref);
1290        msg_set_hdr_sz(msg, BASIC_H_SIZE);
1291
1292        if (in_own_node(dest->node))
1293                res =  tipc_port_recv_sections(p_ptr, num_sect, msg_sect,
1294                                               total_len);
1295        else if (tipc_own_addr)
1296                res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1297                                                   total_len, dest->node);
1298        else
1299                res = tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1300                                                total_len, TIPC_ERR_NO_NODE);
1301        if (likely(res != -ELINKCONG)) {
1302                if (res > 0)
1303                        p_ptr->sent++;
1304                return res;
1305        }
1306        if (port_unreliable(p_ptr)) {
1307                return total_len;
1308        }
1309        return -ELINKCONG;
1310}
1311
1312/**
1313 * tipc_send_buf2port - send message buffer to port identity
1314 */
1315int tipc_send_buf2port(u32 ref, struct tipc_portid const *dest,
1316               struct sk_buff *buf, unsigned int dsz)
1317{
1318        struct tipc_port *p_ptr;
1319        struct tipc_msg *msg;
1320        int res;
1321
1322        p_ptr = (struct tipc_port *)tipc_ref_deref(ref);
1323        if (!p_ptr || p_ptr->connected)
1324                return -EINVAL;
1325
1326        msg = &p_ptr->phdr;
1327        msg_set_type(msg, TIPC_DIRECT_MSG);
1328        msg_set_destnode(msg, dest->node);
1329        msg_set_destport(msg, dest->ref);
1330        msg_set_hdr_sz(msg, BASIC_H_SIZE);
1331        msg_set_size(msg, BASIC_H_SIZE + dsz);
1332        if (skb_cow(buf, BASIC_H_SIZE))
1333                return -ENOMEM;
1334
1335        skb_push(buf, BASIC_H_SIZE);
1336        skb_copy_to_linear_data(buf, msg, BASIC_H_SIZE);
1337
1338        if (in_own_node(dest->node))
1339                res = tipc_port_recv_msg(buf);
1340        else
1341                res = tipc_send_buf_fast(buf, dest->node);
1342        if (likely(res != -ELINKCONG)) {
1343                if (res > 0)
1344                        p_ptr->sent++;
1345                return res;
1346        }
1347        if (port_unreliable(p_ptr))
1348                return dsz;
1349        return -ELINKCONG;
1350}
1351