linux/net/tipc/port.c
<<
>>
Prefs
   1/*
   2 * net/tipc/port.c: TIPC port code
   3 *
   4 * Copyright (c) 1992-2007, Ericsson AB
   5 * Copyright (c) 2004-2008, Wind River Systems
   6 * All rights reserved.
   7 *
   8 * Redistribution and use in source and binary forms, with or without
   9 * modification, are permitted provided that the following conditions are met:
  10 *
  11 * 1. Redistributions of source code must retain the above copyright
  12 *    notice, this list of conditions and the following disclaimer.
  13 * 2. Redistributions in binary form must reproduce the above copyright
  14 *    notice, this list of conditions and the following disclaimer in the
  15 *    documentation and/or other materials provided with the distribution.
  16 * 3. Neither the names of the copyright holders nor the names of its
  17 *    contributors may be used to endorse or promote products derived from
  18 *    this software without specific prior written permission.
  19 *
  20 * Alternatively, this software may be distributed under the terms of the
  21 * GNU General Public License ("GPL") version 2 as published by the Free
  22 * Software Foundation.
  23 *
  24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  34 * POSSIBILITY OF SUCH DAMAGE.
  35 */
  36
  37#include "core.h"
  38#include "config.h"
  39#include "dbg.h"
  40#include "port.h"
  41#include "addr.h"
  42#include "link.h"
  43#include "node.h"
  44#include "name_table.h"
  45#include "user_reg.h"
  46#include "msg.h"
  47#include "bcast.h"
  48
  49/* Connection management: */
  50#define PROBING_INTERVAL 3600000        /* [ms] => 1 h */
  51#define CONFIRMED 0
  52#define PROBING 1
  53
  54#define MAX_REJECT_SIZE 1024
  55
  56static struct sk_buff *msg_queue_head = NULL;
  57static struct sk_buff *msg_queue_tail = NULL;
  58
  59DEFINE_SPINLOCK(tipc_port_list_lock);
  60static DEFINE_SPINLOCK(queue_lock);
  61
  62static LIST_HEAD(ports);
  63static void port_handle_node_down(unsigned long ref);
  64static struct sk_buff* port_build_self_abort_msg(struct port *,u32 err);
  65static struct sk_buff* port_build_peer_abort_msg(struct port *,u32 err);
  66static void port_timeout(unsigned long ref);
  67
  68
  69static u32 port_peernode(struct port *p_ptr)
  70{
  71        return msg_destnode(&p_ptr->publ.phdr);
  72}
  73
  74static u32 port_peerport(struct port *p_ptr)
  75{
  76        return msg_destport(&p_ptr->publ.phdr);
  77}
  78
  79static u32 port_out_seqno(struct port *p_ptr)
  80{
  81        return msg_transp_seqno(&p_ptr->publ.phdr);
  82}
  83
  84static void port_incr_out_seqno(struct port *p_ptr)
  85{
  86        struct tipc_msg *m = &p_ptr->publ.phdr;
  87
  88        if (likely(!msg_routed(m)))
  89                return;
  90        msg_set_transp_seqno(m, (msg_transp_seqno(m) + 1));
  91}
  92
  93/**
  94 * tipc_multicast - send a multicast message to local and remote destinations
  95 */
  96
  97int tipc_multicast(u32 ref, struct tipc_name_seq const *seq, u32 domain,
  98                   u32 num_sect, struct iovec const *msg_sect)
  99{
 100        struct tipc_msg *hdr;
 101        struct sk_buff *buf;
 102        struct sk_buff *ibuf = NULL;
 103        struct port_list dports = {0, NULL, };
 104        struct port *oport = tipc_port_deref(ref);
 105        int ext_targets;
 106        int res;
 107
 108        if (unlikely(!oport))
 109                return -EINVAL;
 110
 111        /* Create multicast message */
 112
 113        hdr = &oport->publ.phdr;
 114        msg_set_type(hdr, TIPC_MCAST_MSG);
 115        msg_set_nametype(hdr, seq->type);
 116        msg_set_namelower(hdr, seq->lower);
 117        msg_set_nameupper(hdr, seq->upper);
 118        msg_set_hdr_sz(hdr, MCAST_H_SIZE);
 119        res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
 120                        !oport->user_port, &buf);
 121        if (unlikely(!buf))
 122                return res;
 123
 124        /* Figure out where to send multicast message */
 125
 126        ext_targets = tipc_nametbl_mc_translate(seq->type, seq->lower, seq->upper,
 127                                                TIPC_NODE_SCOPE, &dports);
 128
 129        /* Send message to destinations (duplicate it only if necessary) */
 130
 131        if (ext_targets) {
 132                if (dports.count != 0) {
 133                        ibuf = skb_copy(buf, GFP_ATOMIC);
 134                        if (ibuf == NULL) {
 135                                tipc_port_list_free(&dports);
 136                                buf_discard(buf);
 137                                return -ENOMEM;
 138                        }
 139                }
 140                res = tipc_bclink_send_msg(buf);
 141                if ((res < 0) && (dports.count != 0)) {
 142                        buf_discard(ibuf);
 143                }
 144        } else {
 145                ibuf = buf;
 146        }
 147
 148        if (res >= 0) {
 149                if (ibuf)
 150                        tipc_port_recv_mcast(ibuf, &dports);
 151        } else {
 152                tipc_port_list_free(&dports);
 153        }
 154        return res;
 155}
 156
 157/**
 158 * tipc_port_recv_mcast - deliver multicast message to all destination ports
 159 *
 160 * If there is no port list, perform a lookup to create one
 161 */
 162
 163void tipc_port_recv_mcast(struct sk_buff *buf, struct port_list *dp)
 164{
 165        struct tipc_msg* msg;
 166        struct port_list dports = {0, NULL, };
 167        struct port_list *item = dp;
 168        int cnt = 0;
 169
 170        msg = buf_msg(buf);
 171
 172        /* Create destination port list, if one wasn't supplied */
 173
 174        if (dp == NULL) {
 175                tipc_nametbl_mc_translate(msg_nametype(msg),
 176                                     msg_namelower(msg),
 177                                     msg_nameupper(msg),
 178                                     TIPC_CLUSTER_SCOPE,
 179                                     &dports);
 180                item = dp = &dports;
 181        }
 182
 183        /* Deliver a copy of message to each destination port */
 184
 185        if (dp->count != 0) {
 186                if (dp->count == 1) {
 187                        msg_set_destport(msg, dp->ports[0]);
 188                        tipc_port_recv_msg(buf);
 189                        tipc_port_list_free(dp);
 190                        return;
 191                }
 192                for (; cnt < dp->count; cnt++) {
 193                        int index = cnt % PLSIZE;
 194                        struct sk_buff *b = skb_clone(buf, GFP_ATOMIC);
 195
 196                        if (b == NULL) {
 197                                warn("Unable to deliver multicast message(s)\n");
 198                                msg_dbg(msg, "LOST:");
 199                                goto exit;
 200                        }
 201                        if ((index == 0) && (cnt != 0)) {
 202                                item = item->next;
 203                        }
 204                        msg_set_destport(buf_msg(b),item->ports[index]);
 205                        tipc_port_recv_msg(b);
 206                }
 207        }
 208exit:
 209        buf_discard(buf);
 210        tipc_port_list_free(dp);
 211}
 212
 213/**
 214 * tipc_createport_raw - create a generic TIPC port
 215 *
 216 * Returns pointer to (locked) TIPC port, or NULL if unable to create it
 217 */
 218
 219struct tipc_port *tipc_createport_raw(void *usr_handle,
 220                        u32 (*dispatcher)(struct tipc_port *, struct sk_buff *),
 221                        void (*wakeup)(struct tipc_port *),
 222                        const u32 importance)
 223{
 224        struct port *p_ptr;
 225        struct tipc_msg *msg;
 226        u32 ref;
 227
 228        p_ptr = kzalloc(sizeof(*p_ptr), GFP_ATOMIC);
 229        if (!p_ptr) {
 230                warn("Port creation failed, no memory\n");
 231                return NULL;
 232        }
 233        ref = tipc_ref_acquire(p_ptr, &p_ptr->publ.lock);
 234        if (!ref) {
 235                warn("Port creation failed, reference table exhausted\n");
 236                kfree(p_ptr);
 237                return NULL;
 238        }
 239
 240        p_ptr->publ.usr_handle = usr_handle;
 241        p_ptr->publ.max_pkt = MAX_PKT_DEFAULT;
 242        p_ptr->publ.ref = ref;
 243        msg = &p_ptr->publ.phdr;
 244        msg_init(msg, importance, TIPC_NAMED_MSG, LONG_H_SIZE, 0);
 245        msg_set_origport(msg, ref);
 246        p_ptr->last_in_seqno = 41;
 247        p_ptr->sent = 1;
 248        INIT_LIST_HEAD(&p_ptr->wait_list);
 249        INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
 250        p_ptr->congested_link = NULL;
 251        p_ptr->dispatcher = dispatcher;
 252        p_ptr->wakeup = wakeup;
 253        p_ptr->user_port = NULL;
 254        k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
 255        spin_lock_bh(&tipc_port_list_lock);
 256        INIT_LIST_HEAD(&p_ptr->publications);
 257        INIT_LIST_HEAD(&p_ptr->port_list);
 258        list_add_tail(&p_ptr->port_list, &ports);
 259        spin_unlock_bh(&tipc_port_list_lock);
 260        return &(p_ptr->publ);
 261}
 262
 263int tipc_deleteport(u32 ref)
 264{
 265        struct port *p_ptr;
 266        struct sk_buff *buf = NULL;
 267
 268        tipc_withdraw(ref, 0, NULL);
 269        p_ptr = tipc_port_lock(ref);
 270        if (!p_ptr)
 271                return -EINVAL;
 272
 273        tipc_ref_discard(ref);
 274        tipc_port_unlock(p_ptr);
 275
 276        k_cancel_timer(&p_ptr->timer);
 277        if (p_ptr->publ.connected) {
 278                buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
 279                tipc_nodesub_unsubscribe(&p_ptr->subscription);
 280        }
 281        if (p_ptr->user_port) {
 282                tipc_reg_remove_port(p_ptr->user_port);
 283                kfree(p_ptr->user_port);
 284        }
 285
 286        spin_lock_bh(&tipc_port_list_lock);
 287        list_del(&p_ptr->port_list);
 288        list_del(&p_ptr->wait_list);
 289        spin_unlock_bh(&tipc_port_list_lock);
 290        k_term_timer(&p_ptr->timer);
 291        kfree(p_ptr);
 292        dbg("Deleted port %u\n", ref);
 293        tipc_net_route_msg(buf);
 294        return 0;
 295}
 296
 297/**
 298 * tipc_get_port() - return port associated with 'ref'
 299 *
 300 * Note: Port is not locked.
 301 */
 302
 303struct tipc_port *tipc_get_port(const u32 ref)
 304{
 305        return (struct tipc_port *)tipc_ref_deref(ref);
 306}
 307
 308/**
 309 * tipc_get_handle - return user handle associated to port 'ref'
 310 */
 311
 312void *tipc_get_handle(const u32 ref)
 313{
 314        struct port *p_ptr;
 315        void * handle;
 316
 317        p_ptr = tipc_port_lock(ref);
 318        if (!p_ptr)
 319                return NULL;
 320        handle = p_ptr->publ.usr_handle;
 321        tipc_port_unlock(p_ptr);
 322        return handle;
 323}
 324
 325static int port_unreliable(struct port *p_ptr)
 326{
 327        return msg_src_droppable(&p_ptr->publ.phdr);
 328}
 329
 330int tipc_portunreliable(u32 ref, unsigned int *isunreliable)
 331{
 332        struct port *p_ptr;
 333
 334        p_ptr = tipc_port_lock(ref);
 335        if (!p_ptr)
 336                return -EINVAL;
 337        *isunreliable = port_unreliable(p_ptr);
 338        tipc_port_unlock(p_ptr);
 339        return 0;
 340}
 341
 342int tipc_set_portunreliable(u32 ref, unsigned int isunreliable)
 343{
 344        struct port *p_ptr;
 345
 346        p_ptr = tipc_port_lock(ref);
 347        if (!p_ptr)
 348                return -EINVAL;
 349        msg_set_src_droppable(&p_ptr->publ.phdr, (isunreliable != 0));
 350        tipc_port_unlock(p_ptr);
 351        return 0;
 352}
 353
 354static int port_unreturnable(struct port *p_ptr)
 355{
 356        return msg_dest_droppable(&p_ptr->publ.phdr);
 357}
 358
 359int tipc_portunreturnable(u32 ref, unsigned int *isunrejectable)
 360{
 361        struct port *p_ptr;
 362
 363        p_ptr = tipc_port_lock(ref);
 364        if (!p_ptr)
 365                return -EINVAL;
 366        *isunrejectable = port_unreturnable(p_ptr);
 367        tipc_port_unlock(p_ptr);
 368        return 0;
 369}
 370
 371int tipc_set_portunreturnable(u32 ref, unsigned int isunrejectable)
 372{
 373        struct port *p_ptr;
 374
 375        p_ptr = tipc_port_lock(ref);
 376        if (!p_ptr)
 377                return -EINVAL;
 378        msg_set_dest_droppable(&p_ptr->publ.phdr, (isunrejectable != 0));
 379        tipc_port_unlock(p_ptr);
 380        return 0;
 381}
 382
 383/*
 384 * port_build_proto_msg(): build a port level protocol
 385 * or a connection abortion message. Called with
 386 * tipc_port lock on.
 387 */
 388static struct sk_buff *port_build_proto_msg(u32 destport, u32 destnode,
 389                                            u32 origport, u32 orignode,
 390                                            u32 usr, u32 type, u32 err,
 391                                            u32 seqno, u32 ack)
 392{
 393        struct sk_buff *buf;
 394        struct tipc_msg *msg;
 395
 396        buf = buf_acquire(LONG_H_SIZE);
 397        if (buf) {
 398                msg = buf_msg(buf);
 399                msg_init(msg, usr, type, LONG_H_SIZE, destnode);
 400                msg_set_errcode(msg, err);
 401                msg_set_destport(msg, destport);
 402                msg_set_origport(msg, origport);
 403                msg_set_orignode(msg, orignode);
 404                msg_set_transp_seqno(msg, seqno);
 405                msg_set_msgcnt(msg, ack);
 406                msg_dbg(msg, "PORT>SEND>:");
 407        }
 408        return buf;
 409}
 410
 411int tipc_reject_msg(struct sk_buff *buf, u32 err)
 412{
 413        struct tipc_msg *msg = buf_msg(buf);
 414        struct sk_buff *rbuf;
 415        struct tipc_msg *rmsg;
 416        int hdr_sz;
 417        u32 imp = msg_importance(msg);
 418        u32 data_sz = msg_data_sz(msg);
 419
 420        if (data_sz > MAX_REJECT_SIZE)
 421                data_sz = MAX_REJECT_SIZE;
 422        if (msg_connected(msg) && (imp < TIPC_CRITICAL_IMPORTANCE))
 423                imp++;
 424        msg_dbg(msg, "port->rej: ");
 425
 426        /* discard rejected message if it shouldn't be returned to sender */
 427        if (msg_errcode(msg) || msg_dest_droppable(msg)) {
 428                buf_discard(buf);
 429                return data_sz;
 430        }
 431
 432        /* construct rejected message */
 433        if (msg_mcast(msg))
 434                hdr_sz = MCAST_H_SIZE;
 435        else
 436                hdr_sz = LONG_H_SIZE;
 437        rbuf = buf_acquire(data_sz + hdr_sz);
 438        if (rbuf == NULL) {
 439                buf_discard(buf);
 440                return data_sz;
 441        }
 442        rmsg = buf_msg(rbuf);
 443        msg_init(rmsg, imp, msg_type(msg), hdr_sz, msg_orignode(msg));
 444        msg_set_errcode(rmsg, err);
 445        msg_set_destport(rmsg, msg_origport(msg));
 446        msg_set_origport(rmsg, msg_destport(msg));
 447        if (msg_short(msg)) {
 448                msg_set_orignode(rmsg, tipc_own_addr);
 449                /* leave name type & instance as zeroes */
 450        } else {
 451                msg_set_orignode(rmsg, msg_destnode(msg));
 452                msg_set_nametype(rmsg, msg_nametype(msg));
 453                msg_set_nameinst(rmsg, msg_nameinst(msg));
 454        }
 455        msg_set_size(rmsg, data_sz + hdr_sz);
 456        skb_copy_to_linear_data_offset(rbuf, hdr_sz, msg_data(msg), data_sz);
 457
 458        /* send self-abort message when rejecting on a connected port */
 459        if (msg_connected(msg)) {
 460                struct sk_buff *abuf = NULL;
 461                struct port *p_ptr = tipc_port_lock(msg_destport(msg));
 462
 463                if (p_ptr) {
 464                        if (p_ptr->publ.connected)
 465                                abuf = port_build_self_abort_msg(p_ptr, err);
 466                        tipc_port_unlock(p_ptr);
 467                }
 468                tipc_net_route_msg(abuf);
 469        }
 470
 471        /* send rejected message */
 472        buf_discard(buf);
 473        tipc_net_route_msg(rbuf);
 474        return data_sz;
 475}
 476
 477int tipc_port_reject_sections(struct port *p_ptr, struct tipc_msg *hdr,
 478                              struct iovec const *msg_sect, u32 num_sect,
 479                              int err)
 480{
 481        struct sk_buff *buf;
 482        int res;
 483
 484        res = msg_build(hdr, msg_sect, num_sect, MAX_MSG_SIZE,
 485                        !p_ptr->user_port, &buf);
 486        if (!buf)
 487                return res;
 488
 489        return tipc_reject_msg(buf, err);
 490}
 491
 492static void port_timeout(unsigned long ref)
 493{
 494        struct port *p_ptr = tipc_port_lock(ref);
 495        struct sk_buff *buf = NULL;
 496
 497        if (!p_ptr)
 498                return;
 499
 500        if (!p_ptr->publ.connected) {
 501                tipc_port_unlock(p_ptr);
 502                return;
 503        }
 504
 505        /* Last probe answered ? */
 506        if (p_ptr->probing_state == PROBING) {
 507                buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
 508        } else {
 509                buf = port_build_proto_msg(port_peerport(p_ptr),
 510                                           port_peernode(p_ptr),
 511                                           p_ptr->publ.ref,
 512                                           tipc_own_addr,
 513                                           CONN_MANAGER,
 514                                           CONN_PROBE,
 515                                           TIPC_OK,
 516                                           port_out_seqno(p_ptr),
 517                                           0);
 518                port_incr_out_seqno(p_ptr);
 519                p_ptr->probing_state = PROBING;
 520                k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
 521        }
 522        tipc_port_unlock(p_ptr);
 523        tipc_net_route_msg(buf);
 524}
 525
 526
 527static void port_handle_node_down(unsigned long ref)
 528{
 529        struct port *p_ptr = tipc_port_lock(ref);
 530        struct sk_buff* buf = NULL;
 531
 532        if (!p_ptr)
 533                return;
 534        buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
 535        tipc_port_unlock(p_ptr);
 536        tipc_net_route_msg(buf);
 537}
 538
 539
 540static struct sk_buff *port_build_self_abort_msg(struct port *p_ptr, u32 err)
 541{
 542        u32 imp = msg_importance(&p_ptr->publ.phdr);
 543
 544        if (!p_ptr->publ.connected)
 545                return NULL;
 546        if (imp < TIPC_CRITICAL_IMPORTANCE)
 547                imp++;
 548        return port_build_proto_msg(p_ptr->publ.ref,
 549                                    tipc_own_addr,
 550                                    port_peerport(p_ptr),
 551                                    port_peernode(p_ptr),
 552                                    imp,
 553                                    TIPC_CONN_MSG,
 554                                    err,
 555                                    p_ptr->last_in_seqno + 1,
 556                                    0);
 557}
 558
 559
 560static struct sk_buff *port_build_peer_abort_msg(struct port *p_ptr, u32 err)
 561{
 562        u32 imp = msg_importance(&p_ptr->publ.phdr);
 563
 564        if (!p_ptr->publ.connected)
 565                return NULL;
 566        if (imp < TIPC_CRITICAL_IMPORTANCE)
 567                imp++;
 568        return port_build_proto_msg(port_peerport(p_ptr),
 569                                    port_peernode(p_ptr),
 570                                    p_ptr->publ.ref,
 571                                    tipc_own_addr,
 572                                    imp,
 573                                    TIPC_CONN_MSG,
 574                                    err,
 575                                    port_out_seqno(p_ptr),
 576                                    0);
 577}
 578
 579void tipc_port_recv_proto_msg(struct sk_buff *buf)
 580{
 581        struct tipc_msg *msg = buf_msg(buf);
 582        struct port *p_ptr = tipc_port_lock(msg_destport(msg));
 583        u32 err = TIPC_OK;
 584        struct sk_buff *r_buf = NULL;
 585        struct sk_buff *abort_buf = NULL;
 586
 587        msg_dbg(msg, "PORT<RECV<:");
 588
 589        if (!p_ptr) {
 590                err = TIPC_ERR_NO_PORT;
 591        } else if (p_ptr->publ.connected) {
 592                if (port_peernode(p_ptr) != msg_orignode(msg))
 593                        err = TIPC_ERR_NO_PORT;
 594                if (port_peerport(p_ptr) != msg_origport(msg))
 595                        err = TIPC_ERR_NO_PORT;
 596                if (!err && msg_routed(msg)) {
 597                        u32 seqno = msg_transp_seqno(msg);
 598                        u32 myno =  ++p_ptr->last_in_seqno;
 599                        if (seqno != myno) {
 600                                err = TIPC_ERR_NO_PORT;
 601                                abort_buf = port_build_self_abort_msg(p_ptr, err);
 602                        }
 603                }
 604                if (msg_type(msg) == CONN_ACK) {
 605                        int wakeup = tipc_port_congested(p_ptr) &&
 606                                     p_ptr->publ.congested &&
 607                                     p_ptr->wakeup;
 608                        p_ptr->acked += msg_msgcnt(msg);
 609                        if (tipc_port_congested(p_ptr))
 610                                goto exit;
 611                        p_ptr->publ.congested = 0;
 612                        if (!wakeup)
 613                                goto exit;
 614                        p_ptr->wakeup(&p_ptr->publ);
 615                        goto exit;
 616                }
 617        } else if (p_ptr->publ.published) {
 618                err = TIPC_ERR_NO_PORT;
 619        }
 620        if (err) {
 621                r_buf = port_build_proto_msg(msg_origport(msg),
 622                                             msg_orignode(msg),
 623                                             msg_destport(msg),
 624                                             tipc_own_addr,
 625                                             TIPC_HIGH_IMPORTANCE,
 626                                             TIPC_CONN_MSG,
 627                                             err,
 628                                             0,
 629                                             0);
 630                goto exit;
 631        }
 632
 633        /* All is fine */
 634        if (msg_type(msg) == CONN_PROBE) {
 635                r_buf = port_build_proto_msg(msg_origport(msg),
 636                                             msg_orignode(msg),
 637                                             msg_destport(msg),
 638                                             tipc_own_addr,
 639                                             CONN_MANAGER,
 640                                             CONN_PROBE_REPLY,
 641                                             TIPC_OK,
 642                                             port_out_seqno(p_ptr),
 643                                             0);
 644        }
 645        p_ptr->probing_state = CONFIRMED;
 646        port_incr_out_seqno(p_ptr);
 647exit:
 648        if (p_ptr)
 649                tipc_port_unlock(p_ptr);
 650        tipc_net_route_msg(r_buf);
 651        tipc_net_route_msg(abort_buf);
 652        buf_discard(buf);
 653}
 654
 655static void port_print(struct port *p_ptr, struct print_buf *buf, int full_id)
 656{
 657        struct publication *publ;
 658
 659        if (full_id)
 660                tipc_printf(buf, "<%u.%u.%u:%u>:",
 661                            tipc_zone(tipc_own_addr), tipc_cluster(tipc_own_addr),
 662                            tipc_node(tipc_own_addr), p_ptr->publ.ref);
 663        else
 664                tipc_printf(buf, "%-10u:", p_ptr->publ.ref);
 665
 666        if (p_ptr->publ.connected) {
 667                u32 dport = port_peerport(p_ptr);
 668                u32 destnode = port_peernode(p_ptr);
 669
 670                tipc_printf(buf, " connected to <%u.%u.%u:%u>",
 671                            tipc_zone(destnode), tipc_cluster(destnode),
 672                            tipc_node(destnode), dport);
 673                if (p_ptr->publ.conn_type != 0)
 674                        tipc_printf(buf, " via {%u,%u}",
 675                                    p_ptr->publ.conn_type,
 676                                    p_ptr->publ.conn_instance);
 677        }
 678        else if (p_ptr->publ.published) {
 679                tipc_printf(buf, " bound to");
 680                list_for_each_entry(publ, &p_ptr->publications, pport_list) {
 681                        if (publ->lower == publ->upper)
 682                                tipc_printf(buf, " {%u,%u}", publ->type,
 683                                            publ->lower);
 684                        else
 685                                tipc_printf(buf, " {%u,%u,%u}", publ->type,
 686                                            publ->lower, publ->upper);
 687                }
 688        }
 689        tipc_printf(buf, "\n");
 690}
 691
 692#define MAX_PORT_QUERY 32768
 693
 694struct sk_buff *tipc_port_get_ports(void)
 695{
 696        struct sk_buff *buf;
 697        struct tlv_desc *rep_tlv;
 698        struct print_buf pb;
 699        struct port *p_ptr;
 700        int str_len;
 701
 702        buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_QUERY));
 703        if (!buf)
 704                return NULL;
 705        rep_tlv = (struct tlv_desc *)buf->data;
 706
 707        tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_QUERY);
 708        spin_lock_bh(&tipc_port_list_lock);
 709        list_for_each_entry(p_ptr, &ports, port_list) {
 710                spin_lock_bh(p_ptr->publ.lock);
 711                port_print(p_ptr, &pb, 0);
 712                spin_unlock_bh(p_ptr->publ.lock);
 713        }
 714        spin_unlock_bh(&tipc_port_list_lock);
 715        str_len = tipc_printbuf_validate(&pb);
 716
 717        skb_put(buf, TLV_SPACE(str_len));
 718        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
 719
 720        return buf;
 721}
 722
 723#if 0
 724
 725#define MAX_PORT_STATS 2000
 726
 727struct sk_buff *port_show_stats(const void *req_tlv_area, int req_tlv_space)
 728{
 729        u32 ref;
 730        struct port *p_ptr;
 731        struct sk_buff *buf;
 732        struct tlv_desc *rep_tlv;
 733        struct print_buf pb;
 734        int str_len;
 735
 736        if (!TLV_CHECK(req_tlv_area, req_tlv_space, TIPC_TLV_PORT_REF))
 737                return cfg_reply_error_string(TIPC_CFG_TLV_ERROR);
 738
 739        ref = *(u32 *)TLV_DATA(req_tlv_area);
 740        ref = ntohl(ref);
 741
 742        p_ptr = tipc_port_lock(ref);
 743        if (!p_ptr)
 744                return cfg_reply_error_string("port not found");
 745
 746        buf = tipc_cfg_reply_alloc(TLV_SPACE(MAX_PORT_STATS));
 747        if (!buf) {
 748                tipc_port_unlock(p_ptr);
 749                return NULL;
 750        }
 751        rep_tlv = (struct tlv_desc *)buf->data;
 752
 753        tipc_printbuf_init(&pb, TLV_DATA(rep_tlv), MAX_PORT_STATS);
 754        port_print(p_ptr, &pb, 1);
 755        /* NEED TO FILL IN ADDITIONAL PORT STATISTICS HERE */
 756        tipc_port_unlock(p_ptr);
 757        str_len = tipc_printbuf_validate(&pb);
 758
 759        skb_put(buf, TLV_SPACE(str_len));
 760        TLV_SET(rep_tlv, TIPC_TLV_ULTRA_STRING, NULL, str_len);
 761
 762        return buf;
 763}
 764
 765#endif
 766
 767void tipc_port_reinit(void)
 768{
 769        struct port *p_ptr;
 770        struct tipc_msg *msg;
 771
 772        spin_lock_bh(&tipc_port_list_lock);
 773        list_for_each_entry(p_ptr, &ports, port_list) {
 774                msg = &p_ptr->publ.phdr;
 775                if (msg_orignode(msg) == tipc_own_addr)
 776                        break;
 777                msg_set_prevnode(msg, tipc_own_addr);
 778                msg_set_orignode(msg, tipc_own_addr);
 779        }
 780        spin_unlock_bh(&tipc_port_list_lock);
 781}
 782
 783
 784/*
 785 *  port_dispatcher_sigh(): Signal handler for messages destinated
 786 *                          to the tipc_port interface.
 787 */
 788
 789static void port_dispatcher_sigh(void *dummy)
 790{
 791        struct sk_buff *buf;
 792
 793        spin_lock_bh(&queue_lock);
 794        buf = msg_queue_head;
 795        msg_queue_head = NULL;
 796        spin_unlock_bh(&queue_lock);
 797
 798        while (buf) {
 799                struct port *p_ptr;
 800                struct user_port *up_ptr;
 801                struct tipc_portid orig;
 802                struct tipc_name_seq dseq;
 803                void *usr_handle;
 804                int connected;
 805                int published;
 806                u32 message_type;
 807
 808                struct sk_buff *next = buf->next;
 809                struct tipc_msg *msg = buf_msg(buf);
 810                u32 dref = msg_destport(msg);
 811
 812                message_type = msg_type(msg);
 813                if (message_type > TIPC_DIRECT_MSG)
 814                        goto reject;    /* Unsupported message type */
 815
 816                p_ptr = tipc_port_lock(dref);
 817                if (!p_ptr)
 818                        goto reject;    /* Port deleted while msg in queue */
 819
 820                orig.ref = msg_origport(msg);
 821                orig.node = msg_orignode(msg);
 822                up_ptr = p_ptr->user_port;
 823                usr_handle = up_ptr->usr_handle;
 824                connected = p_ptr->publ.connected;
 825                published = p_ptr->publ.published;
 826
 827                if (unlikely(msg_errcode(msg)))
 828                        goto err;
 829
 830                switch (message_type) {
 831
 832                case TIPC_CONN_MSG:{
 833                                tipc_conn_msg_event cb = up_ptr->conn_msg_cb;
 834                                u32 peer_port = port_peerport(p_ptr);
 835                                u32 peer_node = port_peernode(p_ptr);
 836
 837                                tipc_port_unlock(p_ptr);
 838                                if (unlikely(!cb))
 839                                        goto reject;
 840                                if (unlikely(!connected)) {
 841                                        if (tipc_connect2port(dref, &orig))
 842                                                goto reject;
 843                                } else if ((msg_origport(msg) != peer_port) ||
 844                                           (msg_orignode(msg) != peer_node))
 845                                        goto reject;
 846                                if (unlikely(++p_ptr->publ.conn_unacked >=
 847                                             TIPC_FLOW_CONTROL_WIN))
 848                                        tipc_acknowledge(dref,
 849                                                         p_ptr->publ.conn_unacked);
 850                                skb_pull(buf, msg_hdr_sz(msg));
 851                                cb(usr_handle, dref, &buf, msg_data(msg),
 852                                   msg_data_sz(msg));
 853                                break;
 854                        }
 855                case TIPC_DIRECT_MSG:{
 856                                tipc_msg_event cb = up_ptr->msg_cb;
 857
 858                                tipc_port_unlock(p_ptr);
 859                                if (unlikely(!cb || connected))
 860                                        goto reject;
 861                                skb_pull(buf, msg_hdr_sz(msg));
 862                                cb(usr_handle, dref, &buf, msg_data(msg),
 863                                   msg_data_sz(msg), msg_importance(msg),
 864                                   &orig);
 865                                break;
 866                        }
 867                case TIPC_MCAST_MSG:
 868                case TIPC_NAMED_MSG:{
 869                                tipc_named_msg_event cb = up_ptr->named_msg_cb;
 870
 871                                tipc_port_unlock(p_ptr);
 872                                if (unlikely(!cb || connected || !published))
 873                                        goto reject;
 874                                dseq.type =  msg_nametype(msg);
 875                                dseq.lower = msg_nameinst(msg);
 876                                dseq.upper = (message_type == TIPC_NAMED_MSG)
 877                                        ? dseq.lower : msg_nameupper(msg);
 878                                skb_pull(buf, msg_hdr_sz(msg));
 879                                cb(usr_handle, dref, &buf, msg_data(msg),
 880                                   msg_data_sz(msg), msg_importance(msg),
 881                                   &orig, &dseq);
 882                                break;
 883                        }
 884                }
 885                if (buf)
 886                        buf_discard(buf);
 887                buf = next;
 888                continue;
 889err:
 890                switch (message_type) {
 891
 892                case TIPC_CONN_MSG:{
 893                                tipc_conn_shutdown_event cb =
 894                                        up_ptr->conn_err_cb;
 895                                u32 peer_port = port_peerport(p_ptr);
 896                                u32 peer_node = port_peernode(p_ptr);
 897
 898                                tipc_port_unlock(p_ptr);
 899                                if (!cb || !connected)
 900                                        break;
 901                                if ((msg_origport(msg) != peer_port) ||
 902                                    (msg_orignode(msg) != peer_node))
 903                                        break;
 904                                tipc_disconnect(dref);
 905                                skb_pull(buf, msg_hdr_sz(msg));
 906                                cb(usr_handle, dref, &buf, msg_data(msg),
 907                                   msg_data_sz(msg), msg_errcode(msg));
 908                                break;
 909                        }
 910                case TIPC_DIRECT_MSG:{
 911                                tipc_msg_err_event cb = up_ptr->err_cb;
 912
 913                                tipc_port_unlock(p_ptr);
 914                                if (!cb || connected)
 915                                        break;
 916                                skb_pull(buf, msg_hdr_sz(msg));
 917                                cb(usr_handle, dref, &buf, msg_data(msg),
 918                                   msg_data_sz(msg), msg_errcode(msg), &orig);
 919                                break;
 920                        }
 921                case TIPC_MCAST_MSG:
 922                case TIPC_NAMED_MSG:{
 923                                tipc_named_msg_err_event cb =
 924                                        up_ptr->named_err_cb;
 925
 926                                tipc_port_unlock(p_ptr);
 927                                if (!cb || connected)
 928                                        break;
 929                                dseq.type =  msg_nametype(msg);
 930                                dseq.lower = msg_nameinst(msg);
 931                                dseq.upper = (message_type == TIPC_NAMED_MSG)
 932                                        ? dseq.lower : msg_nameupper(msg);
 933                                skb_pull(buf, msg_hdr_sz(msg));
 934                                cb(usr_handle, dref, &buf, msg_data(msg),
 935                                   msg_data_sz(msg), msg_errcode(msg), &dseq);
 936                                break;
 937                        }
 938                }
 939                if (buf)
 940                        buf_discard(buf);
 941                buf = next;
 942                continue;
 943reject:
 944                tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
 945                buf = next;
 946        }
 947}
 948
 949/*
 950 *  port_dispatcher(): Dispatcher for messages destinated
 951 *  to the tipc_port interface. Called with port locked.
 952 */
 953
 954static u32 port_dispatcher(struct tipc_port *dummy, struct sk_buff *buf)
 955{
 956        buf->next = NULL;
 957        spin_lock_bh(&queue_lock);
 958        if (msg_queue_head) {
 959                msg_queue_tail->next = buf;
 960                msg_queue_tail = buf;
 961        } else {
 962                msg_queue_tail = msg_queue_head = buf;
 963                tipc_k_signal((Handler)port_dispatcher_sigh, 0);
 964        }
 965        spin_unlock_bh(&queue_lock);
 966        return 0;
 967}
 968
 969/*
 970 * Wake up port after congestion: Called with port locked,
 971 *
 972 */
 973
 974static void port_wakeup_sh(unsigned long ref)
 975{
 976        struct port *p_ptr;
 977        struct user_port *up_ptr;
 978        tipc_continue_event cb = NULL;
 979        void *uh = NULL;
 980
 981        p_ptr = tipc_port_lock(ref);
 982        if (p_ptr) {
 983                up_ptr = p_ptr->user_port;
 984                if (up_ptr) {
 985                        cb = up_ptr->continue_event_cb;
 986                        uh = up_ptr->usr_handle;
 987                }
 988                tipc_port_unlock(p_ptr);
 989        }
 990        if (cb)
 991                cb(uh, ref);
 992}
 993
 994
 995static void port_wakeup(struct tipc_port *p_ptr)
 996{
 997        tipc_k_signal((Handler)port_wakeup_sh, p_ptr->ref);
 998}
 999
1000void tipc_acknowledge(u32 ref, u32 ack)
1001{
1002        struct port *p_ptr;
1003        struct sk_buff *buf = NULL;
1004
1005        p_ptr = tipc_port_lock(ref);
1006        if (!p_ptr)
1007                return;
1008        if (p_ptr->publ.connected) {
1009                p_ptr->publ.conn_unacked -= ack;
1010                buf = port_build_proto_msg(port_peerport(p_ptr),
1011                                           port_peernode(p_ptr),
1012                                           ref,
1013                                           tipc_own_addr,
1014                                           CONN_MANAGER,
1015                                           CONN_ACK,
1016                                           TIPC_OK,
1017                                           port_out_seqno(p_ptr),
1018                                           ack);
1019        }
1020        tipc_port_unlock(p_ptr);
1021        tipc_net_route_msg(buf);
1022}
1023
1024/*
1025 * tipc_createport(): user level call. Will add port to
1026 *                    registry if non-zero user_ref.
1027 */
1028
1029int tipc_createport(u32 user_ref,
1030                    void *usr_handle,
1031                    unsigned int importance,
1032                    tipc_msg_err_event error_cb,
1033                    tipc_named_msg_err_event named_error_cb,
1034                    tipc_conn_shutdown_event conn_error_cb,
1035                    tipc_msg_event msg_cb,
1036                    tipc_named_msg_event named_msg_cb,
1037                    tipc_conn_msg_event conn_msg_cb,
1038                    tipc_continue_event continue_event_cb,/* May be zero */
1039                    u32 *portref)
1040{
1041        struct user_port *up_ptr;
1042        struct port *p_ptr;
1043
1044        up_ptr = kmalloc(sizeof(*up_ptr), GFP_ATOMIC);
1045        if (!up_ptr) {
1046                warn("Port creation failed, no memory\n");
1047                return -ENOMEM;
1048        }
1049        p_ptr = (struct port *)tipc_createport_raw(NULL, port_dispatcher,
1050                                                   port_wakeup, importance);
1051        if (!p_ptr) {
1052                kfree(up_ptr);
1053                return -ENOMEM;
1054        }
1055
1056        p_ptr->user_port = up_ptr;
1057        up_ptr->user_ref = user_ref;
1058        up_ptr->usr_handle = usr_handle;
1059        up_ptr->ref = p_ptr->publ.ref;
1060        up_ptr->err_cb = error_cb;
1061        up_ptr->named_err_cb = named_error_cb;
1062        up_ptr->conn_err_cb = conn_error_cb;
1063        up_ptr->msg_cb = msg_cb;
1064        up_ptr->named_msg_cb = named_msg_cb;
1065        up_ptr->conn_msg_cb = conn_msg_cb;
1066        up_ptr->continue_event_cb = continue_event_cb;
1067        INIT_LIST_HEAD(&up_ptr->uport_list);
1068        tipc_reg_add_port(up_ptr);
1069        *portref = p_ptr->publ.ref;
1070        tipc_port_unlock(p_ptr);
1071        return 0;
1072}
1073
1074int tipc_ownidentity(u32 ref, struct tipc_portid *id)
1075{
1076        id->ref = ref;
1077        id->node = tipc_own_addr;
1078        return 0;
1079}
1080
1081int tipc_portimportance(u32 ref, unsigned int *importance)
1082{
1083        struct port *p_ptr;
1084
1085        p_ptr = tipc_port_lock(ref);
1086        if (!p_ptr)
1087                return -EINVAL;
1088        *importance = (unsigned int)msg_importance(&p_ptr->publ.phdr);
1089        tipc_port_unlock(p_ptr);
1090        return 0;
1091}
1092
1093int tipc_set_portimportance(u32 ref, unsigned int imp)
1094{
1095        struct port *p_ptr;
1096
1097        if (imp > TIPC_CRITICAL_IMPORTANCE)
1098                return -EINVAL;
1099
1100        p_ptr = tipc_port_lock(ref);
1101        if (!p_ptr)
1102                return -EINVAL;
1103        msg_set_importance(&p_ptr->publ.phdr, (u32)imp);
1104        tipc_port_unlock(p_ptr);
1105        return 0;
1106}
1107
1108
1109int tipc_publish(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1110{
1111        struct port *p_ptr;
1112        struct publication *publ;
1113        u32 key;
1114        int res = -EINVAL;
1115
1116        p_ptr = tipc_port_lock(ref);
1117        if (!p_ptr)
1118                return -EINVAL;
1119
1120        dbg("tipc_publ %u, p_ptr = %x, conn = %x, scope = %x, "
1121            "lower = %u, upper = %u\n",
1122            ref, p_ptr, p_ptr->publ.connected, scope, seq->lower, seq->upper);
1123        if (p_ptr->publ.connected)
1124                goto exit;
1125        if (seq->lower > seq->upper)
1126                goto exit;
1127        if ((scope < TIPC_ZONE_SCOPE) || (scope > TIPC_NODE_SCOPE))
1128                goto exit;
1129        key = ref + p_ptr->pub_count + 1;
1130        if (key == ref) {
1131                res = -EADDRINUSE;
1132                goto exit;
1133        }
1134        publ = tipc_nametbl_publish(seq->type, seq->lower, seq->upper,
1135                                    scope, p_ptr->publ.ref, key);
1136        if (publ) {
1137                list_add(&publ->pport_list, &p_ptr->publications);
1138                p_ptr->pub_count++;
1139                p_ptr->publ.published = 1;
1140                res = 0;
1141        }
1142exit:
1143        tipc_port_unlock(p_ptr);
1144        return res;
1145}
1146
1147int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1148{
1149        struct port *p_ptr;
1150        struct publication *publ;
1151        struct publication *tpubl;
1152        int res = -EINVAL;
1153
1154        p_ptr = tipc_port_lock(ref);
1155        if (!p_ptr)
1156                return -EINVAL;
1157        if (!seq) {
1158                list_for_each_entry_safe(publ, tpubl,
1159                                         &p_ptr->publications, pport_list) {
1160                        tipc_nametbl_withdraw(publ->type, publ->lower,
1161                                              publ->ref, publ->key);
1162                }
1163                res = 0;
1164        } else {
1165                list_for_each_entry_safe(publ, tpubl,
1166                                         &p_ptr->publications, pport_list) {
1167                        if (publ->scope != scope)
1168                                continue;
1169                        if (publ->type != seq->type)
1170                                continue;
1171                        if (publ->lower != seq->lower)
1172                                continue;
1173                        if (publ->upper != seq->upper)
1174                                break;
1175                        tipc_nametbl_withdraw(publ->type, publ->lower,
1176                                              publ->ref, publ->key);
1177                        res = 0;
1178                        break;
1179                }
1180        }
1181        if (list_empty(&p_ptr->publications))
1182                p_ptr->publ.published = 0;
1183        tipc_port_unlock(p_ptr);
1184        return res;
1185}
1186
1187int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1188{
1189        struct port *p_ptr;
1190        struct tipc_msg *msg;
1191        int res = -EINVAL;
1192
1193        p_ptr = tipc_port_lock(ref);
1194        if (!p_ptr)
1195                return -EINVAL;
1196        if (p_ptr->publ.published || p_ptr->publ.connected)
1197                goto exit;
1198        if (!peer->ref)
1199                goto exit;
1200
1201        msg = &p_ptr->publ.phdr;
1202        msg_set_destnode(msg, peer->node);
1203        msg_set_destport(msg, peer->ref);
1204        msg_set_orignode(msg, tipc_own_addr);
1205        msg_set_origport(msg, p_ptr->publ.ref);
1206        msg_set_transp_seqno(msg, 42);
1207        msg_set_type(msg, TIPC_CONN_MSG);
1208        if (!may_route(peer->node))
1209                msg_set_hdr_sz(msg, SHORT_H_SIZE);
1210        else
1211                msg_set_hdr_sz(msg, LONG_H_SIZE);
1212
1213        p_ptr->probing_interval = PROBING_INTERVAL;
1214        p_ptr->probing_state = CONFIRMED;
1215        p_ptr->publ.connected = 1;
1216        k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
1217
1218        tipc_nodesub_subscribe(&p_ptr->subscription,peer->node,
1219                          (void *)(unsigned long)ref,
1220                          (net_ev_handler)port_handle_node_down);
1221        res = 0;
1222exit:
1223        tipc_port_unlock(p_ptr);
1224        p_ptr->publ.max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1225        return res;
1226}
1227
1228/**
1229 * tipc_disconnect_port - disconnect port from peer
1230 *
1231 * Port must be locked.
1232 */
1233
1234int tipc_disconnect_port(struct tipc_port *tp_ptr)
1235{
1236        int res;
1237
1238        if (tp_ptr->connected) {
1239                tp_ptr->connected = 0;
1240                /* let timer expire on it's own to avoid deadlock! */
1241                tipc_nodesub_unsubscribe(
1242                        &((struct port *)tp_ptr)->subscription);
1243                res = 0;
1244        } else {
1245                res = -ENOTCONN;
1246        }
1247        return res;
1248}
1249
1250/*
1251 * tipc_disconnect(): Disconnect port form peer.
1252 *                    This is a node local operation.
1253 */
1254
1255int tipc_disconnect(u32 ref)
1256{
1257        struct port *p_ptr;
1258        int res;
1259
1260        p_ptr = tipc_port_lock(ref);
1261        if (!p_ptr)
1262                return -EINVAL;
1263        res = tipc_disconnect_port((struct tipc_port *)p_ptr);
1264        tipc_port_unlock(p_ptr);
1265        return res;
1266}
1267
1268/*
1269 * tipc_shutdown(): Send a SHUTDOWN msg to peer and disconnect
1270 */
1271int tipc_shutdown(u32 ref)
1272{
1273        struct port *p_ptr;
1274        struct sk_buff *buf = NULL;
1275
1276        p_ptr = tipc_port_lock(ref);
1277        if (!p_ptr)
1278                return -EINVAL;
1279
1280        if (p_ptr->publ.connected) {
1281                u32 imp = msg_importance(&p_ptr->publ.phdr);
1282                if (imp < TIPC_CRITICAL_IMPORTANCE)
1283                        imp++;
1284                buf = port_build_proto_msg(port_peerport(p_ptr),
1285                                           port_peernode(p_ptr),
1286                                           ref,
1287                                           tipc_own_addr,
1288                                           imp,
1289                                           TIPC_CONN_MSG,
1290                                           TIPC_CONN_SHUTDOWN,
1291                                           port_out_seqno(p_ptr),
1292                                           0);
1293        }
1294        tipc_port_unlock(p_ptr);
1295        tipc_net_route_msg(buf);
1296        return tipc_disconnect(ref);
1297}
1298
1299int tipc_isconnected(u32 ref, int *isconnected)
1300{
1301        struct port *p_ptr;
1302
1303        p_ptr = tipc_port_lock(ref);
1304        if (!p_ptr)
1305                return -EINVAL;
1306        *isconnected = p_ptr->publ.connected;
1307        tipc_port_unlock(p_ptr);
1308        return 0;
1309}
1310
1311int tipc_peer(u32 ref, struct tipc_portid *peer)
1312{
1313        struct port *p_ptr;
1314        int res;
1315
1316        p_ptr = tipc_port_lock(ref);
1317        if (!p_ptr)
1318                return -EINVAL;
1319        if (p_ptr->publ.connected) {
1320                peer->ref = port_peerport(p_ptr);
1321                peer->node = port_peernode(p_ptr);
1322                res = 0;
1323        } else
1324                res = -ENOTCONN;
1325        tipc_port_unlock(p_ptr);
1326        return res;
1327}
1328
1329int tipc_ref_valid(u32 ref)
1330{
1331        /* Works irrespective of type */
1332        return !!tipc_ref_deref(ref);
1333}
1334
1335
1336/*
1337 *  tipc_port_recv_sections(): Concatenate and deliver sectioned
1338 *                        message for this node.
1339 */
1340
1341int tipc_port_recv_sections(struct port *sender, unsigned int num_sect,
1342                       struct iovec const *msg_sect)
1343{
1344        struct sk_buff *buf;
1345        int res;
1346
1347        res = msg_build(&sender->publ.phdr, msg_sect, num_sect,
1348                        MAX_MSG_SIZE, !sender->user_port, &buf);
1349        if (likely(buf))
1350                tipc_port_recv_msg(buf);
1351        return res;
1352}
1353
1354/**
1355 * tipc_send - send message sections on connection
1356 */
1357
1358int tipc_send(u32 ref, unsigned int num_sect, struct iovec const *msg_sect)
1359{
1360        struct port *p_ptr;
1361        u32 destnode;
1362        int res;
1363
1364        p_ptr = tipc_port_deref(ref);
1365        if (!p_ptr || !p_ptr->publ.connected)
1366                return -EINVAL;
1367
1368        p_ptr->publ.congested = 1;
1369        if (!tipc_port_congested(p_ptr)) {
1370                destnode = port_peernode(p_ptr);
1371                if (likely(destnode != tipc_own_addr))
1372                        res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1373                                                           destnode);
1374                else
1375                        res = tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1376
1377                if (likely(res != -ELINKCONG)) {
1378                        port_incr_out_seqno(p_ptr);
1379                        p_ptr->publ.congested = 0;
1380                        p_ptr->sent++;
1381                        return res;
1382                }
1383        }
1384        if (port_unreliable(p_ptr)) {
1385                p_ptr->publ.congested = 0;
1386                /* Just calculate msg length and return */
1387                return msg_calc_data_size(msg_sect, num_sect);
1388        }
1389        return -ELINKCONG;
1390}
1391
1392/**
1393 * tipc_send_buf - send message buffer on connection
1394 */
1395
1396int tipc_send_buf(u32 ref, struct sk_buff *buf, unsigned int dsz)
1397{
1398        struct port *p_ptr;
1399        struct tipc_msg *msg;
1400        u32 destnode;
1401        u32 hsz;
1402        u32 sz;
1403        u32 res;
1404
1405        p_ptr = tipc_port_deref(ref);
1406        if (!p_ptr || !p_ptr->publ.connected)
1407                return -EINVAL;
1408
1409        msg = &p_ptr->publ.phdr;
1410        hsz = msg_hdr_sz(msg);
1411        sz = hsz + dsz;
1412        msg_set_size(msg, sz);
1413        if (skb_cow(buf, hsz))
1414                return -ENOMEM;
1415
1416        skb_push(buf, hsz);
1417        skb_copy_to_linear_data(buf, msg, hsz);
1418        destnode = msg_destnode(msg);
1419        p_ptr->publ.congested = 1;
1420        if (!tipc_port_congested(p_ptr)) {
1421                if (likely(destnode != tipc_own_addr))
1422                        res = tipc_send_buf_fast(buf, destnode);
1423                else {
1424                        tipc_port_recv_msg(buf);
1425                        res = sz;
1426                }
1427                if (likely(res != -ELINKCONG)) {
1428                        port_incr_out_seqno(p_ptr);
1429                        p_ptr->sent++;
1430                        p_ptr->publ.congested = 0;
1431                        return res;
1432                }
1433        }
1434        if (port_unreliable(p_ptr)) {
1435                p_ptr->publ.congested = 0;
1436                return dsz;
1437        }
1438        return -ELINKCONG;
1439}
1440
1441/**
1442 * tipc_forward2name - forward message sections to port name
1443 */
1444
1445int tipc_forward2name(u32 ref,
1446                      struct tipc_name const *name,
1447                      u32 domain,
1448                      u32 num_sect,
1449                      struct iovec const *msg_sect,
1450                      struct tipc_portid const *orig,
1451                      unsigned int importance)
1452{
1453        struct port *p_ptr;
1454        struct tipc_msg *msg;
1455        u32 destnode = domain;
1456        u32 destport = 0;
1457        int res;
1458
1459        p_ptr = tipc_port_deref(ref);
1460        if (!p_ptr || p_ptr->publ.connected)
1461                return -EINVAL;
1462
1463        msg = &p_ptr->publ.phdr;
1464        msg_set_type(msg, TIPC_NAMED_MSG);
1465        msg_set_orignode(msg, orig->node);
1466        msg_set_origport(msg, orig->ref);
1467        msg_set_hdr_sz(msg, LONG_H_SIZE);
1468        msg_set_nametype(msg, name->type);
1469        msg_set_nameinst(msg, name->instance);
1470        msg_set_lookup_scope(msg, addr_scope(domain));
1471        if (importance <= TIPC_CRITICAL_IMPORTANCE)
1472                msg_set_importance(msg,importance);
1473        destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1474        msg_set_destnode(msg, destnode);
1475        msg_set_destport(msg, destport);
1476
1477        if (likely(destport || destnode)) {
1478                p_ptr->sent++;
1479                if (likely(destnode == tipc_own_addr))
1480                        return tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1481                res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect,
1482                                                   destnode);
1483                if (likely(res != -ELINKCONG))
1484                        return res;
1485                if (port_unreliable(p_ptr)) {
1486                        /* Just calculate msg length and return */
1487                        return msg_calc_data_size(msg_sect, num_sect);
1488                }
1489                return -ELINKCONG;
1490        }
1491        return tipc_port_reject_sections(p_ptr, msg, msg_sect, num_sect,
1492                                         TIPC_ERR_NO_NAME);
1493}
1494
1495/**
1496 * tipc_send2name - send message sections to port name
1497 */
1498
1499int tipc_send2name(u32 ref,
1500                   struct tipc_name const *name,
1501                   unsigned int domain,
1502                   unsigned int num_sect,
1503                   struct iovec const *msg_sect)
1504{
1505        struct tipc_portid orig;
1506
1507        orig.ref = ref;
1508        orig.node = tipc_own_addr;
1509        return tipc_forward2name(ref, name, domain, num_sect, msg_sect, &orig,
1510                                 TIPC_PORT_IMPORTANCE);
1511}
1512
1513/**
1514 * tipc_forward_buf2name - forward message buffer to port name
1515 */
1516
1517int tipc_forward_buf2name(u32 ref,
1518                          struct tipc_name const *name,
1519                          u32 domain,
1520                          struct sk_buff *buf,
1521                          unsigned int dsz,
1522                          struct tipc_portid const *orig,
1523                          unsigned int importance)
1524{
1525        struct port *p_ptr;
1526        struct tipc_msg *msg;
1527        u32 destnode = domain;
1528        u32 destport = 0;
1529        int res;
1530
1531        p_ptr = (struct port *)tipc_ref_deref(ref);
1532        if (!p_ptr || p_ptr->publ.connected)
1533                return -EINVAL;
1534
1535        msg = &p_ptr->publ.phdr;
1536        if (importance <= TIPC_CRITICAL_IMPORTANCE)
1537                msg_set_importance(msg, importance);
1538        msg_set_type(msg, TIPC_NAMED_MSG);
1539        msg_set_orignode(msg, orig->node);
1540        msg_set_origport(msg, orig->ref);
1541        msg_set_nametype(msg, name->type);
1542        msg_set_nameinst(msg, name->instance);
1543        msg_set_lookup_scope(msg, addr_scope(domain));
1544        msg_set_hdr_sz(msg, LONG_H_SIZE);
1545        msg_set_size(msg, LONG_H_SIZE + dsz);
1546        destport = tipc_nametbl_translate(name->type, name->instance, &destnode);
1547        msg_set_destnode(msg, destnode);
1548        msg_set_destport(msg, destport);
1549        msg_dbg(msg, "forw2name ==> ");
1550        if (skb_cow(buf, LONG_H_SIZE))
1551                return -ENOMEM;
1552        skb_push(buf, LONG_H_SIZE);
1553        skb_copy_to_linear_data(buf, msg, LONG_H_SIZE);
1554        msg_dbg(buf_msg(buf),"PREP:");
1555        if (likely(destport || destnode)) {
1556                p_ptr->sent++;
1557                if (destnode == tipc_own_addr)
1558                        return tipc_port_recv_msg(buf);
1559                res = tipc_send_buf_fast(buf, destnode);
1560                if (likely(res != -ELINKCONG))
1561                        return res;
1562                if (port_unreliable(p_ptr))
1563                        return dsz;
1564                return -ELINKCONG;
1565        }
1566        return tipc_reject_msg(buf, TIPC_ERR_NO_NAME);
1567}
1568
1569/**
1570 * tipc_send_buf2name - send message buffer to port name
1571 */
1572
1573int tipc_send_buf2name(u32 ref,
1574                       struct tipc_name const *dest,
1575                       u32 domain,
1576                       struct sk_buff *buf,
1577                       unsigned int dsz)
1578{
1579        struct tipc_portid orig;
1580
1581        orig.ref = ref;
1582        orig.node = tipc_own_addr;
1583        return tipc_forward_buf2name(ref, dest, domain, buf, dsz, &orig,
1584                                     TIPC_PORT_IMPORTANCE);
1585}
1586
1587/**
1588 * tipc_forward2port - forward message sections to port identity
1589 */
1590
1591int tipc_forward2port(u32 ref,
1592                      struct tipc_portid const *dest,
1593                      unsigned int num_sect,
1594                      struct iovec const *msg_sect,
1595                      struct tipc_portid const *orig,
1596                      unsigned int importance)
1597{
1598        struct port *p_ptr;
1599        struct tipc_msg *msg;
1600        int res;
1601
1602        p_ptr = tipc_port_deref(ref);
1603        if (!p_ptr || p_ptr->publ.connected)
1604                return -EINVAL;
1605
1606        msg = &p_ptr->publ.phdr;
1607        msg_set_type(msg, TIPC_DIRECT_MSG);
1608        msg_set_orignode(msg, orig->node);
1609        msg_set_origport(msg, orig->ref);
1610        msg_set_destnode(msg, dest->node);
1611        msg_set_destport(msg, dest->ref);
1612        msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1613        if (importance <= TIPC_CRITICAL_IMPORTANCE)
1614                msg_set_importance(msg, importance);
1615        p_ptr->sent++;
1616        if (dest->node == tipc_own_addr)
1617                return tipc_port_recv_sections(p_ptr, num_sect, msg_sect);
1618        res = tipc_link_send_sections_fast(p_ptr, msg_sect, num_sect, dest->node);
1619        if (likely(res != -ELINKCONG))
1620                return res;
1621        if (port_unreliable(p_ptr)) {
1622                /* Just calculate msg length and return */
1623                return msg_calc_data_size(msg_sect, num_sect);
1624        }
1625        return -ELINKCONG;
1626}
1627
1628/**
1629 * tipc_send2port - send message sections to port identity
1630 */
1631
1632int tipc_send2port(u32 ref,
1633                   struct tipc_portid const *dest,
1634                   unsigned int num_sect,
1635                   struct iovec const *msg_sect)
1636{
1637        struct tipc_portid orig;
1638
1639        orig.ref = ref;
1640        orig.node = tipc_own_addr;
1641        return tipc_forward2port(ref, dest, num_sect, msg_sect, &orig,
1642                                 TIPC_PORT_IMPORTANCE);
1643}
1644
1645/**
1646 * tipc_forward_buf2port - forward message buffer to port identity
1647 */
1648int tipc_forward_buf2port(u32 ref,
1649                          struct tipc_portid const *dest,
1650                          struct sk_buff *buf,
1651                          unsigned int dsz,
1652                          struct tipc_portid const *orig,
1653                          unsigned int importance)
1654{
1655        struct port *p_ptr;
1656        struct tipc_msg *msg;
1657        int res;
1658
1659        p_ptr = (struct port *)tipc_ref_deref(ref);
1660        if (!p_ptr || p_ptr->publ.connected)
1661                return -EINVAL;
1662
1663        msg = &p_ptr->publ.phdr;
1664        msg_set_type(msg, TIPC_DIRECT_MSG);
1665        msg_set_orignode(msg, orig->node);
1666        msg_set_origport(msg, orig->ref);
1667        msg_set_destnode(msg, dest->node);
1668        msg_set_destport(msg, dest->ref);
1669        msg_set_hdr_sz(msg, DIR_MSG_H_SIZE);
1670        if (importance <= TIPC_CRITICAL_IMPORTANCE)
1671                msg_set_importance(msg, importance);
1672        msg_set_size(msg, DIR_MSG_H_SIZE + dsz);
1673        if (skb_cow(buf, DIR_MSG_H_SIZE))
1674                return -ENOMEM;
1675
1676        skb_push(buf, DIR_MSG_H_SIZE);
1677        skb_copy_to_linear_data(buf, msg, DIR_MSG_H_SIZE);
1678        msg_dbg(msg, "buf2port: ");
1679        p_ptr->sent++;
1680        if (dest->node == tipc_own_addr)
1681                return tipc_port_recv_msg(buf);
1682        res = tipc_send_buf_fast(buf, dest->node);
1683        if (likely(res != -ELINKCONG))
1684                return res;
1685        if (port_unreliable(p_ptr))
1686                return dsz;
1687        return -ELINKCONG;
1688}
1689
1690/**
1691 * tipc_send_buf2port - send message buffer to port identity
1692 */
1693
1694int tipc_send_buf2port(u32 ref,
1695                       struct tipc_portid const *dest,
1696                       struct sk_buff *buf,
1697                       unsigned int dsz)
1698{
1699        struct tipc_portid orig;
1700
1701        orig.ref = ref;
1702        orig.node = tipc_own_addr;
1703        return tipc_forward_buf2port(ref, dest, buf, dsz, &orig,
1704                                     TIPC_PORT_IMPORTANCE);
1705}
1706
1707