linux/fs/dlm/lowcomms.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14/*
  15 * lowcomms.c
  16 *
  17 * This is the "low-level" comms layer.
  18 *
  19 * It is responsible for sending/receiving messages
  20 * from other nodes in the cluster.
  21 *
  22 * Cluster nodes are referred to by their nodeids. nodeids are
  23 * simply 32 bit numbers to the locking module - if they need to
  24 * be expanded for the cluster infrastructure then that is its
  25 * responsibility. It is this layer's
  26 * responsibility to resolve these into IP address or
  27 * whatever it needs for inter-node communication.
  28 *
 * The comms level runs on two workqueues: recv_workqueue, whose work
 * items deal mainly with receiving messages from other nodes and passing
 * them up to the mid-level comms layer (which understands the message
 * format) for execution by the locking core, and send_workqueue, whose
 * work items do all the setting up of connections to remote nodes and the
 * sending of data. Callers are not allowed to send their own data because
 * it may cause them to wait in times of high load. Also, this way, the
 * send work can collect together messages bound for one node and send
 * them in one block.
  38 *
  39 * lowcomms will choose to use either TCP or SCTP as its transport layer
  40 * depending on the configuration variable 'protocol'. This should be set
  41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
  42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
  43 * for the DLM to function.
  44 *
  45 */
  46
  47#include <asm/ioctls.h>
  48#include <net/sock.h>
  49#include <net/tcp.h>
  50#include <linux/pagemap.h>
  51#include <linux/file.h>
  52#include <linux/mutex.h>
  53#include <linux/sctp.h>
  54#include <linux/slab.h>
  55#include <net/sctp/sctp.h>
  56#include <net/ipv6.h>
  57
  58#include "dlm_internal.h"
  59#include "lowcomms.h"
  60#include "midcomms.h"
  61#include "config.h"
  62
  /*
 * 4 MiB.  By its name presumably the socket receive-buffer size this layer
 * asks for; the code using it is outside this view.
 */
#define NEEDED_RMEM (4 * 1024 * 1024)

/*
 * Number of buckets in connection_hash[].  nodeid_hash() masks with
 * CONN_HASH_SIZE - 1, so this must stay a power of two.
 */
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
  68
  /*
 * Offsets describing a circular buffer whose size is a power of two.
 * Only the bookkeeping lives here; the bytes themselves are kept
 * elsewhere (connection->rx_page in this file).
 */
struct cbuf {
	unsigned int base;	/* offset of the oldest unconsumed byte */
	unsigned int len;	/* number of bytes currently held */
	unsigned int mask;	/* buffer size - 1 */
};

/* Record that @n more bytes were written after the current data. */
static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len = cb->len + n;
}

/* Offset one past the last held byte, i.e. where new data is written. */
static int cbuf_data(struct cbuf *cb)
{
	unsigned int end = cb->base + cb->len;

	return end & cb->mask;
}

/* Reset @cb to empty for a buffer of @size bytes (@size: power of two). */
static void cbuf_init(struct cbuf *cb, int size)
{
	cb->mask = size - 1;
	cb->len = 0;
	cb->base = 0;
}

/* Drop @n bytes from the front, wrapping base past the buffer end. */
static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->base = (cb->base + n) & cb->mask;
	cb->len -= n;
}

/* True when the buffer holds no bytes. */
static bool cbuf_empty(struct cbuf *cb)
{
	return !cb->len;
}
 102
 103struct connection {
 104        struct socket *sock;    /* NULL if not connected */
 105        uint32_t nodeid;        /* So we know who we are in the list */
 106        struct mutex sock_mutex;
 107        unsigned long flags;
 108#define CF_READ_PENDING 1
 109#define CF_WRITE_PENDING 2
 110#define CF_CONNECT_PENDING 3
 111#define CF_INIT_PENDING 4
 112#define CF_IS_OTHERCON 5
 113#define CF_CLOSE 6
 114#define CF_APP_LIMITED 7
 115        struct list_head writequeue;  /* List of outgoing writequeue_entries */
 116        spinlock_t writequeue_lock;
 117        int (*rx_action) (struct connection *); /* What to do when active */
 118        void (*connect_action) (struct connection *);   /* What to do to connect */
 119        struct page *rx_page;
 120        struct cbuf cb;
 121        int retries;
 122#define MAX_CONNECT_RETRIES 3
 123        struct hlist_node list;
 124        struct connection *othercon;
 125        struct work_struct rwork; /* Receive workqueue */
 126        struct work_struct swork; /* Send workqueue */
 127        void (*orig_error_report)(struct sock *);
 128        void (*orig_data_ready)(struct sock *);
 129        void (*orig_state_change)(struct sock *);
 130        void (*orig_write_space)(struct sock *);
 131};
 132#define sock2con(x) ((struct connection *)(x)->sk_user_data)
 133
 134/* An entry waiting to be sent */
 135struct writequeue_entry {
 136        struct list_head list;
 137        struct page *page;
 138        int offset;
 139        int len;
 140        int end;
 141        int users;
 142        struct connection *con;
 143};
 144
 145struct dlm_node_addr {
 146        struct list_head list;
 147        int nodeid;
 148        int addr_count;
 149        int curr_addr_index;
 150        struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
 151};
 152
 153static LIST_HEAD(dlm_node_addrs);
 154static DEFINE_SPINLOCK(dlm_node_addrs_spin);
 155
 156static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
 157static int dlm_local_count;
 158static int dlm_allow_conn;
 159
 160/* Work queues */
 161static struct workqueue_struct *recv_workqueue;
 162static struct workqueue_struct *send_workqueue;
 163
 164static struct hlist_head connection_hash[CONN_HASH_SIZE];
 165static DEFINE_MUTEX(connections_lock);
 166static struct kmem_cache *con_cache;
 167
 168static void process_recv_sockets(struct work_struct *work);
 169static void process_send_sockets(struct work_struct *work);
 170
 171
 172/* This is deliberately very simple because most clusters have simple
 173   sequential nodeids, so we should be able to go straight to a connection
 174   struct in the array */
 175static inline int nodeid_hash(int nodeid)
 176{
 177        return nodeid & (CONN_HASH_SIZE-1);
 178}
 179
 180static struct connection *__find_con(int nodeid)
 181{
 182        int r;
 183        struct connection *con;
 184
 185        r = nodeid_hash(nodeid);
 186
 187        hlist_for_each_entry(con, &connection_hash[r], list) {
 188                if (con->nodeid == nodeid)
 189                        return con;
 190        }
 191        return NULL;
 192}
 193
 194/*
 195 * If 'allocation' is zero then we don't attempt to create a new
 196 * connection structure for this node.
 197 */
 198static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
 199{
 200        struct connection *con = NULL;
 201        int r;
 202
 203        con = __find_con(nodeid);
 204        if (con || !alloc)
 205                return con;
 206
 207        con = kmem_cache_zalloc(con_cache, alloc);
 208        if (!con)
 209                return NULL;
 210
 211        r = nodeid_hash(nodeid);
 212        hlist_add_head(&con->list, &connection_hash[r]);
 213
 214        con->nodeid = nodeid;
 215        mutex_init(&con->sock_mutex);
 216        INIT_LIST_HEAD(&con->writequeue);
 217        spin_lock_init(&con->writequeue_lock);
 218        INIT_WORK(&con->swork, process_send_sockets);
 219        INIT_WORK(&con->rwork, process_recv_sockets);
 220
 221        /* Setup action pointers for child sockets */
 222        if (con->nodeid) {
 223                struct connection *zerocon = __find_con(0);
 224
 225                con->connect_action = zerocon->connect_action;
 226                if (!con->rx_action)
 227                        con->rx_action = zerocon->rx_action;
 228        }
 229
 230        return con;
 231}
 232
 233/* Loop round all connections */
 234static void foreach_conn(void (*conn_func)(struct connection *c))
 235{
 236        int i;
 237        struct hlist_node *n;
 238        struct connection *con;
 239
 240        for (i = 0; i < CONN_HASH_SIZE; i++) {
 241                hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
 242                        conn_func(con);
 243        }
 244}
 245
 246static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 247{
 248        struct connection *con;
 249
 250        mutex_lock(&connections_lock);
 251        con = __nodeid2con(nodeid, allocation);
 252        mutex_unlock(&connections_lock);
 253
 254        return con;
 255}
 256
 257static struct dlm_node_addr *find_node_addr(int nodeid)
 258{
 259        struct dlm_node_addr *na;
 260
 261        list_for_each_entry(na, &dlm_node_addrs, list) {
 262                if (na->nodeid == nodeid)
 263                        return na;
 264        }
 265        return NULL;
 266}
 267
 268static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
 269{
 270        switch (x->ss_family) {
 271        case AF_INET: {
 272                struct sockaddr_in *sinx = (struct sockaddr_in *)x;
 273                struct sockaddr_in *siny = (struct sockaddr_in *)y;
 274                if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
 275                        return 0;
 276                if (sinx->sin_port != siny->sin_port)
 277                        return 0;
 278                break;
 279        }
 280        case AF_INET6: {
 281                struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
 282                struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
 283                if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
 284                        return 0;
 285                if (sinx->sin6_port != siny->sin6_port)
 286                        return 0;
 287                break;
 288        }
 289        default:
 290                return 0;
 291        }
 292        return 1;
 293}
 294
 295static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
 296                          struct sockaddr *sa_out, bool try_new_addr)
 297{
 298        struct sockaddr_storage sas;
 299        struct dlm_node_addr *na;
 300
 301        if (!dlm_local_count)
 302                return -1;
 303
 304        spin_lock(&dlm_node_addrs_spin);
 305        na = find_node_addr(nodeid);
 306        if (na && na->addr_count) {
 307                memcpy(&sas, na->addr[na->curr_addr_index],
 308                       sizeof(struct sockaddr_storage));
 309
 310                if (try_new_addr) {
 311                        na->curr_addr_index++;
 312                        if (na->curr_addr_index == na->addr_count)
 313                                na->curr_addr_index = 0;
 314                }
 315        }
 316        spin_unlock(&dlm_node_addrs_spin);
 317
 318        if (!na)
 319                return -EEXIST;
 320
 321        if (!na->addr_count)
 322                return -ENOENT;
 323
 324        if (sas_out)
 325                memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
 326
 327        if (!sa_out)
 328                return 0;
 329
 330        if (dlm_local_addr[0]->ss_family == AF_INET) {
 331                struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
 332                struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
 333                ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
 334        } else {
 335                struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
 336                struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
 337                ret6->sin6_addr = in6->sin6_addr;
 338        }
 339
 340        return 0;
 341}
 342
 343static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
 344{
 345        struct dlm_node_addr *na;
 346        int rv = -EEXIST;
 347        int addr_i;
 348
 349        spin_lock(&dlm_node_addrs_spin);
 350        list_for_each_entry(na, &dlm_node_addrs, list) {
 351                if (!na->addr_count)
 352                        continue;
 353
 354                for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
 355                        if (addr_compare(na->addr[addr_i], addr)) {
 356                                *nodeid = na->nodeid;
 357                                rv = 0;
 358                                goto unlock;
 359                        }
 360                }
 361        }
 362unlock:
 363        spin_unlock(&dlm_node_addrs_spin);
 364        return rv;
 365}
 366
 367int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 368{
 369        struct sockaddr_storage *new_addr;
 370        struct dlm_node_addr *new_node, *na;
 371
 372        new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
 373        if (!new_node)
 374                return -ENOMEM;
 375
 376        new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
 377        if (!new_addr) {
 378                kfree(new_node);
 379                return -ENOMEM;
 380        }
 381
 382        memcpy(new_addr, addr, len);
 383
 384        spin_lock(&dlm_node_addrs_spin);
 385        na = find_node_addr(nodeid);
 386        if (!na) {
 387                new_node->nodeid = nodeid;
 388                new_node->addr[0] = new_addr;
 389                new_node->addr_count = 1;
 390                list_add(&new_node->list, &dlm_node_addrs);
 391                spin_unlock(&dlm_node_addrs_spin);
 392                return 0;
 393        }
 394
 395        if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
 396                spin_unlock(&dlm_node_addrs_spin);
 397                kfree(new_addr);
 398                kfree(new_node);
 399                return -ENOSPC;
 400        }
 401
 402        na->addr[na->addr_count++] = new_addr;
 403        spin_unlock(&dlm_node_addrs_spin);
 404        kfree(new_node);
 405        return 0;
 406}
 407
 408/* Data available on socket or listen socket received a connect */
 409static void lowcomms_data_ready(struct sock *sk)
 410{
 411        struct connection *con = sock2con(sk);
 412        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
 413                queue_work(recv_workqueue, &con->rwork);
 414}
 415
 416static void lowcomms_write_space(struct sock *sk)
 417{
 418        struct connection *con = sock2con(sk);
 419
 420        if (!con)
 421                return;
 422
 423        clear_bit(SOCK_NOSPACE, &con->sock->flags);
 424
 425        if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
 426                con->sock->sk->sk_write_pending--;
 427                clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
 428        }
 429
 430        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 431                queue_work(send_workqueue, &con->swork);
 432}
 433
 434static inline void lowcomms_connect_sock(struct connection *con)
 435{
 436        if (test_bit(CF_CLOSE, &con->flags))
 437                return;
 438        if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
 439                queue_work(send_workqueue, &con->swork);
 440}
 441
 442static void lowcomms_state_change(struct sock *sk)
 443{
 444        /* SCTP layer is not calling sk_data_ready when the connection
 445         * is done, so we catch the signal through here. Also, it
 446         * doesn't switch socket state when entering shutdown, so we
 447         * skip the write in that case.
 448         */
 449        if (sk->sk_shutdown) {
 450                if (sk->sk_shutdown == RCV_SHUTDOWN)
 451                        lowcomms_data_ready(sk);
 452        } else if (sk->sk_state == TCP_ESTABLISHED) {
 453                lowcomms_write_space(sk);
 454        }
 455}
 456
 457int dlm_lowcomms_connect_node(int nodeid)
 458{
 459        struct connection *con;
 460
 461        if (nodeid == dlm_our_nodeid())
 462                return 0;
 463
 464        con = nodeid2con(nodeid, GFP_NOFS);
 465        if (!con)
 466                return -ENOMEM;
 467        lowcomms_connect_sock(con);
 468        return 0;
 469}
 470
 471static void lowcomms_error_report(struct sock *sk)
 472{
 473        struct connection *con;
 474        struct sockaddr_storage saddr;
 475        int buflen;
 476        void (*orig_report)(struct sock *) = NULL;
 477
 478        read_lock_bh(&sk->sk_callback_lock);
 479        con = sock2con(sk);
 480        if (con == NULL)
 481                goto out;
 482
 483        orig_report = con->orig_error_report;
 484        if (con->sock == NULL ||
 485            kernel_getpeername(con->sock, (struct sockaddr *)&saddr, &buflen)) {
 486                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 487                                   "sending to node %d, port %d, "
 488                                   "sk_err=%d/%d\n", dlm_our_nodeid(),
 489                                   con->nodeid, dlm_config.ci_tcp_port,
 490                                   sk->sk_err, sk->sk_err_soft);
 491        } else if (saddr.ss_family == AF_INET) {
 492                struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
 493
 494                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 495                                   "sending to node %d at %pI4, port %d, "
 496                                   "sk_err=%d/%d\n", dlm_our_nodeid(),
 497                                   con->nodeid, &sin4->sin_addr.s_addr,
 498                                   dlm_config.ci_tcp_port, sk->sk_err,
 499                                   sk->sk_err_soft);
 500        } else {
 501                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
 502
 503                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 504                                   "sending to node %d at %u.%u.%u.%u, "
 505                                   "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
 506                                   con->nodeid, sin6->sin6_addr.s6_addr32[0],
 507                                   sin6->sin6_addr.s6_addr32[1],
 508                                   sin6->sin6_addr.s6_addr32[2],
 509                                   sin6->sin6_addr.s6_addr32[3],
 510                                   dlm_config.ci_tcp_port, sk->sk_err,
 511                                   sk->sk_err_soft);
 512        }
 513out:
 514        read_unlock_bh(&sk->sk_callback_lock);
 515        if (orig_report)
 516                orig_report(sk);
 517}
 518
 519/* Note: sk_callback_lock must be locked before calling this function. */
 520static void save_callbacks(struct connection *con, struct sock *sk)
 521{
 522        lock_sock(sk);
 523        con->orig_data_ready = sk->sk_data_ready;
 524        con->orig_state_change = sk->sk_state_change;
 525        con->orig_write_space = sk->sk_write_space;
 526        con->orig_error_report = sk->sk_error_report;
 527        release_sock(sk);
 528}
 529
 530static void restore_callbacks(struct connection *con, struct sock *sk)
 531{
 532        write_lock_bh(&sk->sk_callback_lock);
 533        lock_sock(sk);
 534        sk->sk_user_data = NULL;
 535        sk->sk_data_ready = con->orig_data_ready;
 536        sk->sk_state_change = con->orig_state_change;
 537        sk->sk_write_space = con->orig_write_space;
 538        sk->sk_error_report = con->orig_error_report;
 539        release_sock(sk);
 540        write_unlock_bh(&sk->sk_callback_lock);
 541}
 542
 543/* Make a socket active */
 544static void add_sock(struct socket *sock, struct connection *con)
 545{
 546        struct sock *sk = sock->sk;
 547
 548        write_lock_bh(&sk->sk_callback_lock);
 549        con->sock = sock;
 550
 551        sk->sk_user_data = con;
 552        if (!test_bit(CF_IS_OTHERCON, &con->flags))
 553                save_callbacks(con, sk);
 554        /* Install a data_ready callback */
 555        sk->sk_data_ready = lowcomms_data_ready;
 556        sk->sk_write_space = lowcomms_write_space;
 557        sk->sk_state_change = lowcomms_state_change;
 558        sk->sk_allocation = GFP_NOFS;
 559        sk->sk_error_report = lowcomms_error_report;
 560        write_unlock_bh(&sk->sk_callback_lock);
 561}
 562
 563/* Add the port number to an IPv6 or 4 sockaddr and return the address
 564   length */
 565static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 566                          int *addr_len)
 567{
 568        saddr->ss_family =  dlm_local_addr[0]->ss_family;
 569        if (saddr->ss_family == AF_INET) {
 570                struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 571                in4_addr->sin_port = cpu_to_be16(port);
 572                *addr_len = sizeof(struct sockaddr_in);
 573                memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 574        } else {
 575                struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 576                in6_addr->sin6_port = cpu_to_be16(port);
 577                *addr_len = sizeof(struct sockaddr_in6);
 578        }
 579        memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 580}
 581
 582/* Close a remote connection and tidy up */
 583static void close_connection(struct connection *con, bool and_other,
 584                             bool tx, bool rx)
 585{
 586        clear_bit(CF_CONNECT_PENDING, &con->flags);
 587        clear_bit(CF_WRITE_PENDING, &con->flags);
 588        if (tx && cancel_work_sync(&con->swork))
 589                log_print("canceled swork for node %d", con->nodeid);
 590        if (rx && cancel_work_sync(&con->rwork))
 591                log_print("canceled rwork for node %d", con->nodeid);
 592
 593        mutex_lock(&con->sock_mutex);
 594        if (con->sock) {
 595                if (!test_bit(CF_IS_OTHERCON, &con->flags))
 596                        restore_callbacks(con, con->sock->sk);
 597                sock_release(con->sock);
 598                con->sock = NULL;
 599        }
 600        if (con->othercon && and_other) {
 601                /* Will only re-enter once. */
 602                close_connection(con->othercon, false, true, true);
 603        }
 604        if (con->rx_page) {
 605                __free_page(con->rx_page);
 606                con->rx_page = NULL;
 607        }
 608
 609        con->retries = 0;
 610        mutex_unlock(&con->sock_mutex);
 611}
 612
 613/*
 * Data received from remote end.
 *
 * Receive work for a connected socket.  Reads what the socket has into the
 * connection's circular receive page (con->rx_page, with offsets in
 * con->cb), passes the buffered bytes to dlm_process_incoming_buffer(),
 * and then drops the bytes it reports as consumed.
 *
 * Locking: con->sock_mutex is taken on entry.  The out_close path drops
 * it before calling close_connection(), which takes sock_mutex itself.
 *
 * Returns 0 after a normal read.  Returns -EAGAIN when:
 *  - the work was requeued (the read filled all free space, or no receive
 *    page could be allocated);
 *  - there is no socket;
 *  - kernel_recvmsg() itself returned -EAGAIN;
 *  - the peer closed the connection (EOF).
 * Otherwise returns the negative value from kernel_recvmsg() or
 * dlm_process_incoming_buffer(), or -EINVAL for the nodeid 0 connection.
 * Every out_close exit except -EAGAIN (EOF included) closes the socket via
 * close_connection(con, false, true, false).
 */
 614static int receive_from_sock(struct connection *con)
 615{
 616        int ret = 0;
 617        struct msghdr msg = {};
 618        struct kvec iov[2];
 619        unsigned len;
 620        int r;  /* raw kernel_recvmsg() result, kept for the EBADMSG log */
 621        int call_again_soon = 0;  /* set when the read filled every free byte */
 622        int nvec;
 623
 624        mutex_lock(&con->sock_mutex);
 625
 626        if (con->sock == NULL) {
 627                ret = -EAGAIN;
 628                goto out_close;
 629        }
 630        if (con->nodeid == 0) {
 631                ret = -EINVAL;
 632                goto out_close;
 633        }
 634
 635        if (con->rx_page == NULL) {
 636                /*
 637                 * This doesn't need to be atomic, but I think it should
 638                 * improve performance if it is.
                         * A failed allocation is not fatal: the work is
                         * requeued via out_resched.
 639                 */
 640                con->rx_page = alloc_page(GFP_ATOMIC);
 641                if (con->rx_page == NULL)
 642                        goto out_resched;
 643                cbuf_init(&con->cb, PAGE_SIZE);
 644        }
 645
 646        /*
 647         * iov[0] starts at the current end point (cb.base + cb.len,
 648         * wrapped by cbuf_data()).  When the stored data has wrapped
         * (end point < cb.base) the free space is the single gap from
         * there up to cb.base, so one iovec is enough.
 649         */
 650        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
 651        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
 652        iov[1].iov_len = 0;
 653        nvec = 1;
 654
 655        /*
 656         * Otherwise the free space is split: iov[0] runs from the end
 657         * point to the end of the page and iov[1] from the start of the
         * page up to cb.base (the start of the currently used section).
         * NOTE(review): a completely full buffer (cb.len == PAGE_SIZE)
         * also has end point == cb.base and is treated as empty here;
         * presumably dlm_process_incoming_buffer() never leaves a full
         * page unconsumed - confirm.
 658         */
 659        if (cbuf_data(&con->cb) >= con->cb.base) {
 660                iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb);
 661                iov[1].iov_len = con->cb.base;
 662                iov[1].iov_base = page_address(con->rx_page);
 663                nvec = 2;
 664        }
 665        len = iov[0].iov_len + iov[1].iov_len;
 666
        /* Non-blocking read; 0 means EOF, -EAGAIN means nothing to read. */
 667        r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
 668                               MSG_DONTWAIT | MSG_NOSIGNAL);
 669        if (ret <= 0)
 670                goto out_close;
 671        else if (ret == len)
 672                call_again_soon = 1;
 673
        /* Account for the new bytes, then let the midcomms layer parse them. */
 674        cbuf_add(&con->cb, ret);
 675        ret = dlm_process_incoming_buffer(con->nodeid,
 676                                          page_address(con->rx_page),
 677                                          con->cb.base, con->cb.len,
 678                                          PAGE_SIZE);
 679        if (ret == -EBADMSG) {
 680                log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
 681                          page_address(con->rx_page), con->cb.base,
 682                          con->cb.len, r);
 683        }
 684        if (ret < 0)
 685                goto out_close;
        /* ret is the number of bytes consumed; drop them from the front. */
 686        cbuf_eat(&con->cb, ret);
 687
        /* Nothing left over and no requeue planned: free the page until next time. */
 688        if (cbuf_empty(&con->cb) && !call_again_soon) {
 689                __free_page(con->rx_page);
 690                con->rx_page = NULL;
 691        }
 692
 693        if (call_again_soon)
 694                goto out_resched;
 695        mutex_unlock(&con->sock_mutex);
 696        return 0;
 697
        /* Requeue the receive work unless a read is already pending. */
 698out_resched:
 699        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
 700                queue_work(recv_workqueue, &con->rwork);
 701        mutex_unlock(&con->sock_mutex);
 702        return -EAGAIN;
 703
        /* Drop sock_mutex first: close_connection() takes it itself. */
 704out_close:
 705        mutex_unlock(&con->sock_mutex);
 706        if (ret != -EAGAIN) {
 707                close_connection(con, false, true, false);
 708                /* Reconnect when there is something to send */
 709        }
 710        /* Don't return success if we really got EOF */
 711        if (ret == 0)
 712                ret = -EAGAIN;
 713
 714        return ret;
 715}
 716
 717/* Listening socket is busy, accept a connection */
 718static int tcp_accept_from_sock(struct connection *con)
 719{
 720        int result;
 721        struct sockaddr_storage peeraddr;
 722        struct socket *newsock;
 723        int len;
 724        int nodeid;
 725        struct connection *newcon;
 726        struct connection *addcon;
 727
 728        mutex_lock(&connections_lock);
 729        if (!dlm_allow_conn) {
 730                mutex_unlock(&connections_lock);
 731                return -1;
 732        }
 733        mutex_unlock(&connections_lock);
 734
 735        memset(&peeraddr, 0, sizeof(peeraddr));
 736        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
 737                                  SOCK_STREAM, IPPROTO_TCP, &newsock);
 738        if (result < 0)
 739                return -ENOMEM;
 740
 741        mutex_lock_nested(&con->sock_mutex, 0);
 742
 743        result = -ENOTCONN;
 744        if (con->sock == NULL)
 745                goto accept_err;
 746
 747        newsock->type = con->sock->type;
 748        newsock->ops = con->sock->ops;
 749
 750        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
 751        if (result < 0)
 752                goto accept_err;
 753
 754        /* Get the connected socket's peer */
 755        memset(&peeraddr, 0, sizeof(peeraddr));
 756        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
 757                                  &len, 2)) {
 758                result = -ECONNABORTED;
 759                goto accept_err;
 760        }
 761
 762        /* Get the new node's NODEID */
 763        make_sockaddr(&peeraddr, 0, &len);
 764        if (addr_to_nodeid(&peeraddr, &nodeid)) {
 765                unsigned char *b=(unsigned char *)&peeraddr;
 766                log_print("connect from non cluster node");
 767                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
 768                                     b, sizeof(struct sockaddr_storage));
 769                sock_release(newsock);
 770                mutex_unlock(&con->sock_mutex);
 771                return -1;
 772        }
 773
 774        log_print("got connection from %d", nodeid);
 775
 776        /*  Check to see if we already have a connection to this node. This
 777         *  could happen if the two nodes initiate a connection at roughly
 778         *  the same time and the connections cross on the wire.
 779         *  In this case we store the incoming one in "othercon"
 780         */
 781        newcon = nodeid2con(nodeid, GFP_NOFS);
 782        if (!newcon) {
 783                result = -ENOMEM;
 784                goto accept_err;
 785        }
 786        mutex_lock_nested(&newcon->sock_mutex, 1);
 787        if (newcon->sock) {
 788                struct connection *othercon = newcon->othercon;
 789
 790                if (!othercon) {
 791                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
 792                        if (!othercon) {
 793                                log_print("failed to allocate incoming socket");
 794                                mutex_unlock(&newcon->sock_mutex);
 795                                result = -ENOMEM;
 796                                goto accept_err;
 797                        }
 798                        othercon->nodeid = nodeid;
 799                        othercon->rx_action = receive_from_sock;
 800                        mutex_init(&othercon->sock_mutex);
 801                        INIT_WORK(&othercon->swork, process_send_sockets);
 802                        INIT_WORK(&othercon->rwork, process_recv_sockets);
 803                        set_bit(CF_IS_OTHERCON, &othercon->flags);
 804                }
 805                if (!othercon->sock) {
 806                        newcon->othercon = othercon;
 807                        othercon->sock = newsock;
 808                        newsock->sk->sk_user_data = othercon;
 809                        add_sock(newsock, othercon);
 810                        addcon = othercon;
 811                }
 812                else {
 813                        printk("Extra connection from node %d attempted\n", nodeid);
 814                        result = -EAGAIN;
 815                        mutex_unlock(&newcon->sock_mutex);
 816                        goto accept_err;
 817                }
 818        }
 819        else {
 820                newsock->sk->sk_user_data = newcon;
 821                newcon->rx_action = receive_from_sock;
 822                add_sock(newsock, newcon);
 823                addcon = newcon;
 824        }
 825
 826        mutex_unlock(&newcon->sock_mutex);
 827
 828        /*
 829         * Add it to the active queue in case we got data
 830         * between processing the accept adding the socket
 831         * to the read_sockets list
 832         */
 833        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
 834                queue_work(recv_workqueue, &addcon->rwork);
 835        mutex_unlock(&con->sock_mutex);
 836
 837        return 0;
 838
 839accept_err:
 840        mutex_unlock(&con->sock_mutex);
 841        sock_release(newsock);
 842
 843        if (result != -EAGAIN)
 844                log_print("error accepting connection from node: %d", result);
 845        return result;
 846}
 847
 848static int sctp_accept_from_sock(struct connection *con)
 849{
 850        /* Check that the new node is in the lockspace */
 851        struct sctp_prim prim;
 852        int nodeid;
 853        int prim_len, ret;
 854        int addr_len;
 855        struct connection *newcon;
 856        struct connection *addcon;
 857        struct socket *newsock;
 858
 859        mutex_lock(&connections_lock);
 860        if (!dlm_allow_conn) {
 861                mutex_unlock(&connections_lock);
 862                return -1;
 863        }
 864        mutex_unlock(&connections_lock);
 865
 866        mutex_lock_nested(&con->sock_mutex, 0);
 867
 868        ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
 869        if (ret < 0)
 870                goto accept_err;
 871
 872        memset(&prim, 0, sizeof(struct sctp_prim));
 873        prim_len = sizeof(struct sctp_prim);
 874
 875        ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
 876                                (char *)&prim, &prim_len);
 877        if (ret < 0) {
 878                log_print("getsockopt/sctp_primary_addr failed: %d", ret);
 879                goto accept_err;
 880        }
 881
 882        make_sockaddr(&prim.ssp_addr, 0, &addr_len);
 883        if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
 884                unsigned char *b = (unsigned char *)&prim.ssp_addr;
 885
 886                log_print("reject connect from unknown addr");
 887                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
 888                                     b, sizeof(struct sockaddr_storage));
 889                goto accept_err;
 890        }
 891
 892        newcon = nodeid2con(nodeid, GFP_NOFS);
 893        if (!newcon) {
 894                ret = -ENOMEM;
 895                goto accept_err;
 896        }
 897
 898        mutex_lock_nested(&newcon->sock_mutex, 1);
 899
 900        if (newcon->sock) {
 901                struct connection *othercon = newcon->othercon;
 902
 903                if (!othercon) {
 904                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
 905                        if (!othercon) {
 906                                log_print("failed to allocate incoming socket");
 907                                mutex_unlock(&newcon->sock_mutex);
 908                                ret = -ENOMEM;
 909                                goto accept_err;
 910                        }
 911                        othercon->nodeid = nodeid;
 912                        othercon->rx_action = receive_from_sock;
 913                        mutex_init(&othercon->sock_mutex);
 914                        INIT_WORK(&othercon->swork, process_send_sockets);
 915                        INIT_WORK(&othercon->rwork, process_recv_sockets);
 916                        set_bit(CF_IS_OTHERCON, &othercon->flags);
 917                }
 918                if (!othercon->sock) {
 919                        newcon->othercon = othercon;
 920                        othercon->sock = newsock;
 921                        newsock->sk->sk_user_data = othercon;
 922                        add_sock(newsock, othercon);
 923                        addcon = othercon;
 924                } else {
 925                        printk("Extra connection from node %d attempted\n", nodeid);
 926                        ret = -EAGAIN;
 927                        mutex_unlock(&newcon->sock_mutex);
 928                        goto accept_err;
 929                }
 930        } else {
 931                newsock->sk->sk_user_data = newcon;
 932                newcon->rx_action = receive_from_sock;
 933                add_sock(newsock, newcon);
 934                addcon = newcon;
 935        }
 936
 937        log_print("connected to %d", nodeid);
 938
 939        mutex_unlock(&newcon->sock_mutex);
 940
 941        /*
 942         * Add it to the active queue in case we got data
 943         * between processing the accept adding the socket
 944         * to the read_sockets list
 945         */
 946        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
 947                queue_work(recv_workqueue, &addcon->rwork);
 948        mutex_unlock(&con->sock_mutex);
 949
 950        return 0;
 951
 952accept_err:
 953        mutex_unlock(&con->sock_mutex);
 954        if (newsock)
 955                sock_release(newsock);
 956        if (ret != -EAGAIN)
 957                log_print("error accepting connection from node: %d", ret);
 958
 959        return ret;
 960}
 961
 962static void free_entry(struct writequeue_entry *e)
 963{
 964        __free_page(e->page);
 965        kfree(e);
 966}
 967
 968/*
 969 * writequeue_entry_complete - try to delete and free write queue entry
 970 * @e: write queue entry to try to delete
 971 * @completed: bytes completed
 972 *
 973 * writequeue_lock must be held.
 974 */
 975static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
 976{
 977        e->offset += completed;
 978        e->len -= completed;
 979
 980        if (e->len == 0 && e->users == 0) {
 981                list_del(&e->list);
 982                free_entry(e);
 983        }
 984}
 985
 986/*
 987 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 988 */
 989static int sctp_bind_addrs(struct connection *con, uint16_t port)
 990{
 991        struct sockaddr_storage localaddr;
 992        int i, addr_len, result = 0;
 993
 994        for (i = 0; i < dlm_local_count; i++) {
 995                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
 996                make_sockaddr(&localaddr, port, &addr_len);
 997
 998                if (!i)
 999                        result = kernel_bind(con->sock,
1000                                             (struct sockaddr *)&localaddr,
1001                                             addr_len);
1002                else
1003                        result = kernel_setsockopt(con->sock, SOL_SCTP,
1004                                                   SCTP_SOCKOPT_BINDX_ADD,
1005                                                   (char *)&localaddr, addr_len);
1006
1007                if (result < 0) {
1008                        log_print("Can't bind to %d addr number %d, %d.\n",
1009                                  port, i + 1, result);
1010                        break;
1011                }
1012        }
1013        return result;
1014}
1015
1016/* Initiate an SCTP association.
1017   This is a special case of send_to_sock() in that we don't yet have a
1018   peeled-off socket for this association, so we use the listening socket
1019   and add the primary IP address of the remote node.
1020 */
1021static void sctp_connect_to_sock(struct connection *con)
1022{
1023        struct sockaddr_storage daddr;
1024        int one = 1;
1025        int result;
1026        int addr_len;
1027        struct socket *sock;
1028
1029        if (con->nodeid == 0) {
1030                log_print("attempt to connect sock 0 foiled");
1031                return;
1032        }
1033
1034        mutex_lock(&con->sock_mutex);
1035
1036        /* Some odd races can cause double-connects, ignore them */
1037        if (con->retries++ > MAX_CONNECT_RETRIES)
1038                goto out;
1039
1040        if (con->sock) {
1041                log_print("node %d already connected.", con->nodeid);
1042                goto out;
1043        }
1044
1045        memset(&daddr, 0, sizeof(daddr));
1046        result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
1047        if (result < 0) {
1048                log_print("no address for nodeid %d", con->nodeid);
1049                goto out;
1050        }
1051
1052        /* Create a socket to communicate with */
1053        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1054                                  SOCK_STREAM, IPPROTO_SCTP, &sock);
1055        if (result < 0)
1056                goto socket_err;
1057
1058        sock->sk->sk_user_data = con;
1059        con->rx_action = receive_from_sock;
1060        con->connect_action = sctp_connect_to_sock;
1061        add_sock(sock, con);
1062
1063        /* Bind to all addresses. */
1064        if (sctp_bind_addrs(con, 0))
1065                goto bind_err;
1066
1067        make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
1068
1069        log_print("connecting to %d", con->nodeid);
1070
1071        /* Turn off Nagle's algorithm */
1072        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1073                          sizeof(one));
1074
1075        result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
1076                                   O_NONBLOCK);
1077        if (result == -EINPROGRESS)
1078                result = 0;
1079        if (result == 0)
1080                goto out;
1081
1082
1083bind_err:
1084        con->sock = NULL;
1085        sock_release(sock);
1086
1087socket_err:
1088        /*
1089         * Some errors are fatal and this list might need adjusting. For other
1090         * errors we try again until the max number of retries is reached.
1091         */
1092        if (result != -EHOSTUNREACH &&
1093            result != -ENETUNREACH &&
1094            result != -ENETDOWN &&
1095            result != -EINVAL &&
1096            result != -EPROTONOSUPPORT) {
1097                log_print("connect %d try %d error %d", con->nodeid,
1098                          con->retries, result);
1099                mutex_unlock(&con->sock_mutex);
1100                msleep(1000);
1101                clear_bit(CF_CONNECT_PENDING, &con->flags);
1102                lowcomms_connect_sock(con);
1103                return;
1104        }
1105
1106out:
1107        mutex_unlock(&con->sock_mutex);
1108        set_bit(CF_WRITE_PENDING, &con->flags);
1109}
1110
/*
 * tcp_connect_to_sock - connect a new TCP socket to con->nodeid
 * @con: connection to the remote node
 *
 * Creates a TCP socket, attaches it to @con, binds it to the first local
 * cluster address (a bind failure is only logged) and issues a non-blocking
 * connect to the node's address on the DLM port with TCP_NODELAY set.
 * Used as con->connect_action.
 *
 * Returns without connecting for nodeid 0, once con->retries exceeds
 * MAX_CONNECT_RETRIES, or if @con already has a socket. On an error outside
 * the fatal list below, the socket is released, sock_mutex dropped, and
 * after a one second sleep the connect is requeued via
 * lowcomms_connect_sock(). Otherwise CF_WRITE_PENDING is set on exit.
 */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	int one = 1;
	int result;

	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	mutex_lock(&con->sock_mutex);
	/* Give up once the retry budget is used up */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	memset(&saddr, 0, sizeof(saddr));
	/*
	 * Unlike the SCTP variant, a missing address goes through the retry
	 * logic in out_err rather than straight to out.
	 */
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	sock->sk->sk_user_data = con;
	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	add_sock(sock, con);

	/* Bind to our cluster-known address connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
			  sizeof(one));

	/* Non-blocking: -EINPROGRESS means the connect is under way */
	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	/*
	 * After add_sock() the socket is expected to be reachable through
	 * con->sock (presumably add_sock() stores it there — not visible
	 * here); before that point only the local sock needs releasing.
	 */
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN && 
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		/* Drop the mutex before sleeping and requeueing the connect */
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		clear_bit(CF_CONNECT_PENDING, &con->flags);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	set_bit(CF_WRITE_PENDING, &con->flags);
	return;
}
1206
1207static struct socket *tcp_create_listen_sock(struct connection *con,
1208                                             struct sockaddr_storage *saddr)
1209{
1210        struct socket *sock = NULL;
1211        int result = 0;
1212        int one = 1;
1213        int addr_len;
1214
1215        if (dlm_local_addr[0]->ss_family == AF_INET)
1216                addr_len = sizeof(struct sockaddr_in);
1217        else
1218                addr_len = sizeof(struct sockaddr_in6);
1219
1220        /* Create a socket to communicate with */
1221        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1222                                  SOCK_STREAM, IPPROTO_TCP, &sock);
1223        if (result < 0) {
1224                log_print("Can't create listening comms socket");
1225                goto create_out;
1226        }
1227
1228        /* Turn off Nagle's algorithm */
1229        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1230                          sizeof(one));
1231
1232        result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1233                                   (char *)&one, sizeof(one));
1234
1235        if (result < 0) {
1236                log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1237        }
1238        sock->sk->sk_user_data = con;
1239
1240        con->rx_action = tcp_accept_from_sock;
1241        con->connect_action = tcp_connect_to_sock;
1242
1243        /* Bind to our port */
1244        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1245        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1246        if (result < 0) {
1247                log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1248                sock_release(sock);
1249                sock = NULL;
1250                con->sock = NULL;
1251                goto create_out;
1252        }
1253        result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1254                                 (char *)&one, sizeof(one));
1255        if (result < 0) {
1256                log_print("Set keepalive failed: %d", result);
1257        }
1258
1259        result = sock->ops->listen(sock, 5);
1260        if (result < 0) {
1261                log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1262                sock_release(sock);
1263                sock = NULL;
1264                goto create_out;
1265        }
1266
1267create_out:
1268        return sock;
1269}
1270
1271/* Get local addresses */
1272static void init_local(void)
1273{
1274        struct sockaddr_storage sas, *addr;
1275        int i;
1276
1277        dlm_local_count = 0;
1278        for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1279                if (dlm_our_addr(&sas, i))
1280                        break;
1281
1282                addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
1283                if (!addr)
1284                        break;
1285                dlm_local_addr[dlm_local_count++] = addr;
1286        }
1287}
1288
1289/* Initialise SCTP socket and bind to all interfaces */
1290static int sctp_listen_for_all(void)
1291{
1292        struct socket *sock = NULL;
1293        int result = -EINVAL;
1294        struct connection *con = nodeid2con(0, GFP_NOFS);
1295        int bufsize = NEEDED_RMEM;
1296        int one = 1;
1297
1298        if (!con)
1299                return -ENOMEM;
1300
1301        log_print("Using SCTP for communications");
1302
1303        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1304                                  SOCK_STREAM, IPPROTO_SCTP, &sock);
1305        if (result < 0) {
1306                log_print("Can't create comms socket, check SCTP is loaded");
1307                goto out;
1308        }
1309
1310        result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1311                                 (char *)&bufsize, sizeof(bufsize));
1312        if (result)
1313                log_print("Error increasing buffer space on socket %d", result);
1314
1315        result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1316                                   sizeof(one));
1317        if (result < 0)
1318                log_print("Could not set SCTP NODELAY error %d\n", result);
1319
1320        write_lock_bh(&sock->sk->sk_callback_lock);
1321        /* Init con struct */
1322        sock->sk->sk_user_data = con;
1323        con->sock = sock;
1324        con->sock->sk->sk_data_ready = lowcomms_data_ready;
1325        con->rx_action = sctp_accept_from_sock;
1326        con->connect_action = sctp_connect_to_sock;
1327
1328        write_unlock_bh(&sock->sk->sk_callback_lock);
1329
1330        /* Bind to all addresses. */
1331        if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
1332                goto create_delsock;
1333
1334        result = sock->ops->listen(sock, 5);
1335        if (result < 0) {
1336                log_print("Can't set socket listening");
1337                goto create_delsock;
1338        }
1339
1340        return 0;
1341
1342create_delsock:
1343        sock_release(sock);
1344        con->sock = NULL;
1345out:
1346        return result;
1347}
1348
1349static int tcp_listen_for_all(void)
1350{
1351        struct socket *sock = NULL;
1352        struct connection *con = nodeid2con(0, GFP_NOFS);
1353        int result = -EINVAL;
1354
1355        if (!con)
1356                return -ENOMEM;
1357
1358        /* We don't support multi-homed hosts */
1359        if (dlm_local_addr[1] != NULL) {
1360                log_print("TCP protocol can't handle multi-homed hosts, "
1361                          "try SCTP");
1362                return -EINVAL;
1363        }
1364
1365        log_print("Using TCP for communications");
1366
1367        sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1368        if (sock) {
1369                add_sock(sock, con);
1370                result = 0;
1371        }
1372        else {
1373                result = -EADDRINUSE;
1374        }
1375
1376        return result;
1377}
1378
1379
1380
1381static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1382                                                     gfp_t allocation)
1383{
1384        struct writequeue_entry *entry;
1385
1386        entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1387        if (!entry)
1388                return NULL;
1389
1390        entry->page = alloc_page(allocation);
1391        if (!entry->page) {
1392                kfree(entry);
1393                return NULL;
1394        }
1395
1396        entry->offset = 0;
1397        entry->len = 0;
1398        entry->end = 0;
1399        entry->users = 0;
1400        entry->con = con;
1401
1402        return entry;
1403}
1404
1405void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1406{
1407        struct connection *con;
1408        struct writequeue_entry *e;
1409        int offset = 0;
1410
1411        con = nodeid2con(nodeid, allocation);
1412        if (!con)
1413                return NULL;
1414
1415        spin_lock(&con->writequeue_lock);
1416        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1417        if ((&e->list == &con->writequeue) ||
1418            (PAGE_SIZE - e->end < len)) {
1419                e = NULL;
1420        } else {
1421                offset = e->end;
1422                e->end += len;
1423                e->users++;
1424        }
1425        spin_unlock(&con->writequeue_lock);
1426
1427        if (e) {
1428        got_one:
1429                *ppc = page_address(e->page) + offset;
1430                return e;
1431        }
1432
1433        e = new_writequeue_entry(con, allocation);
1434        if (e) {
1435                spin_lock(&con->writequeue_lock);
1436                offset = e->end;
1437                e->end += len;
1438                e->users++;
1439                list_add_tail(&e->list, &con->writequeue);
1440                spin_unlock(&con->writequeue_lock);
1441                goto got_one;
1442        }
1443        return NULL;
1444}
1445
1446void dlm_lowcomms_commit_buffer(void *mh)
1447{
1448        struct writequeue_entry *e = (struct writequeue_entry *)mh;
1449        struct connection *con = e->con;
1450        int users;
1451
1452        spin_lock(&con->writequeue_lock);
1453        users = --e->users;
1454        if (users)
1455                goto out;
1456        e->len = e->end - e->offset;
1457        spin_unlock(&con->writequeue_lock);
1458
1459        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1460                queue_work(send_workqueue, &con->swork);
1461        }
1462        return;
1463
1464out:
1465        spin_unlock(&con->writequeue_lock);
1466        return;
1467}
1468
/*
 * send_to_sock - transmit queued write entries on con->sock
 * @con: connection whose write queue is drained
 *
 * With sock_mutex held, repeatedly takes the head of the write queue and
 * sends its pending bytes with kernel_sendpage(MSG_DONTWAIT | MSG_NOSIGNAL),
 * then accounts the sent bytes with writequeue_entry_complete(), which frees
 * the entry once it is fully sent and unused.
 *
 * - No socket: drop the mutex and request a connect via
 *   lowcomms_connect_sock().
 * - -EAGAIN or 0 from sendpage: stop for now. On -EAGAIN with
 *   SOCKWQ_ASYNC_NOSPACE set, the connection is marked CF_APP_LIMITED and
 *   the socket is flagged SOCK_NOSPACE with sk_write_pending raised
 *   (presumably so a later write-space callback resumes sending — that
 *   callback is not visible here).
 * - Any other error: close the connection and request a reconnect.
 */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		/* Queue is empty: everything has been sent */
		if ((struct list_head *) e == &con->writequeue)
			break;

		/* Snapshot under the lock; the send itself runs unlocked */
		len = e->len;
		offset = e->offset;
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		/*
		 * NOTE(review): an entry with len == 0 but users > 0 is neither
		 * sent nor freed by writequeue_entry_complete(), so the loop
		 * keeps revisiting the same head entry until its writer
		 * commits — confirm this cannot spin for long.
		 */
		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	/* Mutex must be dropped before close_connection() and reconnecting */
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false, false, true);
	lowcomms_connect_sock(con);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	lowcomms_connect_sock(con);
}
1538
1539static void clean_one_writequeue(struct connection *con)
1540{
1541        struct writequeue_entry *e, *safe;
1542
1543        spin_lock(&con->writequeue_lock);
1544        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1545                list_del(&e->list);
1546                free_entry(e);
1547        }
1548        spin_unlock(&con->writequeue_lock);
1549}
1550
1551/* Called from recovery when it knows that a node has
1552   left the cluster */
1553int dlm_lowcomms_close(int nodeid)
1554{
1555        struct connection *con;
1556        struct dlm_node_addr *na;
1557
1558        log_print("closing connection to node %d", nodeid);
1559        con = nodeid2con(nodeid, 0);
1560        if (con) {
1561                set_bit(CF_CLOSE, &con->flags);
1562                close_connection(con, true, true, true);
1563                clean_one_writequeue(con);
1564        }
1565
1566        spin_lock(&dlm_node_addrs_spin);
1567        na = find_node_addr(nodeid);
1568        if (na) {
1569                list_del(&na->list);
1570                while (na->addr_count--)
1571                        kfree(na->addr[na->addr_count]);
1572                kfree(na);
1573        }
1574        spin_unlock(&dlm_node_addrs_spin);
1575
1576        return 0;
1577}
1578
1579/* Receive workqueue function */
1580static void process_recv_sockets(struct work_struct *work)
1581{
1582        struct connection *con = container_of(work, struct connection, rwork);
1583        int err;
1584
1585        clear_bit(CF_READ_PENDING, &con->flags);
1586        do {
1587                err = con->rx_action(con);
1588        } while (!err);
1589}
1590
1591/* Send workqueue function */
1592static void process_send_sockets(struct work_struct *work)
1593{
1594        struct connection *con = container_of(work, struct connection, swork);
1595
1596        if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags))
1597                con->connect_action(con);
1598        if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1599                send_to_sock(con);
1600}
1601
1602
1603/* Discard all entries on the write queues */
1604static void clean_writequeues(void)
1605{
1606        foreach_conn(clean_one_writequeue);
1607}
1608
1609static void work_stop(void)
1610{
1611        destroy_workqueue(recv_workqueue);
1612        destroy_workqueue(send_workqueue);
1613}
1614
1615static int work_start(void)
1616{
1617        recv_workqueue = alloc_workqueue("dlm_recv",
1618                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1619        if (!recv_workqueue) {
1620                log_print("can't start dlm_recv");
1621                return -ENOMEM;
1622        }
1623
1624        send_workqueue = alloc_workqueue("dlm_send",
1625                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1626        if (!send_workqueue) {
1627                log_print("can't start dlm_send");
1628                destroy_workqueue(recv_workqueue);
1629                return -ENOMEM;
1630        }
1631
1632        return 0;
1633}
1634
/*
 * stop_conn - quiesce one connection during dlm_lowcomms_stop()
 * @con: connection to stop
 *
 * ORs 0x0F into con->flags and detaches the socket (if any) from @con by
 * clearing its sk_user_data.
 *
 * NOTE(review): 0x0F sets bit positions 0-3 with a plain, non-atomic
 * read-modify-write, while the CF_* flags are otherwise manipulated with
 * atomic bitops (set_bit/test_and_set_bit). Which CF_* bits this covers
 * depends on their definitions (not visible here) — presumably the
 * *_PENDING bits, so that no new work gets queued. Confirm, and consider
 * explicit set_bit() calls instead.
 */
static void stop_conn(struct connection *con)
{
	con->flags |= 0x0F;
	if (con->sock && con->sock->sk)
		con->sock->sk->sk_user_data = NULL;
}
1641
1642static void free_conn(struct connection *con)
1643{
1644        close_connection(con, true, true, true);
1645        if (con->othercon)
1646                kmem_cache_free(con_cache, con->othercon);
1647        hlist_del(&con->list);
1648        kmem_cache_free(con_cache, con);
1649}
1650
/*
 * dlm_lowcomms_stop - shut down the lowcomms layer
 *
 * With connections_lock held: stop accepting new connections
 * (dlm_allow_conn = 0, which the SCTP accept path checks), mark every
 * connection stopped and detach its socket, discard all queued writes, and
 * close and free every connection. The receive and send workqueues are then
 * destroyed and con_cache released.
 *
 * NOTE(review): the dlm_local_addr[] copies made by init_local() are not
 * freed here — confirm whether they are released elsewhere.
 */
void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	mutex_lock(&connections_lock);
	dlm_allow_conn = 0;
	foreach_conn(stop_conn);
	clean_writequeues();
	foreach_conn(free_conn);
	mutex_unlock(&connections_lock);

	/* Only after every connection has been closed and freed */
	work_stop();

	kmem_cache_destroy(con_cache);
}
1667
1668int dlm_lowcomms_start(void)
1669{
1670        int error = -EINVAL;
1671        struct connection *con;
1672        int i;
1673
1674        for (i = 0; i < CONN_HASH_SIZE; i++)
1675                INIT_HLIST_HEAD(&connection_hash[i]);
1676
1677        init_local();
1678        if (!dlm_local_count) {
1679                error = -ENOTCONN;
1680                log_print("no local IP address has been set");
1681                goto fail;
1682        }
1683
1684        error = -ENOMEM;
1685        con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1686                                      __alignof__(struct connection), 0,
1687                                      NULL);
1688        if (!con_cache)
1689                goto fail;
1690
1691        error = work_start();
1692        if (error)
1693                goto fail_destroy;
1694
1695        dlm_allow_conn = 1;
1696
1697        /* Start listening */
1698        if (dlm_config.ci_protocol == 0)
1699                error = tcp_listen_for_all();
1700        else
1701                error = sctp_listen_for_all();
1702        if (error)
1703                goto fail_unlisten;
1704
1705        return 0;
1706
1707fail_unlisten:
1708        dlm_allow_conn = 0;
1709        con = nodeid2con(0,0);
1710        if (con) {
1711                close_connection(con, false, true, true);
1712                kmem_cache_free(con_cache, con);
1713        }
1714fail_destroy:
1715        kmem_cache_destroy(con_cache);
1716fail:
1717        return error;
1718}
1719
1720void dlm_lowcomms_exit(void)
1721{
1722        struct dlm_node_addr *na, *safe;
1723
1724        spin_lock(&dlm_node_addrs_spin);
1725        list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1726                list_del(&na->list);
1727                while (na->addr_count--)
1728                        kfree(na->addr[na->addr_count]);
1729                kfree(na);
1730        }
1731        spin_unlock(&dlm_node_addrs_spin);
1732}
1733