linux/fs/dlm/lowcomms.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14/*
  15 * lowcomms.c
  16 *
  17 * This is the "low-level" comms layer.
  18 *
  19 * It is responsible for sending/receiving messages
  20 * from other nodes in the cluster.
  21 *
  22 * Cluster nodes are referred to by their nodeids. nodeids are
  23 * simply 32 bit numbers to the locking module - if they need to
  24 * be expanded for the cluster infrastructure then that is its
  25 * responsibility. It is this layer's
  26 * responsibility to resolve these into IP address or
  27 * whatever it needs for inter-node communication.
  28 *
  29 * The comms level is two kernel threads that deal mainly with
  30 * the receiving of messages from other nodes and passing them
  31 * up to the mid-level comms layer (which understands the
  32 * message format) for execution by the locking core, and
  33 * a send thread which does all the setting up of connections
  34 * to remote nodes and the sending of data. Threads are not allowed
  35 * to send their own data because it may cause them to wait in times
  36 * of high load. Also, this way, the sending thread can collect together
  37 * messages bound for one node and send them in one block.
  38 *
  39 * lowcomms will choose to use either TCP or SCTP as its transport layer
  40 * depending on the configuration variable 'protocol'. This should be set
  41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
  42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
  43 * for the DLM to function.
  44 *
  45 */
  46
  47#include <asm/ioctls.h>
  48#include <net/sock.h>
  49#include <net/tcp.h>
  50#include <linux/pagemap.h>
  51#include <linux/file.h>
  52#include <linux/mutex.h>
  53#include <linux/sctp.h>
  54#include <linux/slab.h>
  55#include <net/sctp/sctp.h>
  56#include <net/ipv6.h>
  57
  58#include "dlm_internal.h"
  59#include "lowcomms.h"
  60#include "midcomms.h"
  61#include "config.h"
  62
/* NOTE(review): not referenced in this part of the file; presumably the
 * socket receive buffer size (4 MiB) lowcomms asks for - confirm at the
 * point of use. */
#define NEEDED_RMEM (4*1024*1024)
/* Number of buckets in connection_hash[]. nodeid_hash() masks with
 * CONN_HASH_SIZE-1, so this has to stay a power of two. */
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
  68
/*
 * Bookkeeping for a circular buffer. The storage itself lives elsewhere
 * (receive_from_sock() pairs this with con->rx_page); this struct only
 * tracks offsets into it.
 */
struct cbuf {
        unsigned int base;      /* offset of the first unconsumed byte */
        unsigned int len;       /* number of bytes currently held */
        unsigned int mask;      /* buffer size - 1, used to wrap offsets */
};
  74
  75static void cbuf_add(struct cbuf *cb, int n)
  76{
  77        cb->len += n;
  78}
  79
  80static int cbuf_data(struct cbuf *cb)
  81{
  82        return ((cb->base + cb->len) & cb->mask);
  83}
  84
  85static void cbuf_init(struct cbuf *cb, int size)
  86{
  87        cb->base = cb->len = 0;
  88        cb->mask = size-1;
  89}
  90
  91static void cbuf_eat(struct cbuf *cb, int n)
  92{
  93        cb->len  -= n;
  94        cb->base += n;
  95        cb->base &= cb->mask;
  96}
  97
  98static bool cbuf_empty(struct cbuf *cb)
  99{
 100        return cb->len == 0;
 101}
 102
/*
 * Per-node connection state. Instances are allocated from con_cache and,
 * when created by __nodeid2con(), linked into connection_hash[] by nodeid.
 */
struct connection {
        struct socket *sock;    /* NULL if not connected */
        uint32_t nodeid;        /* So we know who we are in the list */
        struct mutex sock_mutex;        /* held while sock is used or replaced */
        unsigned long flags;    /* CF_* below are bit numbers for the bitops */
#define CF_READ_PENDING 1       /* set before rwork is queued */
#define CF_WRITE_PENDING 2
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5        /* this struct is another connection's othercon */
#define CF_CLOSE 6              /* lowcomms_connect_sock() does nothing while set */
#define CF_APP_LIMITED 7
#define CF_CLOSING 8            /* set for the duration of close_connection() */
        struct list_head writequeue;  /* List of outgoing writequeue_entries */
        spinlock_t writequeue_lock;
        int (*rx_action) (struct connection *); /* What to do when active */
        void (*connect_action) (struct connection *);   /* What to do to connect */
        struct page *rx_page;   /* receive buffer page; NULL when not allocated */
        struct cbuf cb;         /* circular-buffer offsets into rx_page */
        int retries;            /* reset to 0 by close_connection() */
#define MAX_CONNECT_RETRIES 3
        struct hlist_node list; /* linkage in connection_hash[] */
        struct connection *othercon;    /* holds a crossed incoming connection */
        struct work_struct rwork; /* Receive workqueue */
        struct work_struct swork; /* Send workqueue */
};
/* Fetch the connection stored in a struct sock's sk_user_data */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
 129
/* An entry waiting to be sent */
/* NOTE(review): no user of these fields is visible in this part of the
 * file; the per-field notes below are assumptions to confirm. */
struct writequeue_entry {
        struct list_head list;  /* presumably linkage in connection.writequeue */
        struct page *page;      /* presumably the page holding the data */
        int offset;             /* presumably start of unsent data in page */
        int len;                /* presumably number of bytes ready to send */
        int end;                /* presumably end of data written into page */
        int users;              /* presumably count of writers still filling it */
        struct connection *con; /* presumably the owning connection */
};
 140
/*
 * Known addresses of one cluster node. Entries live on dlm_node_addrs and
 * are added or extended by dlm_lowcomms_addr().
 */
struct dlm_node_addr {
        struct list_head list;  /* linkage in dlm_node_addrs */
        int nodeid;
        int mark;               /* initialised from dlm_config.ci_mark */
        int addr_count;         /* number of valid entries in addr[] */
        int curr_addr_index;    /* addr[] entry nodeid_to_addr() hands out next */
        struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
 149
/*
 * Socket callbacks captured by save_listen_callbacks(); restore_callbacks()
 * puts them back on a socket and lowcomms_error_report() chains to the
 * saved sk_error_report.
 */
static struct listen_sock_callbacks {
        void (*sk_error_report)(struct sock *);
        void (*sk_data_ready)(struct sock *);
        void (*sk_state_change)(struct sock *);
        void (*sk_write_space)(struct sock *);
} listen_sock;
 156
/* All known struct dlm_node_addr entries; protected by dlm_node_addrs_spin */
static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

/* Local addresses; entry 0's ss_family selects IPv4 vs IPv6 handling below */
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
/* nodeid_to_addr() fails with -1 while this is zero */
static int dlm_local_count;
/* Checked under connections_lock; the accept paths bail out while zero */
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;
static struct workqueue_struct *send_workqueue;

/* Connections hashed by nodeid_hash(nodeid) */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
/* Taken by nodeid2con() around lookup/creation in connection_hash */
static DEFINE_MUTEX(connections_lock);
/* Slab cache that struct connection is allocated from */
static struct kmem_cache *con_cache;

/* Work handlers installed on connection.rwork / connection.swork */
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
 174
 175
 176/* This is deliberately very simple because most clusters have simple
 177   sequential nodeids, so we should be able to go straight to a connection
 178   struct in the array */
 179static inline int nodeid_hash(int nodeid)
 180{
 181        return nodeid & (CONN_HASH_SIZE-1);
 182}
 183
 184static struct connection *__find_con(int nodeid)
 185{
 186        int r;
 187        struct connection *con;
 188
 189        r = nodeid_hash(nodeid);
 190
 191        hlist_for_each_entry(con, &connection_hash[r], list) {
 192                if (con->nodeid == nodeid)
 193                        return con;
 194        }
 195        return NULL;
 196}
 197
 198/*
 199 * If 'allocation' is zero then we don't attempt to create a new
 200 * connection structure for this node.
 201 */
 202static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
 203{
 204        struct connection *con = NULL;
 205        int r;
 206
 207        con = __find_con(nodeid);
 208        if (con || !alloc)
 209                return con;
 210
 211        con = kmem_cache_zalloc(con_cache, alloc);
 212        if (!con)
 213                return NULL;
 214
 215        r = nodeid_hash(nodeid);
 216        hlist_add_head(&con->list, &connection_hash[r]);
 217
 218        con->nodeid = nodeid;
 219        mutex_init(&con->sock_mutex);
 220        INIT_LIST_HEAD(&con->writequeue);
 221        spin_lock_init(&con->writequeue_lock);
 222        INIT_WORK(&con->swork, process_send_sockets);
 223        INIT_WORK(&con->rwork, process_recv_sockets);
 224
 225        /* Setup action pointers for child sockets */
 226        if (con->nodeid) {
 227                struct connection *zerocon = __find_con(0);
 228
 229                con->connect_action = zerocon->connect_action;
 230                if (!con->rx_action)
 231                        con->rx_action = zerocon->rx_action;
 232        }
 233
 234        return con;
 235}
 236
 237/* Loop round all connections */
 238static void foreach_conn(void (*conn_func)(struct connection *c))
 239{
 240        int i;
 241        struct hlist_node *n;
 242        struct connection *con;
 243
 244        for (i = 0; i < CONN_HASH_SIZE; i++) {
 245                hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
 246                        conn_func(con);
 247        }
 248}
 249
 250static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 251{
 252        struct connection *con;
 253
 254        mutex_lock(&connections_lock);
 255        con = __nodeid2con(nodeid, allocation);
 256        mutex_unlock(&connections_lock);
 257
 258        return con;
 259}
 260
 261static struct dlm_node_addr *find_node_addr(int nodeid)
 262{
 263        struct dlm_node_addr *na;
 264
 265        list_for_each_entry(na, &dlm_node_addrs, list) {
 266                if (na->nodeid == nodeid)
 267                        return na;
 268        }
 269        return NULL;
 270}
 271
 272static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
 273{
 274        switch (x->ss_family) {
 275        case AF_INET: {
 276                struct sockaddr_in *sinx = (struct sockaddr_in *)x;
 277                struct sockaddr_in *siny = (struct sockaddr_in *)y;
 278                if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
 279                        return 0;
 280                if (sinx->sin_port != siny->sin_port)
 281                        return 0;
 282                break;
 283        }
 284        case AF_INET6: {
 285                struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
 286                struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
 287                if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
 288                        return 0;
 289                if (sinx->sin6_port != siny->sin6_port)
 290                        return 0;
 291                break;
 292        }
 293        default:
 294                return 0;
 295        }
 296        return 1;
 297}
 298
 299static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
 300                          struct sockaddr *sa_out, bool try_new_addr,
 301                          unsigned int *mark)
 302{
 303        struct sockaddr_storage sas;
 304        struct dlm_node_addr *na;
 305
 306        if (!dlm_local_count)
 307                return -1;
 308
 309        spin_lock(&dlm_node_addrs_spin);
 310        na = find_node_addr(nodeid);
 311        if (na && na->addr_count) {
 312                memcpy(&sas, na->addr[na->curr_addr_index],
 313                       sizeof(struct sockaddr_storage));
 314
 315                if (try_new_addr) {
 316                        na->curr_addr_index++;
 317                        if (na->curr_addr_index == na->addr_count)
 318                                na->curr_addr_index = 0;
 319                }
 320        }
 321        spin_unlock(&dlm_node_addrs_spin);
 322
 323        if (!na)
 324                return -EEXIST;
 325
 326        if (!na->addr_count)
 327                return -ENOENT;
 328
 329        *mark = na->mark;
 330
 331        if (sas_out)
 332                memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
 333
 334        if (!sa_out)
 335                return 0;
 336
 337        if (dlm_local_addr[0]->ss_family == AF_INET) {
 338                struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
 339                struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
 340                ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
 341        } else {
 342                struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
 343                struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
 344                ret6->sin6_addr = in6->sin6_addr;
 345        }
 346
 347        return 0;
 348}
 349
 350static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid,
 351                          unsigned int *mark)
 352{
 353        struct dlm_node_addr *na;
 354        int rv = -EEXIST;
 355        int addr_i;
 356
 357        spin_lock(&dlm_node_addrs_spin);
 358        list_for_each_entry(na, &dlm_node_addrs, list) {
 359                if (!na->addr_count)
 360                        continue;
 361
 362                for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
 363                        if (addr_compare(na->addr[addr_i], addr)) {
 364                                *nodeid = na->nodeid;
 365                                *mark = na->mark;
 366                                rv = 0;
 367                                goto unlock;
 368                        }
 369                }
 370        }
 371unlock:
 372        spin_unlock(&dlm_node_addrs_spin);
 373        return rv;
 374}
 375
 376int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 377{
 378        struct sockaddr_storage *new_addr;
 379        struct dlm_node_addr *new_node, *na;
 380
 381        new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
 382        if (!new_node)
 383                return -ENOMEM;
 384
 385        new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
 386        if (!new_addr) {
 387                kfree(new_node);
 388                return -ENOMEM;
 389        }
 390
 391        memcpy(new_addr, addr, len);
 392
 393        spin_lock(&dlm_node_addrs_spin);
 394        na = find_node_addr(nodeid);
 395        if (!na) {
 396                new_node->nodeid = nodeid;
 397                new_node->addr[0] = new_addr;
 398                new_node->addr_count = 1;
 399                new_node->mark = dlm_config.ci_mark;
 400                list_add(&new_node->list, &dlm_node_addrs);
 401                spin_unlock(&dlm_node_addrs_spin);
 402                return 0;
 403        }
 404
 405        if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
 406                spin_unlock(&dlm_node_addrs_spin);
 407                kfree(new_addr);
 408                kfree(new_node);
 409                return -ENOSPC;
 410        }
 411
 412        na->addr[na->addr_count++] = new_addr;
 413        spin_unlock(&dlm_node_addrs_spin);
 414        kfree(new_node);
 415        return 0;
 416}
 417
 418/* Data available on socket or listen socket received a connect */
 419static void lowcomms_data_ready(struct sock *sk)
 420{
 421        struct connection *con;
 422
 423        read_lock_bh(&sk->sk_callback_lock);
 424        con = sock2con(sk);
 425        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
 426                queue_work(recv_workqueue, &con->rwork);
 427        read_unlock_bh(&sk->sk_callback_lock);
 428}
 429
 430static void lowcomms_write_space(struct sock *sk)
 431{
 432        struct connection *con;
 433
 434        read_lock_bh(&sk->sk_callback_lock);
 435        con = sock2con(sk);
 436        if (!con)
 437                goto out;
 438
 439        clear_bit(SOCK_NOSPACE, &con->sock->flags);
 440
 441        if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
 442                con->sock->sk->sk_write_pending--;
 443                clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
 444        }
 445
 446        queue_work(send_workqueue, &con->swork);
 447out:
 448        read_unlock_bh(&sk->sk_callback_lock);
 449}
 450
 451static inline void lowcomms_connect_sock(struct connection *con)
 452{
 453        if (test_bit(CF_CLOSE, &con->flags))
 454                return;
 455        queue_work(send_workqueue, &con->swork);
 456        cond_resched();
 457}
 458
 459static void lowcomms_state_change(struct sock *sk)
 460{
 461        /* SCTP layer is not calling sk_data_ready when the connection
 462         * is done, so we catch the signal through here. Also, it
 463         * doesn't switch socket state when entering shutdown, so we
 464         * skip the write in that case.
 465         */
 466        if (sk->sk_shutdown) {
 467                if (sk->sk_shutdown == RCV_SHUTDOWN)
 468                        lowcomms_data_ready(sk);
 469        } else if (sk->sk_state == TCP_ESTABLISHED) {
 470                lowcomms_write_space(sk);
 471        }
 472}
 473
 474int dlm_lowcomms_connect_node(int nodeid)
 475{
 476        struct connection *con;
 477
 478        if (nodeid == dlm_our_nodeid())
 479                return 0;
 480
 481        con = nodeid2con(nodeid, GFP_NOFS);
 482        if (!con)
 483                return -ENOMEM;
 484        lowcomms_connect_sock(con);
 485        return 0;
 486}
 487
 488int dlm_lowcomms_nodes_set_mark(int nodeid, unsigned int mark)
 489{
 490        struct dlm_node_addr *na;
 491
 492        spin_lock(&dlm_node_addrs_spin);
 493        na = find_node_addr(nodeid);
 494        if (!na) {
 495                spin_unlock(&dlm_node_addrs_spin);
 496                return -ENOENT;
 497        }
 498
 499        na->mark = mark;
 500        spin_unlock(&dlm_node_addrs_spin);
 501
 502        return 0;
 503}
 504
 505static void lowcomms_error_report(struct sock *sk)
 506{
 507        struct connection *con;
 508        struct sockaddr_storage saddr;
 509        void (*orig_report)(struct sock *) = NULL;
 510
 511        read_lock_bh(&sk->sk_callback_lock);
 512        con = sock2con(sk);
 513        if (con == NULL)
 514                goto out;
 515
 516        orig_report = listen_sock.sk_error_report;
 517        if (con->sock == NULL ||
 518            kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
 519                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 520                                   "sending to node %d, port %d, "
 521                                   "sk_err=%d/%d\n", dlm_our_nodeid(),
 522                                   con->nodeid, dlm_config.ci_tcp_port,
 523                                   sk->sk_err, sk->sk_err_soft);
 524        } else if (saddr.ss_family == AF_INET) {
 525                struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
 526
 527                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 528                                   "sending to node %d at %pI4, port %d, "
 529                                   "sk_err=%d/%d\n", dlm_our_nodeid(),
 530                                   con->nodeid, &sin4->sin_addr.s_addr,
 531                                   dlm_config.ci_tcp_port, sk->sk_err,
 532                                   sk->sk_err_soft);
 533        } else {
 534                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
 535
 536                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 537                                   "sending to node %d at %u.%u.%u.%u, "
 538                                   "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
 539                                   con->nodeid, sin6->sin6_addr.s6_addr32[0],
 540                                   sin6->sin6_addr.s6_addr32[1],
 541                                   sin6->sin6_addr.s6_addr32[2],
 542                                   sin6->sin6_addr.s6_addr32[3],
 543                                   dlm_config.ci_tcp_port, sk->sk_err,
 544                                   sk->sk_err_soft);
 545        }
 546out:
 547        read_unlock_bh(&sk->sk_callback_lock);
 548        if (orig_report)
 549                orig_report(sk);
 550}
 551
 552/* Note: sk_callback_lock must be locked before calling this function. */
 553static void save_listen_callbacks(struct socket *sock)
 554{
 555        struct sock *sk = sock->sk;
 556
 557        listen_sock.sk_data_ready = sk->sk_data_ready;
 558        listen_sock.sk_state_change = sk->sk_state_change;
 559        listen_sock.sk_write_space = sk->sk_write_space;
 560        listen_sock.sk_error_report = sk->sk_error_report;
 561}
 562
 563static void restore_callbacks(struct socket *sock)
 564{
 565        struct sock *sk = sock->sk;
 566
 567        write_lock_bh(&sk->sk_callback_lock);
 568        sk->sk_user_data = NULL;
 569        sk->sk_data_ready = listen_sock.sk_data_ready;
 570        sk->sk_state_change = listen_sock.sk_state_change;
 571        sk->sk_write_space = listen_sock.sk_write_space;
 572        sk->sk_error_report = listen_sock.sk_error_report;
 573        write_unlock_bh(&sk->sk_callback_lock);
 574}
 575
 576/* Make a socket active */
 577static void add_sock(struct socket *sock, struct connection *con)
 578{
 579        struct sock *sk = sock->sk;
 580
 581        write_lock_bh(&sk->sk_callback_lock);
 582        con->sock = sock;
 583
 584        sk->sk_user_data = con;
 585        /* Install a data_ready callback */
 586        sk->sk_data_ready = lowcomms_data_ready;
 587        sk->sk_write_space = lowcomms_write_space;
 588        sk->sk_state_change = lowcomms_state_change;
 589        sk->sk_allocation = GFP_NOFS;
 590        sk->sk_error_report = lowcomms_error_report;
 591        write_unlock_bh(&sk->sk_callback_lock);
 592}
 593
 594/* Add the port number to an IPv6 or 4 sockaddr and return the address
 595   length */
 596static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 597                          int *addr_len)
 598{
 599        saddr->ss_family =  dlm_local_addr[0]->ss_family;
 600        if (saddr->ss_family == AF_INET) {
 601                struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 602                in4_addr->sin_port = cpu_to_be16(port);
 603                *addr_len = sizeof(struct sockaddr_in);
 604                memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 605        } else {
 606                struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 607                in6_addr->sin6_port = cpu_to_be16(port);
 608                *addr_len = sizeof(struct sockaddr_in6);
 609        }
 610        memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 611}
 612
/*
 * Close a remote connection and tidy up
 *
 * @con:       connection to close
 * @and_other: also close con->othercon, if one is attached
 * @tx:        cancel the send work (swork) before closing
 * @rx:        cancel the receive work (rwork) before closing
 *
 * Releases the socket (after restoring its original callbacks), frees the
 * receive page and resets the retry counter, all under con->sock_mutex.
 */
static void close_connection(struct connection *con, bool and_other,
                             bool tx, bool rx)
{
        /* True if CF_CLOSING was already set when we got here */
        bool closing = test_and_set_bit(CF_CLOSING, &con->flags);

        /* Work is only cancelled by the caller that set CF_CLOSING */
        if (tx && !closing && cancel_work_sync(&con->swork)) {
                log_print("canceled swork for node %d", con->nodeid);
                clear_bit(CF_WRITE_PENDING, &con->flags);
        }
        if (rx && !closing && cancel_work_sync(&con->rwork)) {
                log_print("canceled rwork for node %d", con->nodeid);
                clear_bit(CF_READ_PENDING, &con->flags);
        }

        mutex_lock(&con->sock_mutex);
        if (con->sock) {
                /* Detach our callbacks before the socket goes away */
                restore_callbacks(con->sock);
                sock_release(con->sock);
                con->sock = NULL;
        }
        if (con->othercon && and_other) {
                /* Will only re-enter once. */
                /* (and_other is passed as false, so the nested call stops
                 * here; it runs while con->sock_mutex is still held) */
                close_connection(con->othercon, false, true, true);
        }
        if (con->rx_page) {
                __free_page(con->rx_page);
                con->rx_page = NULL;
        }

        con->retries = 0;
        mutex_unlock(&con->sock_mutex);
        /* Cleared unconditionally, even if it was already set on entry */
        clear_bit(CF_CLOSING, &con->flags);
}
 647
/*
 * Data received from remote end
 *
 * Reads whatever is available from con->sock into the connection's
 * page-sized circular buffer and passes the buffered bytes to
 * dlm_process_incoming_buffer().
 *
 * Returns 0 when data was processed and no immediate re-run is needed,
 * -EAGAIN when the receive work was requeued, when there is no socket, or
 * on EOF, and another negative value on error. For every failure other than
 * -EAGAIN the connection is closed.
 */
static int receive_from_sock(struct connection *con)
{
        int ret = 0;
        struct msghdr msg = {};
        struct kvec iov[2];
        unsigned len;
        int r;
        int call_again_soon = 0;
        int nvec;

        mutex_lock(&con->sock_mutex);

        if (con->sock == NULL) {
                ret = -EAGAIN;
                goto out_close;
        }
        /* nodeid 0 is rejected here with -EINVAL */
        if (con->nodeid == 0) {
                ret = -EINVAL;
                goto out_close;
        }

        if (con->rx_page == NULL) {
                /*
                 * This doesn't need to be atomic, but I think it should
                 * improve performance if it is.
                 */
                con->rx_page = alloc_page(GFP_ATOMIC);
                /* No page available: requeue ourselves and try again later */
                if (con->rx_page == NULL)
                        goto out_resched;
                cbuf_init(&con->cb, PAGE_SIZE);
        }

        /*
         * iov[0] is the bit of the circular buffer between the current end
         * point (cb.base + cb.len) and the end of the buffer.
         */
        /* Default: the data has wrapped, so the free space is the single
         * span from the write offset up to cb.base */
        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
        iov[1].iov_len = 0;
        nvec = 1;

        /*
         * iov[1] is the bit of the circular buffer between the start of the
         * buffer and the start of the currently used section (cb.base)
         */
        if (cbuf_data(&con->cb) >= con->cb.base) {
                iov[0].iov_len = PAGE_SIZE - cbuf_data(&con->cb);
                iov[1].iov_len = con->cb.base;
                iov[1].iov_base = page_address(con->rx_page);
                nvec = 2;
        }
        len = iov[0].iov_len + iov[1].iov_len;
        iov_iter_kvec(&msg.msg_iter, READ, iov, nvec, len);

        /* r keeps the byte count for the log below; ret gets reused */
        r = ret = sock_recvmsg(con->sock, &msg, MSG_DONTWAIT | MSG_NOSIGNAL);
        if (ret <= 0)
                goto out_close;
        else if (ret == len)
                /* Filled all the free space: more data may be waiting */
                call_again_soon = 1;

        cbuf_add(&con->cb, ret);
        ret = dlm_process_incoming_buffer(con->nodeid,
                                          page_address(con->rx_page),
                                          con->cb.base, con->cb.len,
                                          PAGE_SIZE);
        if (ret == -EBADMSG) {
                log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
                          page_address(con->rx_page), con->cb.base,
                          con->cb.len, r);
        }
        if (ret < 0)
                goto out_close;
        /* A non-negative return is the number of bytes consumed */
        cbuf_eat(&con->cb, ret);

        /* Give the page back once drained, unless we are about to re-run */
        if (cbuf_empty(&con->cb) && !call_again_soon) {
                __free_page(con->rx_page);
                con->rx_page = NULL;
        }

        if (call_again_soon)
                goto out_resched;
        mutex_unlock(&con->sock_mutex);
        return 0;

out_resched:
        /* Queue another receive pass unless one is already pending */
        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
                queue_work(recv_workqueue, &con->rwork);
        mutex_unlock(&con->sock_mutex);
        return -EAGAIN;

out_close:
        mutex_unlock(&con->sock_mutex);
        if (ret != -EAGAIN) {
                /* Also closes othercon and cancels swork, but not rwork */
                close_connection(con, true, true, false);
                /* Reconnect when there is something to send */
        }
        /* Don't return success if we really got EOF */
        if (ret == 0)
                ret = -EAGAIN;

        return ret;
}
 751
/*
 * Listening socket is busy, accept a connection
 *
 * @con: the connection that owns the listening socket
 *
 * Accepts one pending connection, maps the peer address to a nodeid and
 * attaches the new socket either to that node's connection or, when that
 * connection already has a socket, to its "othercon".
 *
 * Returns 0 on success, -1 if accepting is disabled or the peer is not a
 * known cluster node, -ENOTCONN if there is no listening socket, or
 * another negative value on failure.
 */
static int tcp_accept_from_sock(struct connection *con)
{
        int result;
        struct sockaddr_storage peeraddr;
        struct socket *newsock;
        int len;
        int nodeid;
        struct connection *newcon;
        struct connection *addcon;
        unsigned int mark;

        /* dlm_allow_conn is sampled under connections_lock */
        mutex_lock(&connections_lock);
        if (!dlm_allow_conn) {
                mutex_unlock(&connections_lock);
                return -1;
        }
        mutex_unlock(&connections_lock);

        /* Lock nesting levels: 0 = listening con, 1 = newcon, 2 = othercon */
        mutex_lock_nested(&con->sock_mutex, 0);

        if (!con->sock) {
                mutex_unlock(&con->sock_mutex);
                return -ENOTCONN;
        }

        result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
        if (result < 0)
                goto accept_err;

        /* Get the connected socket's peer */
        memset(&peeraddr, 0, sizeof(peeraddr));
        /* NOTE(review): the literal 2 presumably selects the peer (rather
         * than local) address - confirm against this kernel's getname() */
        len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
        if (len < 0) {
                result = -ECONNABORTED;
                goto accept_err;
        }

        /* Get the new node's NODEID */
        /* Port is forced to 0 before the address is looked up */
        make_sockaddr(&peeraddr, 0, &len);
        if (addr_to_nodeid(&peeraddr, &nodeid, &mark)) {
                unsigned char *b=(unsigned char *)&peeraddr;
                log_print("connect from non cluster node");
                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
                                     b, sizeof(struct sockaddr_storage));
                sock_release(newsock);
                mutex_unlock(&con->sock_mutex);
                return -1;
        }

        log_print("got connection from %d", nodeid);

        /*  Check to see if we already have a connection to this node. This
         *  could happen if the two nodes initiate a connection at roughly
         *  the same time and the connections cross on the wire.
         *  In this case we store the incoming one in "othercon"
         */
        newcon = nodeid2con(nodeid, GFP_NOFS);
        if (!newcon) {
                result = -ENOMEM;
                goto accept_err;
        }

        sock_set_mark(newsock->sk, mark);

        mutex_lock_nested(&newcon->sock_mutex, 1);
        if (newcon->sock) {
                struct connection *othercon = newcon->othercon;

                /* First crossed connection for this node: build othercon */
                if (!othercon) {
                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
                        if (!othercon) {
                                log_print("failed to allocate incoming socket");
                                mutex_unlock(&newcon->sock_mutex);
                                result = -ENOMEM;
                                goto accept_err;
                        }
                        othercon->nodeid = nodeid;
                        othercon->rx_action = receive_from_sock;
                        mutex_init(&othercon->sock_mutex);
                        INIT_LIST_HEAD(&othercon->writequeue);
                        spin_lock_init(&othercon->writequeue_lock);
                        INIT_WORK(&othercon->swork, process_send_sockets);
                        INIT_WORK(&othercon->rwork, process_recv_sockets);
                        set_bit(CF_IS_OTHERCON, &othercon->flags);
                }
                mutex_lock_nested(&othercon->sock_mutex, 2);
                if (!othercon->sock) {
                        newcon->othercon = othercon;
                        add_sock(newsock, othercon);
                        addcon = othercon;
                        mutex_unlock(&othercon->sock_mutex);
                }
                else {
                        /* Both slots for this node are already in use */
                        printk("Extra connection from node %d attempted\n", nodeid);
                        result = -EAGAIN;
                        mutex_unlock(&othercon->sock_mutex);
                        mutex_unlock(&newcon->sock_mutex);
                        goto accept_err;
                }
        }
        else {
                newcon->rx_action = receive_from_sock;
                /* accept copies the sk after we've saved the callbacks, so we
                   don't want to save them a second time or comm errors will
                   result in calling sk_error_report recursively. */
                add_sock(newsock, newcon);
                addcon = newcon;
        }

        mutex_unlock(&newcon->sock_mutex);

        /*
         * Add it to the active queue in case we got data
         * between processing the accept adding the socket
         * to the read_sockets list
         */
        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
                queue_work(recv_workqueue, &addcon->rwork);
        mutex_unlock(&con->sock_mutex);

        return 0;

accept_err:
        mutex_unlock(&con->sock_mutex);
        /* newsock is only assigned by kernel_accept(); the check relies on
         * that call storing NULL when it fails */
        if (newsock)
                sock_release(newsock);

        /* -EAGAIN (extra connection) was already reported above */
        if (result != -EAGAIN)
                log_print("error accepting connection from node: %d", result);
        return result;
}
 884
 885static int sctp_accept_from_sock(struct connection *con)
 886{
 887        /* Check that the new node is in the lockspace */
 888        struct sctp_prim prim;
 889        int nodeid;
 890        int prim_len, ret;
 891        int addr_len;
 892        struct connection *newcon;
 893        struct connection *addcon;
 894        struct socket *newsock;
 895        unsigned int mark;
 896
 897        mutex_lock(&connections_lock);
 898        if (!dlm_allow_conn) {
 899                mutex_unlock(&connections_lock);
 900                return -1;
 901        }
 902        mutex_unlock(&connections_lock);
 903
 904        mutex_lock_nested(&con->sock_mutex, 0);
 905
 906        ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
 907        if (ret < 0)
 908                goto accept_err;
 909
 910        memset(&prim, 0, sizeof(struct sctp_prim));
 911        prim_len = sizeof(struct sctp_prim);
 912
 913        ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
 914                                (char *)&prim, &prim_len);
 915        if (ret < 0) {
 916                log_print("getsockopt/sctp_primary_addr failed: %d", ret);
 917                goto accept_err;
 918        }
 919
 920        make_sockaddr(&prim.ssp_addr, 0, &addr_len);
 921        ret = addr_to_nodeid(&prim.ssp_addr, &nodeid, &mark);
 922        if (ret) {
 923                unsigned char *b = (unsigned char *)&prim.ssp_addr;
 924
 925                log_print("reject connect from unknown addr");
 926                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
 927                                     b, sizeof(struct sockaddr_storage));
 928                goto accept_err;
 929        }
 930
 931        newcon = nodeid2con(nodeid, GFP_NOFS);
 932        if (!newcon) {
 933                ret = -ENOMEM;
 934                goto accept_err;
 935        }
 936
 937        sock_set_mark(newsock->sk, mark);
 938
 939        mutex_lock_nested(&newcon->sock_mutex, 1);
 940
 941        if (newcon->sock) {
 942                struct connection *othercon = newcon->othercon;
 943
 944                if (!othercon) {
 945                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
 946                        if (!othercon) {
 947                                log_print("failed to allocate incoming socket");
 948                                mutex_unlock(&newcon->sock_mutex);
 949                                ret = -ENOMEM;
 950                                goto accept_err;
 951                        }
 952                        othercon->nodeid = nodeid;
 953                        othercon->rx_action = receive_from_sock;
 954                        mutex_init(&othercon->sock_mutex);
 955                        INIT_LIST_HEAD(&othercon->writequeue);
 956                        spin_lock_init(&othercon->writequeue_lock);
 957                        INIT_WORK(&othercon->swork, process_send_sockets);
 958                        INIT_WORK(&othercon->rwork, process_recv_sockets);
 959                        set_bit(CF_IS_OTHERCON, &othercon->flags);
 960                }
 961                mutex_lock_nested(&othercon->sock_mutex, 2);
 962                if (!othercon->sock) {
 963                        newcon->othercon = othercon;
 964                        add_sock(newsock, othercon);
 965                        addcon = othercon;
 966                        mutex_unlock(&othercon->sock_mutex);
 967                } else {
 968                        printk("Extra connection from node %d attempted\n", nodeid);
 969                        ret = -EAGAIN;
 970                        mutex_unlock(&othercon->sock_mutex);
 971                        mutex_unlock(&newcon->sock_mutex);
 972                        goto accept_err;
 973                }
 974        } else {
 975                newcon->rx_action = receive_from_sock;
 976                add_sock(newsock, newcon);
 977                addcon = newcon;
 978        }
 979
 980        log_print("connected to %d", nodeid);
 981
 982        mutex_unlock(&newcon->sock_mutex);
 983
 984        /*
 985         * Add it to the active queue in case we got data
 986         * between processing the accept adding the socket
 987         * to the read_sockets list
 988         */
 989        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
 990                queue_work(recv_workqueue, &addcon->rwork);
 991        mutex_unlock(&con->sock_mutex);
 992
 993        return 0;
 994
 995accept_err:
 996        mutex_unlock(&con->sock_mutex);
 997        if (newsock)
 998                sock_release(newsock);
 999        if (ret != -EAGAIN)
1000                log_print("error accepting connection from node: %d", ret);
1001
1002        return ret;
1003}
1004
1005static void free_entry(struct writequeue_entry *e)
1006{
1007        __free_page(e->page);
1008        kfree(e);
1009}
1010
1011/*
1012 * writequeue_entry_complete - try to delete and free write queue entry
1013 * @e: write queue entry to try to delete
1014 * @completed: bytes completed
1015 *
1016 * writequeue_lock must be held.
1017 */
1018static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
1019{
1020        e->offset += completed;
1021        e->len -= completed;
1022
1023        if (e->len == 0 && e->users == 0) {
1024                list_del(&e->list);
1025                free_entry(e);
1026        }
1027}
1028
1029/*
1030 * sctp_bind_addrs - bind a SCTP socket to all our addresses
1031 */
1032static int sctp_bind_addrs(struct connection *con, uint16_t port)
1033{
1034        struct sockaddr_storage localaddr;
1035        int i, addr_len, result = 0;
1036
1037        for (i = 0; i < dlm_local_count; i++) {
1038                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
1039                make_sockaddr(&localaddr, port, &addr_len);
1040
1041                if (!i)
1042                        result = kernel_bind(con->sock,
1043                                             (struct sockaddr *)&localaddr,
1044                                             addr_len);
1045                else
1046                        result = kernel_setsockopt(con->sock, SOL_SCTP,
1047                                                   SCTP_SOCKOPT_BINDX_ADD,
1048                                                   (char *)&localaddr, addr_len);
1049
1050                if (result < 0) {
1051                        log_print("Can't bind to %d addr number %d, %d.\n",
1052                                  port, i + 1, result);
1053                        break;
1054                }
1055        }
1056        return result;
1057}
1058
1059/* Initiate an SCTP association.
1060   This is a special case of send_to_sock() in that we don't yet have a
1061   peeled-off socket for this association, so we use the listening socket
1062   and add the primary IP address of the remote node.
1063 */
1064static void sctp_connect_to_sock(struct connection *con)
1065{
1066        struct sockaddr_storage daddr;
1067        int one = 1;
1068        int result;
1069        int addr_len;
1070        struct socket *sock;
1071        struct timeval tv = { .tv_sec = 5, .tv_usec = 0 };
1072        unsigned int mark;
1073
1074        if (con->nodeid == 0) {
1075                log_print("attempt to connect sock 0 foiled");
1076                return;
1077        }
1078
1079        mutex_lock(&con->sock_mutex);
1080
1081        /* Some odd races can cause double-connects, ignore them */
1082        if (con->retries++ > MAX_CONNECT_RETRIES)
1083                goto out;
1084
1085        if (con->sock) {
1086                log_print("node %d already connected.", con->nodeid);
1087                goto out;
1088        }
1089
1090        memset(&daddr, 0, sizeof(daddr));
1091        result = nodeid_to_addr(con->nodeid, &daddr, NULL, true, &mark);
1092        if (result < 0) {
1093                log_print("no address for nodeid %d", con->nodeid);
1094                goto out;
1095        }
1096
1097        /* Create a socket to communicate with */
1098        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1099                                  SOCK_STREAM, IPPROTO_SCTP, &sock);
1100        if (result < 0)
1101                goto socket_err;
1102
1103        sock_set_mark(sock->sk, mark);
1104
1105        con->rx_action = receive_from_sock;
1106        con->connect_action = sctp_connect_to_sock;
1107        add_sock(sock, con);
1108
1109        /* Bind to all addresses. */
1110        if (sctp_bind_addrs(con, 0))
1111                goto bind_err;
1112
1113        make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
1114
1115        log_print("connecting to %d", con->nodeid);
1116
1117        /* Turn off Nagle's algorithm */
1118        kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1119                          sizeof(one));
1120
1121        /*
1122         * Make sock->ops->connect() function return in specified time,
1123         * since O_NONBLOCK argument in connect() function does not work here,
1124         * then, we should restore the default value of this attribute.
1125         */
1126        kernel_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
1127                          sizeof(tv));
1128        result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
1129                                   0);
1130        memset(&tv, 0, sizeof(tv));
1131        kernel_setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (char *)&tv,
1132                          sizeof(tv));
1133
1134        if (result == -EINPROGRESS)
1135                result = 0;
1136        if (result == 0)
1137                goto out;
1138
1139bind_err:
1140        con->sock = NULL;
1141        sock_release(sock);
1142
1143socket_err:
1144        /*
1145         * Some errors are fatal and this list might need adjusting. For other
1146         * errors we try again until the max number of retries is reached.
1147         */
1148        if (result != -EHOSTUNREACH &&
1149            result != -ENETUNREACH &&
1150            result != -ENETDOWN &&
1151            result != -EINVAL &&
1152            result != -EPROTONOSUPPORT) {
1153                log_print("connect %d try %d error %d", con->nodeid,
1154                          con->retries, result);
1155                mutex_unlock(&con->sock_mutex);
1156                msleep(1000);
1157                lowcomms_connect_sock(con);
1158                return;
1159        }
1160
1161out:
1162        mutex_unlock(&con->sock_mutex);
1163}
1164
1165/* Connect a new socket to its peer */
1166static void tcp_connect_to_sock(struct connection *con)
1167{
1168        struct sockaddr_storage saddr, src_addr;
1169        unsigned int mark;
1170        int addr_len;
1171        struct socket *sock = NULL;
1172        int result;
1173
1174        if (con->nodeid == 0) {
1175                log_print("attempt to connect sock 0 foiled");
1176                return;
1177        }
1178
1179        mutex_lock(&con->sock_mutex);
1180        if (con->retries++ > MAX_CONNECT_RETRIES)
1181                goto out;
1182
1183        /* Some odd races can cause double-connects, ignore them */
1184        if (con->sock)
1185                goto out;
1186
1187        /* Create a socket to communicate with */
1188        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1189                                  SOCK_STREAM, IPPROTO_TCP, &sock);
1190        if (result < 0)
1191                goto out_err;
1192
1193        memset(&saddr, 0, sizeof(saddr));
1194        result = nodeid_to_addr(con->nodeid, &saddr, NULL, false, &mark);
1195        if (result < 0) {
1196                log_print("no address for nodeid %d", con->nodeid);
1197                goto out_err;
1198        }
1199
1200        con->rx_action = receive_from_sock;
1201        con->connect_action = tcp_connect_to_sock;
1202        sock_set_mark(sock->sk, mark);
1203
1204        add_sock(sock, con);
1205
1206        /* Bind to our cluster-known address connecting to avoid
1207           routing problems */
1208        memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
1209        make_sockaddr(&src_addr, 0, &addr_len);
1210        result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
1211                                 addr_len);
1212        if (result < 0) {
1213                log_print("could not bind for connect: %d", result);
1214                /* This *may* not indicate a critical error */
1215        }
1216
1217        make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
1218
1219        log_print("connecting to %d", con->nodeid);
1220
1221        /* Turn off Nagle's algorithm */
1222        tcp_sock_set_nodelay(sock->sk);
1223
1224        result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
1225                                   O_NONBLOCK);
1226        if (result == -EINPROGRESS)
1227                result = 0;
1228        if (result == 0)
1229                goto out;
1230
1231out_err:
1232        if (con->sock) {
1233                sock_release(con->sock);
1234                con->sock = NULL;
1235        } else if (sock) {
1236                sock_release(sock);
1237        }
1238        /*
1239         * Some errors are fatal and this list might need adjusting. For other
1240         * errors we try again until the max number of retries is reached.
1241         */
1242        if (result != -EHOSTUNREACH &&
1243            result != -ENETUNREACH &&
1244            result != -ENETDOWN && 
1245            result != -EINVAL &&
1246            result != -EPROTONOSUPPORT) {
1247                log_print("connect %d try %d error %d", con->nodeid,
1248                          con->retries, result);
1249                mutex_unlock(&con->sock_mutex);
1250                msleep(1000);
1251                lowcomms_connect_sock(con);
1252                return;
1253        }
1254out:
1255        mutex_unlock(&con->sock_mutex);
1256        return;
1257}
1258
1259static struct socket *tcp_create_listen_sock(struct connection *con,
1260                                             struct sockaddr_storage *saddr)
1261{
1262        struct socket *sock = NULL;
1263        int result = 0;
1264        int addr_len;
1265
1266        if (dlm_local_addr[0]->ss_family == AF_INET)
1267                addr_len = sizeof(struct sockaddr_in);
1268        else
1269                addr_len = sizeof(struct sockaddr_in6);
1270
1271        /* Create a socket to communicate with */
1272        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1273                                  SOCK_STREAM, IPPROTO_TCP, &sock);
1274        if (result < 0) {
1275                log_print("Can't create listening comms socket");
1276                goto create_out;
1277        }
1278
1279        sock_set_mark(sock->sk, dlm_config.ci_mark);
1280
1281        /* Turn off Nagle's algorithm */
1282        tcp_sock_set_nodelay(sock->sk);
1283
1284        sock_set_reuseaddr(sock->sk);
1285
1286        write_lock_bh(&sock->sk->sk_callback_lock);
1287        sock->sk->sk_user_data = con;
1288        save_listen_callbacks(sock);
1289        con->rx_action = tcp_accept_from_sock;
1290        con->connect_action = tcp_connect_to_sock;
1291        write_unlock_bh(&sock->sk->sk_callback_lock);
1292
1293        /* Bind to our port */
1294        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1295        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1296        if (result < 0) {
1297                log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1298                sock_release(sock);
1299                sock = NULL;
1300                con->sock = NULL;
1301                goto create_out;
1302        }
1303        sock_set_keepalive(sock->sk);
1304
1305        result = sock->ops->listen(sock, 5);
1306        if (result < 0) {
1307                log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1308                sock_release(sock);
1309                sock = NULL;
1310                goto create_out;
1311        }
1312
1313create_out:
1314        return sock;
1315}
1316
1317/* Get local addresses */
1318static void init_local(void)
1319{
1320        struct sockaddr_storage sas, *addr;
1321        int i;
1322
1323        dlm_local_count = 0;
1324        for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1325                if (dlm_our_addr(&sas, i))
1326                        break;
1327
1328                addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
1329                if (!addr)
1330                        break;
1331                dlm_local_addr[dlm_local_count++] = addr;
1332        }
1333}
1334
1335/* Initialise SCTP socket and bind to all interfaces */
1336static int sctp_listen_for_all(void)
1337{
1338        struct socket *sock = NULL;
1339        int result = -EINVAL;
1340        struct connection *con = nodeid2con(0, GFP_NOFS);
1341        int bufsize = NEEDED_RMEM;
1342        int one = 1;
1343
1344        if (!con)
1345                return -ENOMEM;
1346
1347        log_print("Using SCTP for communications");
1348
1349        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1350                                  SOCK_STREAM, IPPROTO_SCTP, &sock);
1351        if (result < 0) {
1352                log_print("Can't create comms socket, check SCTP is loaded");
1353                goto out;
1354        }
1355
1356        result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1357                                 (char *)&bufsize, sizeof(bufsize));
1358        if (result)
1359                log_print("Error increasing buffer space on socket %d", result);
1360
1361        result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1362                                   sizeof(one));
1363        if (result < 0)
1364                log_print("Could not set SCTP NODELAY error %d\n", result);
1365
1366        sock_set_mark(sock->sk, dlm_config.ci_mark);
1367
1368        write_lock_bh(&sock->sk->sk_callback_lock);
1369        /* Init con struct */
1370        sock->sk->sk_user_data = con;
1371        save_listen_callbacks(sock);
1372        con->sock = sock;
1373        con->sock->sk->sk_data_ready = lowcomms_data_ready;
1374        con->rx_action = sctp_accept_from_sock;
1375        con->connect_action = sctp_connect_to_sock;
1376
1377        write_unlock_bh(&sock->sk->sk_callback_lock);
1378
1379        /* Bind to all addresses. */
1380        if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
1381                goto create_delsock;
1382
1383        result = sock->ops->listen(sock, 5);
1384        if (result < 0) {
1385                log_print("Can't set socket listening");
1386                goto create_delsock;
1387        }
1388
1389        return 0;
1390
1391create_delsock:
1392        sock_release(sock);
1393        con->sock = NULL;
1394out:
1395        return result;
1396}
1397
1398static int tcp_listen_for_all(void)
1399{
1400        struct socket *sock = NULL;
1401        struct connection *con = nodeid2con(0, GFP_NOFS);
1402        int result = -EINVAL;
1403
1404        if (!con)
1405                return -ENOMEM;
1406
1407        /* We don't support multi-homed hosts */
1408        if (dlm_local_addr[1] != NULL) {
1409                log_print("TCP protocol can't handle multi-homed hosts, "
1410                          "try SCTP");
1411                return -EINVAL;
1412        }
1413
1414        log_print("Using TCP for communications");
1415
1416        sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1417        if (sock) {
1418                add_sock(sock, con);
1419                result = 0;
1420        }
1421        else {
1422                result = -EADDRINUSE;
1423        }
1424
1425        return result;
1426}
1427
1428
1429
1430static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1431                                                     gfp_t allocation)
1432{
1433        struct writequeue_entry *entry;
1434
1435        entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1436        if (!entry)
1437                return NULL;
1438
1439        entry->page = alloc_page(allocation);
1440        if (!entry->page) {
1441                kfree(entry);
1442                return NULL;
1443        }
1444
1445        entry->offset = 0;
1446        entry->len = 0;
1447        entry->end = 0;
1448        entry->users = 0;
1449        entry->con = con;
1450
1451        return entry;
1452}
1453
1454void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1455{
1456        struct connection *con;
1457        struct writequeue_entry *e;
1458        int offset = 0;
1459
1460        con = nodeid2con(nodeid, allocation);
1461        if (!con)
1462                return NULL;
1463
1464        spin_lock(&con->writequeue_lock);
1465        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1466        if ((&e->list == &con->writequeue) ||
1467            (PAGE_SIZE - e->end < len)) {
1468                e = NULL;
1469        } else {
1470                offset = e->end;
1471                e->end += len;
1472                e->users++;
1473        }
1474        spin_unlock(&con->writequeue_lock);
1475
1476        if (e) {
1477        got_one:
1478                *ppc = page_address(e->page) + offset;
1479                return e;
1480        }
1481
1482        e = new_writequeue_entry(con, allocation);
1483        if (e) {
1484                spin_lock(&con->writequeue_lock);
1485                offset = e->end;
1486                e->end += len;
1487                e->users++;
1488                list_add_tail(&e->list, &con->writequeue);
1489                spin_unlock(&con->writequeue_lock);
1490                goto got_one;
1491        }
1492        return NULL;
1493}
1494
1495void dlm_lowcomms_commit_buffer(void *mh)
1496{
1497        struct writequeue_entry *e = (struct writequeue_entry *)mh;
1498        struct connection *con = e->con;
1499        int users;
1500
1501        spin_lock(&con->writequeue_lock);
1502        users = --e->users;
1503        if (users)
1504                goto out;
1505        e->len = e->end - e->offset;
1506        spin_unlock(&con->writequeue_lock);
1507
1508        queue_work(send_workqueue, &con->swork);
1509        return;
1510
1511out:
1512        spin_unlock(&con->writequeue_lock);
1513        return;
1514}
1515
1516/* Send a message */
1517static void send_to_sock(struct connection *con)
1518{
1519        int ret = 0;
1520        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1521        struct writequeue_entry *e;
1522        int len, offset;
1523        int count = 0;
1524
1525        mutex_lock(&con->sock_mutex);
1526        if (con->sock == NULL)
1527                goto out_connect;
1528
1529        spin_lock(&con->writequeue_lock);
1530        for (;;) {
1531                e = list_entry(con->writequeue.next, struct writequeue_entry,
1532                               list);
1533                if ((struct list_head *) e == &con->writequeue)
1534                        break;
1535
1536                len = e->len;
1537                offset = e->offset;
1538                BUG_ON(len == 0 && e->users == 0);
1539                spin_unlock(&con->writequeue_lock);
1540
1541                ret = 0;
1542                if (len) {
1543                        ret = kernel_sendpage(con->sock, e->page, offset, len,
1544                                              msg_flags);
1545                        if (ret == -EAGAIN || ret == 0) {
1546                                if (ret == -EAGAIN &&
1547                                    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
1548                                    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1549                                        /* Notify TCP that we're limited by the
1550                                         * application window size.
1551                                         */
1552                                        set_bit(SOCK_NOSPACE, &con->sock->flags);
1553                                        con->sock->sk->sk_write_pending++;
1554                                }
1555                                cond_resched();
1556                                goto out;
1557                        } else if (ret < 0)
1558                                goto send_error;
1559                }
1560
1561                /* Don't starve people filling buffers */
1562                if (++count >= MAX_SEND_MSG_COUNT) {
1563                        cond_resched();
1564                        count = 0;
1565                }
1566
1567                spin_lock(&con->writequeue_lock);
1568                writequeue_entry_complete(e, ret);
1569        }
1570        spin_unlock(&con->writequeue_lock);
1571out:
1572        mutex_unlock(&con->sock_mutex);
1573        return;
1574
1575send_error:
1576        mutex_unlock(&con->sock_mutex);
1577        close_connection(con, true, false, true);
1578        /* Requeue the send work. When the work daemon runs again, it will try
1579           a new connection, then call this function again. */
1580        queue_work(send_workqueue, &con->swork);
1581        return;
1582
1583out_connect:
1584        mutex_unlock(&con->sock_mutex);
1585        queue_work(send_workqueue, &con->swork);
1586        cond_resched();
1587}
1588
1589static void clean_one_writequeue(struct connection *con)
1590{
1591        struct writequeue_entry *e, *safe;
1592
1593        spin_lock(&con->writequeue_lock);
1594        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1595                list_del(&e->list);
1596                free_entry(e);
1597        }
1598        spin_unlock(&con->writequeue_lock);
1599}
1600
1601/* Called from recovery when it knows that a node has
1602   left the cluster */
1603int dlm_lowcomms_close(int nodeid)
1604{
1605        struct connection *con;
1606        struct dlm_node_addr *na;
1607
1608        log_print("closing connection to node %d", nodeid);
1609        con = nodeid2con(nodeid, 0);
1610        if (con) {
1611                set_bit(CF_CLOSE, &con->flags);
1612                close_connection(con, true, true, true);
1613                clean_one_writequeue(con);
1614        }
1615
1616        spin_lock(&dlm_node_addrs_spin);
1617        na = find_node_addr(nodeid);
1618        if (na) {
1619                list_del(&na->list);
1620                while (na->addr_count--)
1621                        kfree(na->addr[na->addr_count]);
1622                kfree(na);
1623        }
1624        spin_unlock(&dlm_node_addrs_spin);
1625
1626        return 0;
1627}
1628
1629/* Receive workqueue function */
1630static void process_recv_sockets(struct work_struct *work)
1631{
1632        struct connection *con = container_of(work, struct connection, rwork);
1633        int err;
1634
1635        clear_bit(CF_READ_PENDING, &con->flags);
1636        do {
1637                err = con->rx_action(con);
1638        } while (!err);
1639}
1640
1641/* Send workqueue function */
1642static void process_send_sockets(struct work_struct *work)
1643{
1644        struct connection *con = container_of(work, struct connection, swork);
1645
1646        clear_bit(CF_WRITE_PENDING, &con->flags);
1647        if (con->sock == NULL) /* not mutex protected so check it inside too */
1648                con->connect_action(con);
1649        if (!list_empty(&con->writequeue))
1650                send_to_sock(con);
1651}
1652
1653
1654/* Discard all entries on the write queues */
1655static void clean_writequeues(void)
1656{
1657        foreach_conn(clean_one_writequeue);
1658}
1659
1660static void work_stop(void)
1661{
1662        destroy_workqueue(recv_workqueue);
1663        destroy_workqueue(send_workqueue);
1664}
1665
1666static int work_start(void)
1667{
1668        recv_workqueue = alloc_workqueue("dlm_recv",
1669                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1670        if (!recv_workqueue) {
1671                log_print("can't start dlm_recv");
1672                return -ENOMEM;
1673        }
1674
1675        send_workqueue = alloc_workqueue("dlm_send",
1676                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1677        if (!send_workqueue) {
1678                log_print("can't start dlm_send");
1679                destroy_workqueue(recv_workqueue);
1680                return -ENOMEM;
1681        }
1682
1683        return 0;
1684}
1685
1686static void _stop_conn(struct connection *con, bool and_other)
1687{
1688        mutex_lock(&con->sock_mutex);
1689        set_bit(CF_CLOSE, &con->flags);
1690        set_bit(CF_READ_PENDING, &con->flags);
1691        set_bit(CF_WRITE_PENDING, &con->flags);
1692        if (con->sock && con->sock->sk) {
1693                write_lock_bh(&con->sock->sk->sk_callback_lock);
1694                con->sock->sk->sk_user_data = NULL;
1695                write_unlock_bh(&con->sock->sk->sk_callback_lock);
1696        }
1697        if (con->othercon && and_other)
1698                _stop_conn(con->othercon, false);
1699        mutex_unlock(&con->sock_mutex);
1700}
1701
1702static void stop_conn(struct connection *con)
1703{
1704        _stop_conn(con, true);
1705}
1706
1707static void free_conn(struct connection *con)
1708{
1709        close_connection(con, true, true, true);
1710        if (con->othercon)
1711                kmem_cache_free(con_cache, con->othercon);
1712        hlist_del(&con->list);
1713        kmem_cache_free(con_cache, con);
1714}
1715
1716static void work_flush(void)
1717{
1718        int ok;
1719        int i;
1720        struct hlist_node *n;
1721        struct connection *con;
1722
1723        flush_workqueue(recv_workqueue);
1724        flush_workqueue(send_workqueue);
1725        do {
1726                ok = 1;
1727                foreach_conn(stop_conn);
1728                flush_workqueue(recv_workqueue);
1729                flush_workqueue(send_workqueue);
1730                for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
1731                        hlist_for_each_entry_safe(con, n,
1732                                                  &connection_hash[i], list) {
1733                                ok &= test_bit(CF_READ_PENDING, &con->flags);
1734                                ok &= test_bit(CF_WRITE_PENDING, &con->flags);
1735                                if (con->othercon) {
1736                                        ok &= test_bit(CF_READ_PENDING,
1737                                                       &con->othercon->flags);
1738                                        ok &= test_bit(CF_WRITE_PENDING,
1739                                                       &con->othercon->flags);
1740                                }
1741                        }
1742                }
1743        } while (!ok);
1744}
1745
1746void dlm_lowcomms_stop(void)
1747{
1748        /* Set all the flags to prevent any
1749           socket activity.
1750        */
1751        mutex_lock(&connections_lock);
1752        dlm_allow_conn = 0;
1753        mutex_unlock(&connections_lock);
1754        work_flush();
1755        clean_writequeues();
1756        foreach_conn(free_conn);
1757        work_stop();
1758
1759        kmem_cache_destroy(con_cache);
1760}
1761
1762int dlm_lowcomms_start(void)
1763{
1764        int error = -EINVAL;
1765        struct connection *con;
1766        int i;
1767
1768        for (i = 0; i < CONN_HASH_SIZE; i++)
1769                INIT_HLIST_HEAD(&connection_hash[i]);
1770
1771        init_local();
1772        if (!dlm_local_count) {
1773                error = -ENOTCONN;
1774                log_print("no local IP address has been set");
1775                goto fail;
1776        }
1777
1778        error = -ENOMEM;
1779        con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1780                                      __alignof__(struct connection), 0,
1781                                      NULL);
1782        if (!con_cache)
1783                goto fail;
1784
1785        error = work_start();
1786        if (error)
1787                goto fail_destroy;
1788
1789        dlm_allow_conn = 1;
1790
1791        /* Start listening */
1792        if (dlm_config.ci_protocol == 0)
1793                error = tcp_listen_for_all();
1794        else
1795                error = sctp_listen_for_all();
1796        if (error)
1797                goto fail_unlisten;
1798
1799        return 0;
1800
1801fail_unlisten:
1802        dlm_allow_conn = 0;
1803        con = nodeid2con(0,0);
1804        if (con) {
1805                close_connection(con, false, true, true);
1806                kmem_cache_free(con_cache, con);
1807        }
1808fail_destroy:
1809        kmem_cache_destroy(con_cache);
1810fail:
1811        return error;
1812}
1813
1814void dlm_lowcomms_exit(void)
1815{
1816        struct dlm_node_addr *na, *safe;
1817
1818        spin_lock(&dlm_node_addrs_spin);
1819        list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1820                list_del(&na->list);
1821                while (na->addr_count--)
1822                        kfree(na->addr[na->addr_count]);
1823                kfree(na);
1824        }
1825        spin_unlock(&dlm_node_addrs_spin);
1826}
1827