linux/fs/dlm/lowcomms.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14/*
  15 * lowcomms.c
  16 *
  17 * This is the "low-level" comms layer.
  18 *
  19 * It is responsible for sending/receiving messages
  20 * from other nodes in the cluster.
  21 *
  22 * Cluster nodes are referred to by their nodeids. nodeids are
  23 * simply 32 bit numbers to the locking module - if they need to
  24 * be expanded for the cluster infrastructure then that is its
  25 * responsibility. It is this layer's
  26 * responsibility to resolve these into IP address or
  27 * whatever it needs for inter-node communication.
  28 *
  29 * The comms level is two kernel threads that deal mainly with
  30 * the receiving of messages from other nodes and passing them
  31 * up to the mid-level comms layer (which understands the
  32 * message format) for execution by the locking core, and
  33 * a send thread which does all the setting up of connections
  34 * to remote nodes and the sending of data. Threads are not allowed
  35 * to send their own data because it may cause them to wait in times
  36 * of high load. Also, this way, the sending thread can collect together
  37 * messages bound for one node and send them in one block.
  38 *
  39 * lowcomms will choose to use either TCP or SCTP as its transport layer
  40 * depending on the configuration variable 'protocol'. This should be set
  41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
  42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
  43 * for the DLM to function.
  44 *
  45 */
  46
  47#include <asm/ioctls.h>
  48#include <net/sock.h>
  49#include <net/tcp.h>
  50#include <linux/pagemap.h>
  51#include <linux/file.h>
  52#include <linux/mutex.h>
  53#include <linux/sctp.h>
  54#include <linux/slab.h>
  55#include <net/sctp/sctp.h>
  56#include <net/ipv6.h>
  57
  58#include "dlm_internal.h"
  59#include "lowcomms.h"
  60#include "midcomms.h"
  61#include "config.h"
  62
/* NOTE(review): presumably a socket receive-buffer size in bytes; it is not
   referenced in this part of the file - confirm against its users */
#define NEEDED_RMEM (4*1024*1024)
/* Number of buckets in connection_hash. nodeid_hash() masks with
   CONN_HASH_SIZE-1, so this has to stay a power of two. */
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
  68
  69struct cbuf {
  70        unsigned int base;
  71        unsigned int len;
  72        unsigned int mask;
  73};
  74
  75static void cbuf_add(struct cbuf *cb, int n)
  76{
  77        cb->len += n;
  78}
  79
  80static int cbuf_data(struct cbuf *cb)
  81{
  82        return ((cb->base + cb->len) & cb->mask);
  83}
  84
  85static void cbuf_init(struct cbuf *cb, int size)
  86{
  87        cb->base = cb->len = 0;
  88        cb->mask = size-1;
  89}
  90
  91static void cbuf_eat(struct cbuf *cb, int n)
  92{
  93        cb->len  -= n;
  94        cb->base += n;
  95        cb->base &= cb->mask;
  96}
  97
  98static bool cbuf_empty(struct cbuf *cb)
  99{
 100        return cb->len == 0;
 101}
 102
/*
 * Per-node connection state.  Instances are allocated from con_cache and
 * linked into connection_hash by __nodeid2con().
 */
struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
        struct mutex sock_mutex;        /* taken by close_connection() and receive_from_sock() */
        unsigned long flags;    /* bit numbers below, driven with the atomic bitops */
#define CF_READ_PENDING 1       /* set when rwork is queued on recv_workqueue */
#define CF_WRITE_PENDING 2      /* set when swork is queued to send data */
#define CF_CONNECT_PENDING 3    /* set when swork is queued by lowcomms_connect_sock() */
#define CF_INIT_PENDING 4       /* NOTE(review): looks like "SCTP INIT in flight"; the setter is not in view */
#define CF_IS_OTHERCON 5        /* NOTE(review): not used in the visible part of the file */
#define CF_CLOSE 6              /* lowcomms_connect_sock() refuses to queue a connect while set */
#define CF_APP_LIMITED 7        /* cleared in lowcomms_write_space() along with sk_write_pending-- */
        struct list_head writequeue;  /* List of outgoing writequeue_entries */
        spinlock_t writequeue_lock;
        int (*rx_action) (struct connection *); /* What to do when active */
        void (*connect_action) (struct connection *);   /* What to do to connect */
        struct page *rx_page;   /* receive page; allocated lazily in receive_from_sock() */
        struct cbuf cb;         /* circular-buffer offsets into rx_page */
        int retries;            /* reset to 0 by close_connection() */
#define MAX_CONNECT_RETRIES 3
        int sctp_assoc;         /* SCTP association id; 0 means none */
        struct hlist_node list; /* linkage in a connection_hash bucket */
        struct connection *othercon;    /* optionally closed together with this one, see close_connection() */
        struct work_struct rwork; /* Receive workqueue */
        struct work_struct swork; /* Send workqueue */
        bool try_new_addr;      /* set after a failed SCTP init, cleared on COMM_UP */
};
/* Fetch the connection stored in a struct sock's sk_user_data (see add_sock()) */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
 131
/*
 * An entry waiting to be sent.
 * NOTE(review): the code that fills and drains these entries is outside the
 * visible part of the file; the field notes below are assumptions to confirm.
 */
struct writequeue_entry {
        struct list_head list;  /* presumably linkage on connection->writequeue */
        struct page *page;      /* presumably the page holding the data */
        int offset;             /* presumably the start of unsent data within page */
        int len;                /* presumably the number of bytes ready to send */
        int end;                /* presumably the end of space handed out in page */
        int users;              /* presumably writers still filling this entry */
        struct connection *con; /* owning connection */
};
 142
/*
 * The set of addresses known for one node.  Entries live on the
 * dlm_node_addrs list and are created/extended by dlm_lowcomms_addr().
 */
struct dlm_node_addr {
        struct list_head list;  /* linkage on dlm_node_addrs */
        int nodeid;             /* node these addresses belong to */
        int addr_count;         /* number of valid slots in addr[] */
        int curr_addr_index;    /* slot nodeid_to_addr() currently hands out */
        struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];      /* individually allocated addresses */
};
 150
/* All known node addresses; walked and modified under dlm_node_addrs_spin */
static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

/* Our own addresses; entry 0 decides the address family used by
   make_sockaddr() and nodeid_to_addr() */
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
/* NOTE(review): not referenced in the visible part of the file */
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

/* Connections hashed by nodeid (see nodeid_hash()); nodeid2con(), assoc2con()
   and sctp_init_failed() take connections_lock around accesses */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_MUTEX(connections_lock);
/* Slab cache that struct connection objects are allocated from */
static struct kmem_cache *con_cache;

/* Work handlers installed on connection->rwork / connection->swork */
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
 168
 169
 170/* This is deliberately very simple because most clusters have simple
 171   sequential nodeids, so we should be able to go straight to a connection
 172   struct in the array */
 173static inline int nodeid_hash(int nodeid)
 174{
 175        return nodeid & (CONN_HASH_SIZE-1);
 176}
 177
 178static struct connection *__find_con(int nodeid)
 179{
 180        int r;
 181        struct connection *con;
 182
 183        r = nodeid_hash(nodeid);
 184
 185        hlist_for_each_entry(con, &connection_hash[r], list) {
 186                if (con->nodeid == nodeid)
 187                        return con;
 188        }
 189        return NULL;
 190}
 191
 192/*
 193 * If 'allocation' is zero then we don't attempt to create a new
 194 * connection structure for this node.
 195 */
 196static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
 197{
 198        struct connection *con = NULL;
 199        int r;
 200
 201        con = __find_con(nodeid);
 202        if (con || !alloc)
 203                return con;
 204
 205        con = kmem_cache_zalloc(con_cache, alloc);
 206        if (!con)
 207                return NULL;
 208
 209        r = nodeid_hash(nodeid);
 210        hlist_add_head(&con->list, &connection_hash[r]);
 211
 212        con->nodeid = nodeid;
 213        mutex_init(&con->sock_mutex);
 214        INIT_LIST_HEAD(&con->writequeue);
 215        spin_lock_init(&con->writequeue_lock);
 216        INIT_WORK(&con->swork, process_send_sockets);
 217        INIT_WORK(&con->rwork, process_recv_sockets);
 218
 219        /* Setup action pointers for child sockets */
 220        if (con->nodeid) {
 221                struct connection *zerocon = __find_con(0);
 222
 223                con->connect_action = zerocon->connect_action;
 224                if (!con->rx_action)
 225                        con->rx_action = zerocon->rx_action;
 226        }
 227
 228        return con;
 229}
 230
 231/* Loop round all connections */
 232static void foreach_conn(void (*conn_func)(struct connection *c))
 233{
 234        int i;
 235        struct hlist_node *n;
 236        struct connection *con;
 237
 238        for (i = 0; i < CONN_HASH_SIZE; i++) {
 239                hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
 240                        conn_func(con);
 241        }
 242}
 243
 244static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 245{
 246        struct connection *con;
 247
 248        mutex_lock(&connections_lock);
 249        con = __nodeid2con(nodeid, allocation);
 250        mutex_unlock(&connections_lock);
 251
 252        return con;
 253}
 254
 255/* This is a bit drastic, but only called when things go wrong */
 256static struct connection *assoc2con(int assoc_id)
 257{
 258        int i;
 259        struct connection *con;
 260
 261        mutex_lock(&connections_lock);
 262
 263        for (i = 0 ; i < CONN_HASH_SIZE; i++) {
 264                hlist_for_each_entry(con, &connection_hash[i], list) {
 265                        if (con->sctp_assoc == assoc_id) {
 266                                mutex_unlock(&connections_lock);
 267                                return con;
 268                        }
 269                }
 270        }
 271        mutex_unlock(&connections_lock);
 272        return NULL;
 273}
 274
 275static struct dlm_node_addr *find_node_addr(int nodeid)
 276{
 277        struct dlm_node_addr *na;
 278
 279        list_for_each_entry(na, &dlm_node_addrs, list) {
 280                if (na->nodeid == nodeid)
 281                        return na;
 282        }
 283        return NULL;
 284}
 285
 286static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
 287{
 288        switch (x->ss_family) {
 289        case AF_INET: {
 290                struct sockaddr_in *sinx = (struct sockaddr_in *)x;
 291                struct sockaddr_in *siny = (struct sockaddr_in *)y;
 292                if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
 293                        return 0;
 294                if (sinx->sin_port != siny->sin_port)
 295                        return 0;
 296                break;
 297        }
 298        case AF_INET6: {
 299                struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
 300                struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
 301                if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
 302                        return 0;
 303                if (sinx->sin6_port != siny->sin6_port)
 304                        return 0;
 305                break;
 306        }
 307        default:
 308                return 0;
 309        }
 310        return 1;
 311}
 312
 313static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
 314                          struct sockaddr *sa_out, bool try_new_addr)
 315{
 316        struct sockaddr_storage sas;
 317        struct dlm_node_addr *na;
 318
 319        if (!dlm_local_count)
 320                return -1;
 321
 322        spin_lock(&dlm_node_addrs_spin);
 323        na = find_node_addr(nodeid);
 324        if (na && na->addr_count) {
 325                if (try_new_addr) {
 326                        na->curr_addr_index++;
 327                        if (na->curr_addr_index == na->addr_count)
 328                                na->curr_addr_index = 0;
 329                }
 330
 331                memcpy(&sas, na->addr[na->curr_addr_index ],
 332                        sizeof(struct sockaddr_storage));
 333        }
 334        spin_unlock(&dlm_node_addrs_spin);
 335
 336        if (!na)
 337                return -EEXIST;
 338
 339        if (!na->addr_count)
 340                return -ENOENT;
 341
 342        if (sas_out)
 343                memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
 344
 345        if (!sa_out)
 346                return 0;
 347
 348        if (dlm_local_addr[0]->ss_family == AF_INET) {
 349                struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
 350                struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
 351                ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
 352        } else {
 353                struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
 354                struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
 355                ret6->sin6_addr = in6->sin6_addr;
 356        }
 357
 358        return 0;
 359}
 360
 361static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
 362{
 363        struct dlm_node_addr *na;
 364        int rv = -EEXIST;
 365        int addr_i;
 366
 367        spin_lock(&dlm_node_addrs_spin);
 368        list_for_each_entry(na, &dlm_node_addrs, list) {
 369                if (!na->addr_count)
 370                        continue;
 371
 372                for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
 373                        if (addr_compare(na->addr[addr_i], addr)) {
 374                                *nodeid = na->nodeid;
 375                                rv = 0;
 376                                goto unlock;
 377                        }
 378                }
 379        }
 380unlock:
 381        spin_unlock(&dlm_node_addrs_spin);
 382        return rv;
 383}
 384
 385int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 386{
 387        struct sockaddr_storage *new_addr;
 388        struct dlm_node_addr *new_node, *na;
 389
 390        new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
 391        if (!new_node)
 392                return -ENOMEM;
 393
 394        new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
 395        if (!new_addr) {
 396                kfree(new_node);
 397                return -ENOMEM;
 398        }
 399
 400        memcpy(new_addr, addr, len);
 401
 402        spin_lock(&dlm_node_addrs_spin);
 403        na = find_node_addr(nodeid);
 404        if (!na) {
 405                new_node->nodeid = nodeid;
 406                new_node->addr[0] = new_addr;
 407                new_node->addr_count = 1;
 408                list_add(&new_node->list, &dlm_node_addrs);
 409                spin_unlock(&dlm_node_addrs_spin);
 410                return 0;
 411        }
 412
 413        if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
 414                spin_unlock(&dlm_node_addrs_spin);
 415                kfree(new_addr);
 416                kfree(new_node);
 417                return -ENOSPC;
 418        }
 419
 420        na->addr[na->addr_count++] = new_addr;
 421        spin_unlock(&dlm_node_addrs_spin);
 422        kfree(new_node);
 423        return 0;
 424}
 425
 426/* Data available on socket or listen socket received a connect */
 427static void lowcomms_data_ready(struct sock *sk, int count_unused)
 428{
 429        struct connection *con = sock2con(sk);
 430        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
 431                queue_work(recv_workqueue, &con->rwork);
 432}
 433
 434static void lowcomms_write_space(struct sock *sk)
 435{
 436        struct connection *con = sock2con(sk);
 437
 438        if (!con)
 439                return;
 440
 441        clear_bit(SOCK_NOSPACE, &con->sock->flags);
 442
 443        if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
 444                con->sock->sk->sk_write_pending--;
 445                clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
 446        }
 447
 448        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 449                queue_work(send_workqueue, &con->swork);
 450}
 451
 452static inline void lowcomms_connect_sock(struct connection *con)
 453{
 454        if (test_bit(CF_CLOSE, &con->flags))
 455                return;
 456        if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
 457                queue_work(send_workqueue, &con->swork);
 458}
 459
 460static void lowcomms_state_change(struct sock *sk)
 461{
 462        if (sk->sk_state == TCP_ESTABLISHED)
 463                lowcomms_write_space(sk);
 464}
 465
 466int dlm_lowcomms_connect_node(int nodeid)
 467{
 468        struct connection *con;
 469
 470        /* with sctp there's no connecting without sending */
 471        if (dlm_config.ci_protocol != 0)
 472                return 0;
 473
 474        if (nodeid == dlm_our_nodeid())
 475                return 0;
 476
 477        con = nodeid2con(nodeid, GFP_NOFS);
 478        if (!con)
 479                return -ENOMEM;
 480        lowcomms_connect_sock(con);
 481        return 0;
 482}
 483
 484/* Make a socket active */
 485static void add_sock(struct socket *sock, struct connection *con)
 486{
 487        con->sock = sock;
 488
 489        /* Install a data_ready callback */
 490        con->sock->sk->sk_data_ready = lowcomms_data_ready;
 491        con->sock->sk->sk_write_space = lowcomms_write_space;
 492        con->sock->sk->sk_state_change = lowcomms_state_change;
 493        con->sock->sk->sk_user_data = con;
 494        con->sock->sk->sk_allocation = GFP_NOFS;
 495}
 496
 497/* Add the port number to an IPv6 or 4 sockaddr and return the address
 498   length */
 499static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 500                          int *addr_len)
 501{
 502        saddr->ss_family =  dlm_local_addr[0]->ss_family;
 503        if (saddr->ss_family == AF_INET) {
 504                struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 505                in4_addr->sin_port = cpu_to_be16(port);
 506                *addr_len = sizeof(struct sockaddr_in);
 507                memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 508        } else {
 509                struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 510                in6_addr->sin6_port = cpu_to_be16(port);
 511                *addr_len = sizeof(struct sockaddr_in6);
 512        }
 513        memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 514}
 515
 516/* Close a remote connection and tidy up */
 517static void close_connection(struct connection *con, bool and_other)
 518{
 519        mutex_lock(&con->sock_mutex);
 520
 521        if (con->sock) {
 522                sock_release(con->sock);
 523                con->sock = NULL;
 524        }
 525        if (con->othercon && and_other) {
 526                /* Will only re-enter once. */
 527                close_connection(con->othercon, false);
 528        }
 529        if (con->rx_page) {
 530                __free_page(con->rx_page);
 531                con->rx_page = NULL;
 532        }
 533
 534        con->retries = 0;
 535        mutex_unlock(&con->sock_mutex);
 536}
 537
 538/* We only send shutdown messages to nodes that are not part of the cluster */
 539static void sctp_send_shutdown(sctp_assoc_t associd)
 540{
 541        static char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 542        struct msghdr outmessage;
 543        struct cmsghdr *cmsg;
 544        struct sctp_sndrcvinfo *sinfo;
 545        int ret;
 546        struct connection *con;
 547
 548        con = nodeid2con(0,0);
 549        BUG_ON(con == NULL);
 550
 551        outmessage.msg_name = NULL;
 552        outmessage.msg_namelen = 0;
 553        outmessage.msg_control = outcmsg;
 554        outmessage.msg_controllen = sizeof(outcmsg);
 555        outmessage.msg_flags = MSG_EOR;
 556
 557        cmsg = CMSG_FIRSTHDR(&outmessage);
 558        cmsg->cmsg_level = IPPROTO_SCTP;
 559        cmsg->cmsg_type = SCTP_SNDRCV;
 560        cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
 561        outmessage.msg_controllen = cmsg->cmsg_len;
 562        sinfo = CMSG_DATA(cmsg);
 563        memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
 564
 565        sinfo->sinfo_flags |= MSG_EOF;
 566        sinfo->sinfo_assoc_id = associd;
 567
 568        ret = kernel_sendmsg(con->sock, &outmessage, NULL, 0, 0);
 569
 570        if (ret != 0)
 571                log_print("send EOF to node failed: %d", ret);
 572}
 573
 574static void sctp_init_failed_foreach(struct connection *con)
 575{
 576
 577        /*
 578         * Don't try to recover base con and handle race where the
 579         * other node's assoc init creates a assoc and we get that
 580         * notification, then we get a notification that our attempt
 581         * failed due. This happens when we are still trying the primary
 582         * address, but the other node has already tried secondary addrs
 583         * and found one that worked.
 584         */
 585        if (!con->nodeid || con->sctp_assoc)
 586                return;
 587
 588        log_print("Retrying SCTP association init for node %d\n", con->nodeid);
 589
 590        con->try_new_addr = true;
 591        con->sctp_assoc = 0;
 592        if (test_and_clear_bit(CF_INIT_PENDING, &con->flags)) {
 593                if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 594                        queue_work(send_workqueue, &con->swork);
 595        }
 596}
 597
/* INIT failed but we don't know which node...
   restart INIT on all pending nodes.
   Runs sctp_init_failed_foreach() over every connection while holding
   connections_lock. */
static void sctp_init_failed(void)
{
        mutex_lock(&connections_lock);

        foreach_conn(sctp_init_failed_foreach);

        mutex_unlock(&connections_lock);
}
 608
 609static void retry_failed_sctp_send(struct connection *recv_con,
 610                                   struct sctp_send_failed *sn_send_failed,
 611                                   char *buf)
 612{
 613        int len = sn_send_failed->ssf_length - sizeof(struct sctp_send_failed);
 614        struct dlm_mhandle *mh;
 615        struct connection *con;
 616        char *retry_buf;
 617        int nodeid = sn_send_failed->ssf_info.sinfo_ppid;
 618
 619        log_print("Retry sending %d bytes to node id %d", len, nodeid);
 620
 621        con = nodeid2con(nodeid, 0);
 622        if (!con) {
 623                log_print("Could not look up con for nodeid %d\n",
 624                          nodeid);
 625                return;
 626        }
 627
 628        mh = dlm_lowcomms_get_buffer(nodeid, len, GFP_NOFS, &retry_buf);
 629        if (!mh) {
 630                log_print("Could not allocate buf for retry.");
 631                return;
 632        }
 633        memcpy(retry_buf, buf + sizeof(struct sctp_send_failed), len);
 634        dlm_lowcomms_commit_buffer(mh);
 635
 636        /*
 637         * If we got a assoc changed event before the send failed event then
 638         * we only need to retry the send.
 639         */
 640        if (con->sctp_assoc) {
 641                if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 642                        queue_work(send_workqueue, &con->swork);
 643        } else
 644                sctp_init_failed_foreach(con);
 645}
 646
 647/* Something happened to an association */
 648static void process_sctp_notification(struct connection *con,
 649                                      struct msghdr *msg, char *buf)
 650{
 651        union sctp_notification *sn = (union sctp_notification *)buf;
 652        struct linger linger;
 653
 654        switch (sn->sn_header.sn_type) {
 655        case SCTP_SEND_FAILED:
 656                retry_failed_sctp_send(con, &sn->sn_send_failed, buf);
 657                break;
 658        case SCTP_ASSOC_CHANGE:
 659                switch (sn->sn_assoc_change.sac_state) {
 660                case SCTP_COMM_UP:
 661                case SCTP_RESTART:
 662                {
 663                        /* Check that the new node is in the lockspace */
 664                        struct sctp_prim prim;
 665                        int nodeid;
 666                        int prim_len, ret;
 667                        int addr_len;
 668                        struct connection *new_con;
 669
 670                        /*
 671                         * We get this before any data for an association.
 672                         * We verify that the node is in the cluster and
 673                         * then peel off a socket for it.
 674                         */
 675                        if ((int)sn->sn_assoc_change.sac_assoc_id <= 0) {
 676                                log_print("COMM_UP for invalid assoc ID %d",
 677                                         (int)sn->sn_assoc_change.sac_assoc_id);
 678                                sctp_init_failed();
 679                                return;
 680                        }
 681                        memset(&prim, 0, sizeof(struct sctp_prim));
 682                        prim_len = sizeof(struct sctp_prim);
 683                        prim.ssp_assoc_id = sn->sn_assoc_change.sac_assoc_id;
 684
 685                        ret = kernel_getsockopt(con->sock,
 686                                                IPPROTO_SCTP,
 687                                                SCTP_PRIMARY_ADDR,
 688                                                (char*)&prim,
 689                                                &prim_len);
 690                        if (ret < 0) {
 691                                log_print("getsockopt/sctp_primary_addr on "
 692                                          "new assoc %d failed : %d",
 693                                          (int)sn->sn_assoc_change.sac_assoc_id,
 694                                          ret);
 695
 696                                /* Retry INIT later */
 697                                new_con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
 698                                if (new_con)
 699                                        clear_bit(CF_CONNECT_PENDING, &con->flags);
 700                                return;
 701                        }
 702                        make_sockaddr(&prim.ssp_addr, 0, &addr_len);
 703                        if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
 704                                unsigned char *b=(unsigned char *)&prim.ssp_addr;
 705                                log_print("reject connect from unknown addr");
 706                                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
 707                                                     b, sizeof(struct sockaddr_storage));
 708                                sctp_send_shutdown(prim.ssp_assoc_id);
 709                                return;
 710                        }
 711
 712                        new_con = nodeid2con(nodeid, GFP_NOFS);
 713                        if (!new_con)
 714                                return;
 715
 716                        /* Peel off a new sock */
 717                        lock_sock(con->sock->sk);
 718                        ret = sctp_do_peeloff(con->sock->sk,
 719                                sn->sn_assoc_change.sac_assoc_id,
 720                                &new_con->sock);
 721                        release_sock(con->sock->sk);
 722                        if (ret < 0) {
 723                                log_print("Can't peel off a socket for "
 724                                          "connection %d to node %d: err=%d",
 725                                          (int)sn->sn_assoc_change.sac_assoc_id,
 726                                          nodeid, ret);
 727                                return;
 728                        }
 729                        add_sock(new_con->sock, new_con);
 730
 731                        linger.l_onoff = 1;
 732                        linger.l_linger = 0;
 733                        ret = kernel_setsockopt(new_con->sock, SOL_SOCKET, SO_LINGER,
 734                                                (char *)&linger, sizeof(linger));
 735                        if (ret < 0)
 736                                log_print("set socket option SO_LINGER failed");
 737
 738                        log_print("connecting to %d sctp association %d",
 739                                 nodeid, (int)sn->sn_assoc_change.sac_assoc_id);
 740
 741                        new_con->sctp_assoc = sn->sn_assoc_change.sac_assoc_id;
 742                        new_con->try_new_addr = false;
 743                        /* Send any pending writes */
 744                        clear_bit(CF_CONNECT_PENDING, &new_con->flags);
 745                        clear_bit(CF_INIT_PENDING, &new_con->flags);
 746                        if (!test_and_set_bit(CF_WRITE_PENDING, &new_con->flags)) {
 747                                queue_work(send_workqueue, &new_con->swork);
 748                        }
 749                        if (!test_and_set_bit(CF_READ_PENDING, &new_con->flags))
 750                                queue_work(recv_workqueue, &new_con->rwork);
 751                }
 752                break;
 753
 754                case SCTP_COMM_LOST:
 755                case SCTP_SHUTDOWN_COMP:
 756                {
 757                        con = assoc2con(sn->sn_assoc_change.sac_assoc_id);
 758                        if (con) {
 759                                con->sctp_assoc = 0;
 760                        }
 761                }
 762                break;
 763
 764                case SCTP_CANT_STR_ASSOC:
 765                {
 766                        /* Will retry init when we get the send failed notification */
 767                        log_print("Can't start SCTP association - retrying");
 768                }
 769                break;
 770
 771                default:
 772                        log_print("unexpected SCTP assoc change id=%d state=%d",
 773                                  (int)sn->sn_assoc_change.sac_assoc_id,
 774                                  sn->sn_assoc_change.sac_state);
 775                }
 776        default:
 777                ; /* fall through */
 778        }
 779}
 780
 781/* Data received from remote end */
 782static int receive_from_sock(struct connection *con)
 783{
 784        int ret = 0;
 785        struct msghdr msg = {};
 786        struct kvec iov[2];
 787        unsigned len;
 788        int r;
 789        int call_again_soon = 0;
 790        int nvec;
 791        char incmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
 792
 793        mutex_lock(&con->sock_mutex);
 794
 795        if (con->sock == NULL) {
 796                ret = -EAGAIN;
 797                goto out_close;
 798        }
 799
 800        if (con->rx_page == NULL) {
 801                /*
 802                 * This doesn't need to be atomic, but I think it should
 803                 * improve performance if it is.
 804                 */
 805                con->rx_page = alloc_page(GFP_ATOMIC);
 806                if (con->rx_page == NULL)
 807                        goto out_resched;
 808                cbuf_init(&con->cb, PAGE_CACHE_SIZE);
 809        }
 810
 811        /* Only SCTP needs these really */
 812        memset(&incmsg, 0, sizeof(incmsg));
 813        msg.msg_control = incmsg;
 814        msg.msg_controllen = sizeof(incmsg);
 815
 816        /*
 817         * iov[0] is the bit of the circular buffer between the current end
 818         * point (cb.base + cb.len) and the end of the buffer.
 819         */
 820        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
 821        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
 822        iov[1].iov_len = 0;
 823        nvec = 1;
 824
 825        /*
 826         * iov[1] is the bit of the circular buffer between the start of the
 827         * buffer and the start of the currently used section (cb.base)
 828         */
 829        if (cbuf_data(&con->cb) >= con->cb.base) {
 830                iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
 831                iov[1].iov_len = con->cb.base;
 832                iov[1].iov_base = page_address(con->rx_page);
 833                nvec = 2;
 834        }
 835        len = iov[0].iov_len + iov[1].iov_len;
 836
 837        r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
 838                               MSG_DONTWAIT | MSG_NOSIGNAL);
 839        if (ret <= 0)
 840                goto out_close;
 841
 842        /* Process SCTP notifications */
 843        if (msg.msg_flags & MSG_NOTIFICATION) {
 844                msg.msg_control = incmsg;
 845                msg.msg_controllen = sizeof(incmsg);
 846
 847                process_sctp_notification(con, &msg,
 848                                page_address(con->rx_page) + con->cb.base);
 849                mutex_unlock(&con->sock_mutex);
 850                return 0;
 851        }
 852        BUG_ON(con->nodeid == 0);
 853
 854        if (ret == len)
 855                call_again_soon = 1;
 856        cbuf_add(&con->cb, ret);
 857        ret = dlm_process_incoming_buffer(con->nodeid,
 858                                          page_address(con->rx_page),
 859                                          con->cb.base, con->cb.len,
 860                                          PAGE_CACHE_SIZE);
 861        if (ret == -EBADMSG) {
 862                log_print("lowcomms: addr=%p, base=%u, len=%u, "
 863                          "iov_len=%u, iov_base[0]=%p, read=%d",
 864                          page_address(con->rx_page), con->cb.base, con->cb.len,
 865                          len, iov[0].iov_base, r);
 866        }
 867        if (ret < 0)
 868                goto out_close;
 869        cbuf_eat(&con->cb, ret);
 870
 871        if (cbuf_empty(&con->cb) && !call_again_soon) {
 872                __free_page(con->rx_page);
 873                con->rx_page = NULL;
 874        }
 875
 876        if (call_again_soon)
 877                goto out_resched;
 878        mutex_unlock(&con->sock_mutex);
 879        return 0;
 880
 881out_resched:
 882        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
 883                queue_work(recv_workqueue, &con->rwork);
 884        mutex_unlock(&con->sock_mutex);
 885        return -EAGAIN;
 886
 887out_close:
 888        mutex_unlock(&con->sock_mutex);
 889        if (ret != -EAGAIN) {
 890                close_connection(con, false);
 891                /* Reconnect when there is something to send */
 892        }
 893        /* Don't return success if we really got EOF */
 894        if (ret == 0)
 895                ret = -EAGAIN;
 896
 897        return ret;
 898}
 899
 900/* Listening socket is busy, accept a connection */
 901static int tcp_accept_from_sock(struct connection *con)
 902{
 903        int result;
 904        struct sockaddr_storage peeraddr;
 905        struct socket *newsock;
 906        int len;
 907        int nodeid;
 908        struct connection *newcon;
 909        struct connection *addcon;
 910
 911        mutex_lock(&connections_lock);
 912        if (!dlm_allow_conn) {
 913                mutex_unlock(&connections_lock);
 914                return -1;
 915        }
 916        mutex_unlock(&connections_lock);
 917
 918        memset(&peeraddr, 0, sizeof(peeraddr));
 919        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
 920                                  IPPROTO_TCP, &newsock);
 921        if (result < 0)
 922                return -ENOMEM;
 923
 924        mutex_lock_nested(&con->sock_mutex, 0);
 925
 926        result = -ENOTCONN;
 927        if (con->sock == NULL)
 928                goto accept_err;
 929
 930        newsock->type = con->sock->type;
 931        newsock->ops = con->sock->ops;
 932
 933        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
 934        if (result < 0)
 935                goto accept_err;
 936
 937        /* Get the connected socket's peer */
 938        memset(&peeraddr, 0, sizeof(peeraddr));
 939        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
 940                                  &len, 2)) {
 941                result = -ECONNABORTED;
 942                goto accept_err;
 943        }
 944
 945        /* Get the new node's NODEID */
 946        make_sockaddr(&peeraddr, 0, &len);
 947        if (addr_to_nodeid(&peeraddr, &nodeid)) {
 948                unsigned char *b=(unsigned char *)&peeraddr;
 949                log_print("connect from non cluster node");
 950                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
 951                                     b, sizeof(struct sockaddr_storage));
 952                sock_release(newsock);
 953                mutex_unlock(&con->sock_mutex);
 954                return -1;
 955        }
 956
 957        log_print("got connection from %d", nodeid);
 958
 959        /*  Check to see if we already have a connection to this node. This
 960         *  could happen if the two nodes initiate a connection at roughly
 961         *  the same time and the connections cross on the wire.
 962         *  In this case we store the incoming one in "othercon"
 963         */
 964        newcon = nodeid2con(nodeid, GFP_NOFS);
 965        if (!newcon) {
 966                result = -ENOMEM;
 967                goto accept_err;
 968        }
 969        mutex_lock_nested(&newcon->sock_mutex, 1);
 970        if (newcon->sock) {
 971                struct connection *othercon = newcon->othercon;
 972
 973                if (!othercon) {
 974                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
 975                        if (!othercon) {
 976                                log_print("failed to allocate incoming socket");
 977                                mutex_unlock(&newcon->sock_mutex);
 978                                result = -ENOMEM;
 979                                goto accept_err;
 980                        }
 981                        othercon->nodeid = nodeid;
 982                        othercon->rx_action = receive_from_sock;
 983                        mutex_init(&othercon->sock_mutex);
 984                        INIT_WORK(&othercon->swork, process_send_sockets);
 985                        INIT_WORK(&othercon->rwork, process_recv_sockets);
 986                        set_bit(CF_IS_OTHERCON, &othercon->flags);
 987                }
 988                if (!othercon->sock) {
 989                        newcon->othercon = othercon;
 990                        othercon->sock = newsock;
 991                        newsock->sk->sk_user_data = othercon;
 992                        add_sock(newsock, othercon);
 993                        addcon = othercon;
 994                }
 995                else {
 996                        printk("Extra connection from node %d attempted\n", nodeid);
 997                        result = -EAGAIN;
 998                        mutex_unlock(&newcon->sock_mutex);
 999                        goto accept_err;
1000                }
1001        }
1002        else {
1003                newsock->sk->sk_user_data = newcon;
1004                newcon->rx_action = receive_from_sock;
1005                add_sock(newsock, newcon);
1006                addcon = newcon;
1007        }
1008
1009        mutex_unlock(&newcon->sock_mutex);
1010
1011        /*
1012         * Add it to the active queue in case we got data
1013         * between processing the accept adding the socket
1014         * to the read_sockets list
1015         */
1016        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
1017                queue_work(recv_workqueue, &addcon->rwork);
1018        mutex_unlock(&con->sock_mutex);
1019
1020        return 0;
1021
1022accept_err:
1023        mutex_unlock(&con->sock_mutex);
1024        sock_release(newsock);
1025
1026        if (result != -EAGAIN)
1027                log_print("error accepting connection from node: %d", result);
1028        return result;
1029}
1030
1031static void free_entry(struct writequeue_entry *e)
1032{
1033        __free_page(e->page);
1034        kfree(e);
1035}
1036
1037/*
1038 * writequeue_entry_complete - try to delete and free write queue entry
1039 * @e: write queue entry to try to delete
1040 * @completed: bytes completed
1041 *
1042 * writequeue_lock must be held.
1043 */
1044static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1045{
1046        e->offset += completed;
1047        e->len -= completed;
1048
1049        if (e->len == 0 && e->users == 0) {
1050                list_del(&e->list);
1051                free_entry(e);
1052        }
1053}
1054
1055/* Initiate an SCTP association.
1056   This is a special case of send_to_sock() in that we don't yet have a
1057   peeled-off socket for this association, so we use the listening socket
1058   and add the primary IP address of the remote node.
1059 */
1060static void sctp_init_assoc(struct connection *con)
1061{
1062        struct sockaddr_storage rem_addr;
1063        char outcmsg[CMSG_SPACE(sizeof(struct sctp_sndrcvinfo))];
1064        struct msghdr outmessage;
1065        struct cmsghdr *cmsg;
1066        struct sctp_sndrcvinfo *sinfo;
1067        struct connection *base_con;
1068        struct writequeue_entry *e;
1069        int len, offset;
1070        int ret;
1071        int addrlen;
1072        struct kvec iov[1];
1073
1074        mutex_lock(&con->sock_mutex);
1075        if (test_and_set_bit(CF_INIT_PENDING, &con->flags))
1076                goto unlock;
1077
1078        if (nodeid_to_addr(con->nodeid, NULL, (struct sockaddr *)&rem_addr,
1079                           con->try_new_addr)) {
1080                log_print("no address for nodeid %d", con->nodeid);
1081                goto unlock;
1082        }
1083        base_con = nodeid2con(0, 0);
1084        BUG_ON(base_con == NULL);
1085
1086        make_sockaddr(&rem_addr, dlm_config.ci_tcp_port, &addrlen);
1087
1088        outmessage.msg_name = &rem_addr;
1089        outmessage.msg_namelen = addrlen;
1090        outmessage.msg_control = outcmsg;
1091        outmessage.msg_controllen = sizeof(outcmsg);
1092        outmessage.msg_flags = MSG_EOR;
1093
1094        spin_lock(&con->writequeue_lock);
1095
1096        if (list_empty(&con->writequeue)) {
1097                spin_unlock(&con->writequeue_lock);
1098                log_print("writequeue empty for nodeid %d", con->nodeid);
1099                goto unlock;
1100        }
1101
1102        e = list_first_entry(&con->writequeue, struct writequeue_entry, list);
1103        len = e->len;
1104        offset = e->offset;
1105
1106        /* Send the first block off the write queue */
1107        iov[0].iov_base = page_address(e->page)+offset;
1108        iov[0].iov_len = len;
1109        spin_unlock(&con->writequeue_lock);
1110
1111        if (rem_addr.ss_family == AF_INET) {
1112                struct sockaddr_in *sin = (struct sockaddr_in *)&rem_addr;
1113                log_print("Trying to connect to %pI4", &sin->sin_addr.s_addr);
1114        } else {
1115                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&rem_addr;
1116                log_print("Trying to connect to %pI6", &sin6->sin6_addr);
1117        }
1118
1119        cmsg = CMSG_FIRSTHDR(&outmessage);
1120        cmsg->cmsg_level = IPPROTO_SCTP;
1121        cmsg->cmsg_type = SCTP_SNDRCV;
1122        cmsg->cmsg_len = CMSG_LEN(sizeof(struct sctp_sndrcvinfo));
1123        sinfo = CMSG_DATA(cmsg);
1124        memset(sinfo, 0x00, sizeof(struct sctp_sndrcvinfo));
1125        sinfo->sinfo_ppid = cpu_to_le32(con->nodeid);
1126        outmessage.msg_controllen = cmsg->cmsg_len;
1127        sinfo->sinfo_flags |= SCTP_ADDR_OVER;
1128
1129        ret = kernel_sendmsg(base_con->sock, &outmessage, iov, 1, len);
1130        if (ret < 0) {
1131                log_print("Send first packet to node %d failed: %d",
1132                          con->nodeid, ret);
1133
1134                /* Try again later */
1135                clear_bit(CF_CONNECT_PENDING, &con->flags);
1136                clear_bit(CF_INIT_PENDING, &con->flags);
1137        }
1138        else {
1139                spin_lock(&con->writequeue_lock);
1140                writequeue_entry_complete(e, ret);
1141                spin_unlock(&con->writequeue_lock);
1142        }
1143
1144unlock:
1145        mutex_unlock(&con->sock_mutex);
1146}
1147
1148/* Connect a new socket to its peer */
1149static void tcp_connect_to_sock(struct connection *con)
1150{
1151        struct sockaddr_storage saddr, src_addr;
1152        int addr_len;
1153        struct socket *sock = NULL;
1154        int one = 1;
1155        int result;
1156
1157        if (con->nodeid == 0) {
1158                log_print("attempt to connect sock 0 foiled");
1159                return;
1160        }
1161
1162        mutex_lock(&con->sock_mutex);
1163        if (con->retries++ > MAX_CONNECT_RETRIES)
1164                goto out;
1165
1166        /* Some odd races can cause double-connects, ignore them */
1167        if (con->sock)
1168                goto out;
1169
1170        /* Create a socket to communicate with */
1171        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1172                                  IPPROTO_TCP, &sock);
1173        if (result < 0)
1174                goto out_err;
1175
1176        memset(&saddr, 0, sizeof(saddr));
1177        result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
1178        if (result < 0) {
1179                log_print("no address for nodeid %d", con->nodeid);
1180                goto out_err;
1181        }
1182
1183        sock->sk->sk_user_data = con;
1184        con->rx_action = receive_from_sock;
1185        con->connect_action = tcp_connect_to_sock;
1186        add_sock(sock, con);
1187
1188        /* Bind to our cluster-known address connecting to avoid
1189           routing problems */
1190        memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
1191        make_sockaddr(&src_addr, 0, &addr_len);
1192        result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
1193                                 addr_len);
1194        if (result < 0) {
1195                log_print("could not bind for connect: %d", result);
1196                /* This *may* not indicate a critical error */
1197        }
1198
1199        make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
1200
1201        log_print("connecting to %d", con->nodeid);
1202
1203        /* Turn off Nagle's algorithm */
1204        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1205                          sizeof(one));
1206
1207        result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
1208                                   O_NONBLOCK);
1209        if (result == -EINPROGRESS)
1210                result = 0;
1211        if (result == 0)
1212                goto out;
1213
1214out_err:
1215        if (con->sock) {
1216                sock_release(con->sock);
1217                con->sock = NULL;
1218        } else if (sock) {
1219                sock_release(sock);
1220        }
1221        /*
1222         * Some errors are fatal and this list might need adjusting. For other
1223         * errors we try again until the max number of retries is reached.
1224         */
1225        if (result != -EHOSTUNREACH &&
1226            result != -ENETUNREACH &&
1227            result != -ENETDOWN && 
1228            result != -EINVAL &&
1229            result != -EPROTONOSUPPORT) {
1230                log_print("connect %d try %d error %d", con->nodeid,
1231                          con->retries, result);
1232                mutex_unlock(&con->sock_mutex);
1233                msleep(1000);
1234                lowcomms_connect_sock(con);
1235                return;
1236        }
1237out:
1238        mutex_unlock(&con->sock_mutex);
1239        return;
1240}
1241
1242static struct socket *tcp_create_listen_sock(struct connection *con,
1243                                             struct sockaddr_storage *saddr)
1244{
1245        struct socket *sock = NULL;
1246        int result = 0;
1247        int one = 1;
1248        int addr_len;
1249
1250        if (dlm_local_addr[0]->ss_family == AF_INET)
1251                addr_len = sizeof(struct sockaddr_in);
1252        else
1253                addr_len = sizeof(struct sockaddr_in6);
1254
1255        /* Create a socket to communicate with */
1256        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1257                                  IPPROTO_TCP, &sock);
1258        if (result < 0) {
1259                log_print("Can't create listening comms socket");
1260                goto create_out;
1261        }
1262
1263        /* Turn off Nagle's algorithm */
1264        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1265                          sizeof(one));
1266
1267        result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1268                                   (char *)&one, sizeof(one));
1269
1270        if (result < 0) {
1271                log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1272        }
1273        con->rx_action = tcp_accept_from_sock;
1274        con->connect_action = tcp_connect_to_sock;
1275
1276        /* Bind to our port */
1277        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1278        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1279        if (result < 0) {
1280                log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1281                sock_release(sock);
1282                sock = NULL;
1283                con->sock = NULL;
1284                goto create_out;
1285        }
1286        result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1287                                 (char *)&one, sizeof(one));
1288        if (result < 0) {
1289                log_print("Set keepalive failed: %d", result);
1290        }
1291
1292        result = sock->ops->listen(sock, 5);
1293        if (result < 0) {
1294                log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1295                sock_release(sock);
1296                sock = NULL;
1297                goto create_out;
1298        }
1299
1300create_out:
1301        return sock;
1302}
1303
1304/* Get local addresses */
1305static void init_local(void)
1306{
1307        struct sockaddr_storage sas, *addr;
1308        int i;
1309
1310        dlm_local_count = 0;
1311        for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1312                if (dlm_our_addr(&sas, i))
1313                        break;
1314
1315                addr = kmalloc(sizeof(*addr), GFP_NOFS);
1316                if (!addr)
1317                        break;
1318                memcpy(addr, &sas, sizeof(*addr));
1319                dlm_local_addr[dlm_local_count++] = addr;
1320        }
1321}
1322
1323/* Bind to an IP address. SCTP allows multiple address so it can do
1324   multi-homing */
1325static int add_sctp_bind_addr(struct connection *sctp_con,
1326                              struct sockaddr_storage *addr,
1327                              int addr_len, int num)
1328{
1329        int result = 0;
1330
1331        if (num == 1)
1332                result = kernel_bind(sctp_con->sock,
1333                                     (struct sockaddr *) addr,
1334                                     addr_len);
1335        else
1336                result = kernel_setsockopt(sctp_con->sock, SOL_SCTP,
1337                                           SCTP_SOCKOPT_BINDX_ADD,
1338                                           (char *)addr, addr_len);
1339
1340        if (result < 0)
1341                log_print("Can't bind to port %d addr number %d",
1342                          dlm_config.ci_tcp_port, num);
1343
1344        return result;
1345}
1346
1347/* Initialise SCTP socket and bind to all interfaces */
1348static int sctp_listen_for_all(void)
1349{
1350        struct socket *sock = NULL;
1351        struct sockaddr_storage localaddr;
1352        struct sctp_event_subscribe subscribe;
1353        int result = -EINVAL, num = 1, i, addr_len;
1354        struct connection *con = nodeid2con(0, GFP_NOFS);
1355        int bufsize = NEEDED_RMEM;
1356        int one = 1;
1357
1358        if (!con)
1359                return -ENOMEM;
1360
1361        log_print("Using SCTP for communications");
1362
1363        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_SEQPACKET,
1364                                  IPPROTO_SCTP, &sock);
1365        if (result < 0) {
1366                log_print("Can't create comms socket, check SCTP is loaded");
1367                goto out;
1368        }
1369
1370        /* Listen for events */
1371        memset(&subscribe, 0, sizeof(subscribe));
1372        subscribe.sctp_data_io_event = 1;
1373        subscribe.sctp_association_event = 1;
1374        subscribe.sctp_send_failure_event = 1;
1375        subscribe.sctp_shutdown_event = 1;
1376        subscribe.sctp_partial_delivery_event = 1;
1377
1378        result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1379                                 (char *)&bufsize, sizeof(bufsize));
1380        if (result)
1381                log_print("Error increasing buffer space on socket %d", result);
1382
1383        result = kernel_setsockopt(sock, SOL_SCTP, SCTP_EVENTS,
1384                                   (char *)&subscribe, sizeof(subscribe));
1385        if (result < 0) {
1386                log_print("Failed to set SCTP_EVENTS on socket: result=%d",
1387                          result);
1388                goto create_delsock;
1389        }
1390
1391        result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1392                                   sizeof(one));
1393        if (result < 0)
1394                log_print("Could not set SCTP NODELAY error %d\n", result);
1395
1396        /* Init con struct */
1397        sock->sk->sk_user_data = con;
1398        con->sock = sock;
1399        con->sock->sk->sk_data_ready = lowcomms_data_ready;
1400        con->rx_action = receive_from_sock;
1401        con->connect_action = sctp_init_assoc;
1402
1403        /* Bind to all interfaces. */
1404        for (i = 0; i < dlm_local_count; i++) {
1405                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1406                make_sockaddr(&localaddr, dlm_config.ci_tcp_port, &addr_len);
1407
1408                result = add_sctp_bind_addr(con, &localaddr, addr_len, num);
1409                if (result)
1410                        goto create_delsock;
1411                ++num;
1412        }
1413
1414        result = sock->ops->listen(sock, 5);
1415        if (result < 0) {
1416                log_print("Can't set socket listening");
1417                goto create_delsock;
1418        }
1419
1420        return 0;
1421
1422create_delsock:
1423        sock_release(sock);
1424        con->sock = NULL;
1425out:
1426        return result;
1427}
1428
1429static int tcp_listen_for_all(void)
1430{
1431        struct socket *sock = NULL;
1432        struct connection *con = nodeid2con(0, GFP_NOFS);
1433        int result = -EINVAL;
1434
1435        if (!con)
1436                return -ENOMEM;
1437
1438        /* We don't support multi-homed hosts */
1439        if (dlm_local_addr[1] != NULL) {
1440                log_print("TCP protocol can't handle multi-homed hosts, "
1441                          "try SCTP");
1442                return -EINVAL;
1443        }
1444
1445        log_print("Using TCP for communications");
1446
1447        sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1448        if (sock) {
1449                add_sock(sock, con);
1450                result = 0;
1451        }
1452        else {
1453                result = -EADDRINUSE;
1454        }
1455
1456        return result;
1457}
1458
1459
1460
1461static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1462                                                     gfp_t allocation)
1463{
1464        struct writequeue_entry *entry;
1465
1466        entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1467        if (!entry)
1468                return NULL;
1469
1470        entry->page = alloc_page(allocation);
1471        if (!entry->page) {
1472                kfree(entry);
1473                return NULL;
1474        }
1475
1476        entry->offset = 0;
1477        entry->len = 0;
1478        entry->end = 0;
1479        entry->users = 0;
1480        entry->con = con;
1481
1482        return entry;
1483}
1484
1485void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1486{
1487        struct connection *con;
1488        struct writequeue_entry *e;
1489        int offset = 0;
1490
1491        con = nodeid2con(nodeid, allocation);
1492        if (!con)
1493                return NULL;
1494
1495        spin_lock(&con->writequeue_lock);
1496        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1497        if ((&e->list == &con->writequeue) ||
1498            (PAGE_CACHE_SIZE - e->end < len)) {
1499                e = NULL;
1500        } else {
1501                offset = e->end;
1502                e->end += len;
1503                e->users++;
1504        }
1505        spin_unlock(&con->writequeue_lock);
1506
1507        if (e) {
1508        got_one:
1509                *ppc = page_address(e->page) + offset;
1510                return e;
1511        }
1512
1513        e = new_writequeue_entry(con, allocation);
1514        if (e) {
1515                spin_lock(&con->writequeue_lock);
1516                offset = e->end;
1517                e->end += len;
1518                e->users++;
1519                list_add_tail(&e->list, &con->writequeue);
1520                spin_unlock(&con->writequeue_lock);
1521                goto got_one;
1522        }
1523        return NULL;
1524}
1525
1526void dlm_lowcomms_commit_buffer(void *mh)
1527{
1528        struct writequeue_entry *e = (struct writequeue_entry *)mh;
1529        struct connection *con = e->con;
1530        int users;
1531
1532        spin_lock(&con->writequeue_lock);
1533        users = --e->users;
1534        if (users)
1535                goto out;
1536        e->len = e->end - e->offset;
1537        spin_unlock(&con->writequeue_lock);
1538
1539        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1540                queue_work(send_workqueue, &con->swork);
1541        }
1542        return;
1543
1544out:
1545        spin_unlock(&con->writequeue_lock);
1546        return;
1547}
1548
1549/* Send a message */
1550static void send_to_sock(struct connection *con)
1551{
1552        int ret = 0;
1553        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1554        struct writequeue_entry *e;
1555        int len, offset;
1556        int count = 0;
1557
1558        mutex_lock(&con->sock_mutex);
1559        if (con->sock == NULL)
1560                goto out_connect;
1561
1562        spin_lock(&con->writequeue_lock);
1563        for (;;) {
1564                e = list_entry(con->writequeue.next, struct writequeue_entry,
1565                               list);
1566                if ((struct list_head *) e == &con->writequeue)
1567                        break;
1568
1569                len = e->len;
1570                offset = e->offset;
1571                BUG_ON(len == 0 && e->users == 0);
1572                spin_unlock(&con->writequeue_lock);
1573
1574                ret = 0;
1575                if (len) {
1576                        ret = kernel_sendpage(con->sock, e->page, offset, len,
1577                                              msg_flags);
1578                        if (ret == -EAGAIN || ret == 0) {
1579                                if (ret == -EAGAIN &&
1580                                    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
1581                                    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1582                                        /* Notify TCP that we're limited by the
1583                                         * application window size.
1584                                         */
1585                                        set_bit(SOCK_NOSPACE, &con->sock->flags);
1586                                        con->sock->sk->sk_write_pending++;
1587                                }
1588                                cond_resched();
1589                                goto out;
1590                        } else if (ret < 0)
1591                                goto send_error;
1592                }
1593
1594                /* Don't starve people filling buffers */
1595                if (++count >= MAX_SEND_MSG_COUNT) {
1596                        cond_resched();
1597                        count = 0;
1598                }
1599
1600                spin_lock(&con->writequeue_lock);
1601                writequeue_entry_complete(e, ret);
1602        }
1603        spin_unlock(&con->writequeue_lock);
1604out:
1605        mutex_unlock(&con->sock_mutex);
1606        return;
1607
1608send_error:
1609        mutex_unlock(&con->sock_mutex);
1610        close_connection(con, false);
1611        lowcomms_connect_sock(con);
1612        return;
1613
1614out_connect:
1615        mutex_unlock(&con->sock_mutex);
1616        if (!test_bit(CF_INIT_PENDING, &con->flags))
1617                lowcomms_connect_sock(con);
1618}
1619
1620static void clean_one_writequeue(struct connection *con)
1621{
1622        struct writequeue_entry *e, *safe;
1623
1624        spin_lock(&con->writequeue_lock);
1625        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1626                list_del(&e->list);
1627                free_entry(e);
1628        }
1629        spin_unlock(&con->writequeue_lock);
1630}
1631
1632/* Called from recovery when it knows that a node has
1633   left the cluster */
1634int dlm_lowcomms_close(int nodeid)
1635{
1636        struct connection *con;
1637        struct dlm_node_addr *na;
1638
1639        log_print("closing connection to node %d", nodeid);
1640        con = nodeid2con(nodeid, 0);
1641        if (con) {
1642                clear_bit(CF_CONNECT_PENDING, &con->flags);
1643                clear_bit(CF_WRITE_PENDING, &con->flags);
1644                set_bit(CF_CLOSE, &con->flags);
1645                if (cancel_work_sync(&con->swork))
1646                        log_print("canceled swork for node %d", nodeid);
1647                if (cancel_work_sync(&con->rwork))
1648                        log_print("canceled rwork for node %d", nodeid);
1649                clean_one_writequeue(con);
1650                close_connection(con, true);
1651        }
1652
1653        spin_lock(&dlm_node_addrs_spin);
1654        na = find_node_addr(nodeid);
1655        if (na) {
1656                list_del(&na->list);
1657                while (na->addr_count--)
1658                        kfree(na->addr[na->addr_count]);
1659                kfree(na);
1660        }
1661        spin_unlock(&dlm_node_addrs_spin);
1662
1663        return 0;
1664}
1665
1666/* Receive workqueue function */
1667static void process_recv_sockets(struct work_struct *work)
1668{
1669        struct connection *con = container_of(work, struct connection, rwork);
1670        int err;
1671
1672        clear_bit(CF_READ_PENDING, &con->flags);
1673        do {
1674                err = con->rx_action(con);
1675        } while (!err);
1676}
1677
1678/* Send workqueue function */
1679static void process_send_sockets(struct work_struct *work)
1680{
1681        struct connection *con = container_of(work, struct connection, swork);
1682
1683        if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
1684                con->connect_action(con);
1685                set_bit(CF_WRITE_PENDING, &con->flags);
1686        }
1687        if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1688                send_to_sock(con);
1689}
1690
1691
1692/* Discard all entries on the write queues */
1693static void clean_writequeues(void)
1694{
1695        foreach_conn(clean_one_writequeue);
1696}
1697
1698static void work_stop(void)
1699{
1700        destroy_workqueue(recv_workqueue);
1701        destroy_workqueue(send_workqueue);
1702}
1703
1704static int work_start(void)
1705{
1706        recv_workqueue = alloc_workqueue("dlm_recv",
1707                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1708        if (!recv_workqueue) {
1709                log_print("can't start dlm_recv");
1710                return -ENOMEM;
1711        }
1712
1713        send_workqueue = alloc_workqueue("dlm_send",
1714                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1715        if (!send_workqueue) {
1716                log_print("can't start dlm_send");
1717                destroy_workqueue(recv_workqueue);
1718                return -ENOMEM;
1719        }
1720
1721        return 0;
1722}
1723
1724static void stop_conn(struct connection *con)
1725{
1726        con->flags |= 0x0F;
1727        if (con->sock && con->sock->sk)
1728                con->sock->sk->sk_user_data = NULL;
1729}
1730
1731static void free_conn(struct connection *con)
1732{
1733        close_connection(con, true);
1734        if (con->othercon)
1735                kmem_cache_free(con_cache, con->othercon);
1736        hlist_del(&con->list);
1737        kmem_cache_free(con_cache, con);
1738}
1739
1740void dlm_lowcomms_stop(void)
1741{
1742        /* Set all the flags to prevent any
1743           socket activity.
1744        */
1745        mutex_lock(&connections_lock);
1746        dlm_allow_conn = 0;
1747        foreach_conn(stop_conn);
1748        mutex_unlock(&connections_lock);
1749
1750        work_stop();
1751
1752        mutex_lock(&connections_lock);
1753        clean_writequeues();
1754
1755        foreach_conn(free_conn);
1756
1757        mutex_unlock(&connections_lock);
1758        kmem_cache_destroy(con_cache);
1759}
1760
1761int dlm_lowcomms_start(void)
1762{
1763        int error = -EINVAL;
1764        struct connection *con;
1765        int i;
1766
1767        for (i = 0; i < CONN_HASH_SIZE; i++)
1768                INIT_HLIST_HEAD(&connection_hash[i]);
1769
1770        init_local();
1771        if (!dlm_local_count) {
1772                error = -ENOTCONN;
1773                log_print("no local IP address has been set");
1774                goto fail;
1775        }
1776
1777        error = -ENOMEM;
1778        con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1779                                      __alignof__(struct connection), 0,
1780                                      NULL);
1781        if (!con_cache)
1782                goto fail;
1783
1784        error = work_start();
1785        if (error)
1786                goto fail_destroy;
1787
1788        dlm_allow_conn = 1;
1789
1790        /* Start listening */
1791        if (dlm_config.ci_protocol == 0)
1792                error = tcp_listen_for_all();
1793        else
1794                error = sctp_listen_for_all();
1795        if (error)
1796                goto fail_unlisten;
1797
1798        return 0;
1799
1800fail_unlisten:
1801        dlm_allow_conn = 0;
1802        con = nodeid2con(0,0);
1803        if (con) {
1804                close_connection(con, false);
1805                kmem_cache_free(con_cache, con);
1806        }
1807fail_destroy:
1808        kmem_cache_destroy(con_cache);
1809fail:
1810        return error;
1811}
1812
1813void dlm_lowcomms_exit(void)
1814{
1815        struct dlm_node_addr *na, *safe;
1816
1817        spin_lock(&dlm_node_addrs_spin);
1818        list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1819                list_del(&na->list);
1820                while (na->addr_count--)
1821                        kfree(na->addr[na->addr_count]);
1822                kfree(na);
1823        }
1824        spin_unlock(&dlm_node_addrs_spin);
1825}
1826