linux/fs/dlm/lowcomms.c
<<
>>
Prefs
   1/******************************************************************************
   2*******************************************************************************
   3**
   4**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   5**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
   6**
   7**  This copyrighted material is made available to anyone wishing to use,
   8**  modify, copy, or redistribute it subject to the terms and conditions
   9**  of the GNU General Public License v.2.
  10**
  11*******************************************************************************
  12******************************************************************************/
  13
  14/*
  15 * lowcomms.c
  16 *
  17 * This is the "low-level" comms layer.
  18 *
  19 * It is responsible for sending/receiving messages
  20 * from other nodes in the cluster.
  21 *
  22 * Cluster nodes are referred to by their nodeids. nodeids are
  23 * simply 32 bit numbers to the locking module - if they need to
  24 * be expanded for the cluster infrastructure then that is its
  25 * responsibility. It is this layer's
  26 * responsibility to resolve these into IP address or
  27 * whatever it needs for inter-node communication.
  28 *
  29 * The comms level consists of two workqueues: a receive workqueue that
  30 * deals mainly with receiving messages from other nodes and passing them
  31 * up to the mid-level comms layer (which understands the message
  32 * format) for execution by the locking core, and a send workqueue
  33 * which does all the setting up of connections to remote nodes and
  34 * the sending of data. Threads are not allowed to send their own data
  35 * because it may cause them to wait in times of high load. Also, this
  36 * way, the send work can collect together messages bound for one node
  37 * and send them in one block.
  38 *
  39 * lowcomms will choose to use either TCP or SCTP as its transport layer
  40 * depending on the configuration variable 'protocol'. This should be set
  41 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
  42 * cluster-wide mechanism as it must be the same on all nodes of the cluster
  43 * for the DLM to function.
  44 *
  45 */
  46
  47#include <asm/ioctls.h>
  48#include <net/sock.h>
  49#include <net/tcp.h>
  50#include <linux/pagemap.h>
  51#include <linux/file.h>
  52#include <linux/mutex.h>
  53#include <linux/sctp.h>
  54#include <linux/slab.h>
  55#include <net/sctp/sctp.h>
  56#include <net/ipv6.h>
  57
  58#include "dlm_internal.h"
  59#include "lowcomms.h"
  60#include "midcomms.h"
  61#include "config.h"
  62
  /*
 * 4 MiB.  The name suggests a socket receive-buffer size; it is not used in
 * this part of the file -- NOTE(review): confirm at its point of use.
 */
#define NEEDED_RMEM		(4 * 1024 * 1024)

/* Buckets in connection_hash; nodeid_hash() masks with it, so keep it a power of two. */
#define CONN_HASH_SIZE		32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT	25
  68
  /*
 * Bookkeeping for a circular byte buffer.  The memory itself lives
 * elsewhere (receive_from_sock() uses con->rx_page); this only tracks which
 * part of it currently holds data.
 */
struct cbuf {
	unsigned int base;	/* offset of the first unconsumed byte */
	unsigned int len;	/* number of bytes currently held */
	unsigned int mask;	/* buffer size - 1 */
};

/* Account for @n bytes appended after the data already held. */
static void cbuf_add(struct cbuf *cb, int n)
{
	cb->len = cb->len + n;
}

/*
 * Offset one past the end of the held data (the first free byte), wrapped
 * to the buffer size.
 */
static int cbuf_data(struct cbuf *cb)
{
	unsigned int end = cb->base + cb->len;

	return end & cb->mask;
}

/*
 * Reset @cb to empty for a buffer of @size bytes.  All wrapping is done with
 * "& mask", which is only correct when @size is a power of two.
 */
static void cbuf_init(struct cbuf *cb, int size)
{
	cb->mask = size - 1;
	cb->len = 0;
	cb->base = 0;
}

/* Drop @n bytes from the front of the held data. */
static void cbuf_eat(struct cbuf *cb, int n)
{
	cb->base = (cb->base + n) & cb->mask;
	cb->len -= n;
}

/* True when no data is held. */
static bool cbuf_empty(struct cbuf *cb)
{
	return !cb->len;
}
 102
 103struct connection {
 104        struct socket *sock;    /* NULL if not connected */
 105        uint32_t nodeid;        /* So we know who we are in the list */
 106        struct mutex sock_mutex;
 107        unsigned long flags;
 108#define CF_READ_PENDING 1
 109#define CF_WRITE_PENDING 2
 110#define CF_INIT_PENDING 4
 111#define CF_IS_OTHERCON 5
 112#define CF_CLOSE 6
 113#define CF_APP_LIMITED 7
 114        struct list_head writequeue;  /* List of outgoing writequeue_entries */
 115        spinlock_t writequeue_lock;
 116        int (*rx_action) (struct connection *); /* What to do when active */
 117        void (*connect_action) (struct connection *);   /* What to do to connect */
 118        struct page *rx_page;
 119        struct cbuf cb;
 120        int retries;
 121#define MAX_CONNECT_RETRIES 3
 122        struct hlist_node list;
 123        struct connection *othercon;
 124        struct work_struct rwork; /* Receive workqueue */
 125        struct work_struct swork; /* Send workqueue */
 126};
 127#define sock2con(x) ((struct connection *)(x)->sk_user_data)
 128
 129/* An entry waiting to be sent */
 130struct writequeue_entry {
 131        struct list_head list;
 132        struct page *page;
 133        int offset;
 134        int len;
 135        int end;
 136        int users;
 137        struct connection *con;
 138};
 139
 140struct dlm_node_addr {
 141        struct list_head list;
 142        int nodeid;
 143        int addr_count;
 144        int curr_addr_index;
 145        struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
 146};
 147
 148static struct listen_sock_callbacks {
 149        void (*sk_error_report)(struct sock *);
 150        void (*sk_data_ready)(struct sock *, int);
 151        void (*sk_state_change)(struct sock *);
 152        void (*sk_write_space)(struct sock *);
 153} listen_sock;
 154
 155static LIST_HEAD(dlm_node_addrs);
 156static DEFINE_SPINLOCK(dlm_node_addrs_spin);
 157
 158static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
 159static int dlm_local_count;
 160static int dlm_allow_conn;
 161
 162/* Work queues */
 163static struct workqueue_struct *recv_workqueue;
 164static struct workqueue_struct *send_workqueue;
 165
 166static struct hlist_head connection_hash[CONN_HASH_SIZE];
 167static DEFINE_MUTEX(connections_lock);
 168static struct kmem_cache *con_cache;
 169
 170static void process_recv_sockets(struct work_struct *work);
 171static void process_send_sockets(struct work_struct *work);
 172
 173
 174/* This is deliberately very simple because most clusters have simple
 175   sequential nodeids, so we should be able to go straight to a connection
 176   struct in the array */
 177static inline int nodeid_hash(int nodeid)
 178{
 179        return nodeid & (CONN_HASH_SIZE-1);
 180}
 181
 182static struct connection *__find_con(int nodeid)
 183{
 184        int r;
 185        struct connection *con;
 186
 187        r = nodeid_hash(nodeid);
 188
 189        hlist_for_each_entry(con, &connection_hash[r], list) {
 190                if (con->nodeid == nodeid)
 191                        return con;
 192        }
 193        return NULL;
 194}
 195
 196/*
 197 * If 'allocation' is zero then we don't attempt to create a new
 198 * connection structure for this node.
 199 */
 200static struct connection *__nodeid2con(int nodeid, gfp_t alloc)
 201{
 202        struct connection *con = NULL;
 203        int r;
 204
 205        con = __find_con(nodeid);
 206        if (con || !alloc)
 207                return con;
 208
 209        con = kmem_cache_zalloc(con_cache, alloc);
 210        if (!con)
 211                return NULL;
 212
 213        r = nodeid_hash(nodeid);
 214        hlist_add_head(&con->list, &connection_hash[r]);
 215
 216        con->nodeid = nodeid;
 217        mutex_init(&con->sock_mutex);
 218        INIT_LIST_HEAD(&con->writequeue);
 219        spin_lock_init(&con->writequeue_lock);
 220        INIT_WORK(&con->swork, process_send_sockets);
 221        INIT_WORK(&con->rwork, process_recv_sockets);
 222
 223        /* Setup action pointers for child sockets */
 224        if (con->nodeid) {
 225                struct connection *zerocon = __find_con(0);
 226
 227                con->connect_action = zerocon->connect_action;
 228                if (!con->rx_action)
 229                        con->rx_action = zerocon->rx_action;
 230        }
 231
 232        return con;
 233}
 234
 235/* Loop round all connections */
 236static void foreach_conn(void (*conn_func)(struct connection *c))
 237{
 238        int i;
 239        struct hlist_node *n;
 240        struct connection *con;
 241
 242        for (i = 0; i < CONN_HASH_SIZE; i++) {
 243                hlist_for_each_entry_safe(con, n, &connection_hash[i], list)
 244                        conn_func(con);
 245        }
 246}
 247
 248static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 249{
 250        struct connection *con;
 251
 252        mutex_lock(&connections_lock);
 253        con = __nodeid2con(nodeid, allocation);
 254        mutex_unlock(&connections_lock);
 255
 256        return con;
 257}
 258
 259static struct dlm_node_addr *find_node_addr(int nodeid)
 260{
 261        struct dlm_node_addr *na;
 262
 263        list_for_each_entry(na, &dlm_node_addrs, list) {
 264                if (na->nodeid == nodeid)
 265                        return na;
 266        }
 267        return NULL;
 268}
 269
 270static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
 271{
 272        switch (x->ss_family) {
 273        case AF_INET: {
 274                struct sockaddr_in *sinx = (struct sockaddr_in *)x;
 275                struct sockaddr_in *siny = (struct sockaddr_in *)y;
 276                if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
 277                        return 0;
 278                if (sinx->sin_port != siny->sin_port)
 279                        return 0;
 280                break;
 281        }
 282        case AF_INET6: {
 283                struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
 284                struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
 285                if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
 286                        return 0;
 287                if (sinx->sin6_port != siny->sin6_port)
 288                        return 0;
 289                break;
 290        }
 291        default:
 292                return 0;
 293        }
 294        return 1;
 295}
 296
 297static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
 298                          struct sockaddr *sa_out, bool try_new_addr)
 299{
 300        struct sockaddr_storage sas;
 301        struct dlm_node_addr *na;
 302
 303        if (!dlm_local_count)
 304                return -1;
 305
 306        spin_lock(&dlm_node_addrs_spin);
 307        na = find_node_addr(nodeid);
 308        if (na && na->addr_count) {
 309                memcpy(&sas, na->addr[na->curr_addr_index],
 310                        sizeof(struct sockaddr_storage));
 311
 312                if (try_new_addr) {
 313                        na->curr_addr_index++;
 314                        if (na->curr_addr_index == na->addr_count)
 315                                na->curr_addr_index = 0;
 316                }
 317        }
 318        spin_unlock(&dlm_node_addrs_spin);
 319
 320        if (!na)
 321                return -EEXIST;
 322
 323        if (!na->addr_count)
 324                return -ENOENT;
 325
 326        if (sas_out)
 327                memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
 328
 329        if (!sa_out)
 330                return 0;
 331
 332        if (dlm_local_addr[0]->ss_family == AF_INET) {
 333                struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
 334                struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
 335                ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
 336        } else {
 337                struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
 338                struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
 339                ret6->sin6_addr = in6->sin6_addr;
 340        }
 341
 342        return 0;
 343}
 344
 345static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
 346{
 347        struct dlm_node_addr *na;
 348        int rv = -EEXIST;
 349        int addr_i;
 350
 351        spin_lock(&dlm_node_addrs_spin);
 352        list_for_each_entry(na, &dlm_node_addrs, list) {
 353                if (!na->addr_count)
 354                        continue;
 355
 356                for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
 357                        if (addr_compare(na->addr[addr_i], addr)) {
 358                                *nodeid = na->nodeid;
 359                                rv = 0;
 360                                goto unlock;
 361                        }
 362                }
 363        }
 364unlock:
 365        spin_unlock(&dlm_node_addrs_spin);
 366        return rv;
 367}
 368
 369int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 370{
 371        struct sockaddr_storage *new_addr;
 372        struct dlm_node_addr *new_node, *na;
 373
 374        new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
 375        if (!new_node)
 376                return -ENOMEM;
 377
 378        new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
 379        if (!new_addr) {
 380                kfree(new_node);
 381                return -ENOMEM;
 382        }
 383
 384        memcpy(new_addr, addr, len);
 385
 386        spin_lock(&dlm_node_addrs_spin);
 387        na = find_node_addr(nodeid);
 388        if (!na) {
 389                new_node->nodeid = nodeid;
 390                new_node->addr[0] = new_addr;
 391                new_node->addr_count = 1;
 392                list_add(&new_node->list, &dlm_node_addrs);
 393                spin_unlock(&dlm_node_addrs_spin);
 394                return 0;
 395        }
 396
 397        if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
 398                spin_unlock(&dlm_node_addrs_spin);
 399                kfree(new_addr);
 400                kfree(new_node);
 401                return -ENOSPC;
 402        }
 403
 404        na->addr[na->addr_count++] = new_addr;
 405        spin_unlock(&dlm_node_addrs_spin);
 406        kfree(new_node);
 407        return 0;
 408}
 409
 410/* Data available on socket or listen socket received a connect */
 411static void lowcomms_data_ready(struct sock *sk, int count_unused)
 412{
 413        struct connection *con = sock2con(sk);
 414        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
 415                queue_work(recv_workqueue, &con->rwork);
 416}
 417
 418static void lowcomms_write_space(struct sock *sk)
 419{
 420        struct connection *con = sock2con(sk);
 421
 422        if (!con)
 423                return;
 424
 425        clear_bit(SOCK_NOSPACE, &con->sock->flags);
 426
 427        if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
 428                con->sock->sk->sk_write_pending--;
 429                clear_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags);
 430        }
 431
 432        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
 433                queue_work(send_workqueue, &con->swork);
 434}
 435
 436static inline void lowcomms_connect_sock(struct connection *con)
 437{
 438        if (test_bit(CF_CLOSE, &con->flags))
 439                return;
 440        queue_work(send_workqueue, &con->swork);
 441        cond_resched();
 442}
 443
 444static void lowcomms_state_change(struct sock *sk)
 445{
 446        /* SCTP layer is not calling sk_data_ready when the connection
 447         * is done, so we catch the signal through here. Also, it
 448         * doesn't switch socket state when entering shutdown, so we
 449         * skip the write in that case.
 450         */
 451        if (sk->sk_shutdown) {
 452                if (sk->sk_shutdown == RCV_SHUTDOWN)
 453                        lowcomms_data_ready(sk, 0);
 454        } else if (sk->sk_state == TCP_ESTABLISHED) {
 455                lowcomms_write_space(sk);
 456        }
 457}
 458
 459int dlm_lowcomms_connect_node(int nodeid)
 460{
 461        struct connection *con;
 462
 463        if (nodeid == dlm_our_nodeid())
 464                return 0;
 465
 466        con = nodeid2con(nodeid, GFP_NOFS);
 467        if (!con)
 468                return -ENOMEM;
 469        lowcomms_connect_sock(con);
 470        return 0;
 471}
 472
 473static void lowcomms_error_report(struct sock *sk)
 474{
 475        struct connection *con;
 476        struct sockaddr_storage saddr;
 477        int buflen;
 478        void (*orig_report)(struct sock *) = NULL;
 479
 480        read_lock_bh(&sk->sk_callback_lock);
 481        con = sock2con(sk);
 482        if (con == NULL)
 483                goto out;
 484
 485        orig_report = listen_sock.sk_error_report;
 486        if (con->sock == NULL ||
 487            kernel_getpeername(con->sock, (struct sockaddr *)&saddr, &buflen)) {
 488                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 489                                   "sending to node %d, port %d, "
 490                                   "sk_err=%d/%d\n", dlm_our_nodeid(),
 491                                   con->nodeid, dlm_config.ci_tcp_port,
 492                                   sk->sk_err, sk->sk_err_soft);
 493        } else if (saddr.ss_family == AF_INET) {
 494                struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
 495
 496                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 497                                   "sending to node %d at %pI4, port %d, "
 498                                   "sk_err=%d/%d\n", dlm_our_nodeid(),
 499                                   con->nodeid, &sin4->sin_addr.s_addr,
 500                                   dlm_config.ci_tcp_port, sk->sk_err,
 501                                   sk->sk_err_soft);
 502        } else {
 503                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
 504
 505                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 506                                   "sending to node %d at %u.%u.%u.%u, "
 507                                   "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
 508                                   con->nodeid, sin6->sin6_addr.s6_addr32[0],
 509                                   sin6->sin6_addr.s6_addr32[1],
 510                                   sin6->sin6_addr.s6_addr32[2],
 511                                   sin6->sin6_addr.s6_addr32[3],
 512                                   dlm_config.ci_tcp_port, sk->sk_err,
 513                                   sk->sk_err_soft);
 514        }
 515out:
 516        read_unlock_bh(&sk->sk_callback_lock);
 517        if (orig_report)
 518                orig_report(sk);
 519}
 520
 521/* Note: sk_callback_lock must be locked before calling this function. */
 522static void save_listen_callbacks(struct socket *sock)
 523{
 524        struct sock *sk = sock->sk;
 525
 526        listen_sock.sk_data_ready = sk->sk_data_ready;
 527        listen_sock.sk_state_change = sk->sk_state_change;
 528        listen_sock.sk_write_space = sk->sk_write_space;
 529        listen_sock.sk_error_report = sk->sk_error_report;
 530}
 531
 532static void restore_callbacks(struct socket *sock)
 533{
 534        struct sock *sk = sock->sk;
 535
 536        write_lock_bh(&sk->sk_callback_lock);
 537        sk->sk_user_data = NULL;
 538        sk->sk_data_ready = listen_sock.sk_data_ready;
 539        sk->sk_state_change = listen_sock.sk_state_change;
 540        sk->sk_write_space = listen_sock.sk_write_space;
 541        sk->sk_error_report = listen_sock.sk_error_report;
 542        write_unlock_bh(&sk->sk_callback_lock);
 543}
 544
 545/* Make a socket active */
 546static void add_sock(struct socket *sock, struct connection *con)
 547{
 548        struct sock *sk = sock->sk;
 549
 550        write_lock_bh(&sk->sk_callback_lock);
 551        con->sock = sock;
 552
 553        sk->sk_user_data = con;
 554        /* Install a data_ready callback */
 555        sk->sk_data_ready = lowcomms_data_ready;
 556        sk->sk_write_space = lowcomms_write_space;
 557        sk->sk_state_change = lowcomms_state_change;
 558        sk->sk_allocation = GFP_NOFS;
 559        sk->sk_error_report = lowcomms_error_report;
 560        write_unlock_bh(&sk->sk_callback_lock);
 561}
 562
 563/* Add the port number to an IPv6 or 4 sockaddr and return the address
 564   length */
 565static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 566                          int *addr_len)
 567{
 568        saddr->ss_family =  dlm_local_addr[0]->ss_family;
 569        if (saddr->ss_family == AF_INET) {
 570                struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 571                in4_addr->sin_port = cpu_to_be16(port);
 572                *addr_len = sizeof(struct sockaddr_in);
 573                memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 574        } else {
 575                struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 576                in6_addr->sin6_port = cpu_to_be16(port);
 577                *addr_len = sizeof(struct sockaddr_in6);
 578        }
 579        memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 580}
 581
 582/* Close a remote connection and tidy up */
 583static void close_connection(struct connection *con, bool and_other,
 584                             bool tx, bool rx)
 585{
 586        clear_bit(CF_WRITE_PENDING, &con->flags);
 587        if (tx && cancel_work_sync(&con->swork))
 588                log_print("canceled swork for node %d", con->nodeid);
 589        if (rx && cancel_work_sync(&con->rwork))
 590                log_print("canceled rwork for node %d", con->nodeid);
 591
 592        mutex_lock(&con->sock_mutex);
 593        if (con->sock) {
 594                restore_callbacks(con->sock);
 595                sock_release(con->sock);
 596                con->sock = NULL;
 597        }
 598        if (con->othercon && and_other) {
 599                /* Will only re-enter once. */
 600                close_connection(con->othercon, false, true, true);
 601        }
 602        if (con->rx_page) {
 603                __free_page(con->rx_page);
 604                con->rx_page = NULL;
 605        }
 606
 607        con->retries = 0;
 608        mutex_unlock(&con->sock_mutex);
 609}
 610
  611/*
 * Data received from remote end.
 *
 * Reads whatever is available on con->sock (non-blocking) into the
 * circular buffer kept in con->rx_page, hands the buffered bytes to
 * dlm_process_incoming_buffer() and consumes the count it returns.
 *
 * Returns 0 when the data was handled; -EAGAIN if con->sock is NULL, if the
 * receive work was requeued (page allocation failed or the read filled all
 * offered space), or on EOF (after the connection was closed); otherwise the
 * negative error from kernel_recvmsg()/dlm_process_incoming_buffer(), after
 * the connection has been closed.
 */
  612static int receive_from_sock(struct connection *con)
  613{
  614        int ret = 0;
  615        struct msghdr msg = {};
  616        struct kvec iov[2];
  617        unsigned len;
        /* raw byte count from kernel_recvmsg(), kept for the -EBADMSG log */
  618        int r;
  619        int call_again_soon = 0;
  620        int nvec;
  621
  622        mutex_lock(&con->sock_mutex);
  623
  624        if (con->sock == NULL) {
  625                ret = -EAGAIN;
  626                goto out_close;
  627        }
        /* NOTE(review): nodeid 0 is the listening connection; it is closed if it gets here */
  628        if (con->nodeid == 0) {
  629                ret = -EINVAL;
  630                goto out_close;
  631        }
  632
  633        if (con->rx_page == NULL) {
  634                /*
  635                 * This doesn't need to be atomic, but I think it should
  636                 * improve performance if it is.
  637                 */
  638                con->rx_page = alloc_page(GFP_ATOMIC);
  639                if (con->rx_page == NULL)
  640                        goto out_resched;
  641                cbuf_init(&con->cb, PAGE_CACHE_SIZE);
  642        }
  643
  644        /*
  645         * iov[0] is the bit of the circular buffer between the current end
  646         * point (cb.base + cb.len) and the end of the buffer.
  647         */
        /*
         * This first assignment covers the wrapped case, where the free
         * space lies between the end of the data and cb.base; it is
         * overridden below when the data has not wrapped.
         */
  648        iov[0].iov_len = con->cb.base - cbuf_data(&con->cb);
  649        iov[0].iov_base = page_address(con->rx_page) + cbuf_data(&con->cb);
  650        iov[1].iov_len = 0;
  651        nvec = 1;
  652
  653        /*
  654         * iov[1] is the bit of the circular buffer between the start of the
  655         * buffer and the start of the currently used section (cb.base)
  656         */
        /*
         * NOTE(review): if cb.len == PAGE_CACHE_SIZE (buffer full) then
         * cbuf_data() == cb.base, this branch is taken and len becomes
         * PAGE_CACHE_SIZE although no space is free.  Presumably the
         * processing below never leaves a full page unconsumed; confirm.
         */
  657        if (cbuf_data(&con->cb) >= con->cb.base) {
  658                iov[0].iov_len = PAGE_CACHE_SIZE - cbuf_data(&con->cb);
  659                iov[1].iov_len = con->cb.base;
  660                iov[1].iov_base = page_address(con->rx_page);
  661                nvec = 2;
  662        }
  663        len = iov[0].iov_len + iov[1].iov_len;
  664
  665        r = ret = kernel_recvmsg(con->sock, &msg, iov, nvec, len,
  666                               MSG_DONTWAIT | MSG_NOSIGNAL);
        /* 0 is EOF; a read that filled all offered space may have more behind it */
  667        if (ret <= 0)
  668                goto out_close;
  669        else if (ret == len)
  670                call_again_soon = 1;
  671
  672        cbuf_add(&con->cb, ret);
        /* the return value is the number of bytes consumed, or a negative error */
  673        ret = dlm_process_incoming_buffer(con->nodeid,
  674                                          page_address(con->rx_page),
  675                                          con->cb.base, con->cb.len,
  676                                          PAGE_CACHE_SIZE);
  677        if (ret == -EBADMSG) {
  678                log_print("lowcomms: addr=%p, base=%u, len=%u, read=%d",
  679                          page_address(con->rx_page), con->cb.base,
  680                          con->cb.len, r);
  681        }
  682        if (ret < 0)
  683                goto out_close;
  684        cbuf_eat(&con->cb, ret);
  685
        /* release the page once everything is consumed and no reread is due */
  686        if (cbuf_empty(&con->cb) && !call_again_soon) {
  687                __free_page(con->rx_page);
  688                con->rx_page = NULL;
  689        }
  690
  691        if (call_again_soon)
  692                goto out_resched;
  693        mutex_unlock(&con->sock_mutex);
  694        return 0;
  695
        /* requeue the receive work (unless already pending) to run again */
  696out_resched:
  697        if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
  698                queue_work(recv_workqueue, &con->rwork);
  699        mutex_unlock(&con->sock_mutex);
  700        return -EAGAIN;
  701
        /* -EAGAIN (no socket, or read would block) leaves the connection open */
  702out_close:
  703        mutex_unlock(&con->sock_mutex);
  704        if (ret != -EAGAIN) {
  705                close_connection(con, false, true, false);
  706                /* Reconnect when there is something to send */
  707        }
  708        /* Don't return success if we really got EOF */
  709        if (ret == 0)
  710                ret = -EAGAIN;
  711
  712        return ret;
  713}
 714
 715/* Listening socket is busy, accept a connection */
 716static int tcp_accept_from_sock(struct connection *con)
 717{
 718        int result;
 719        struct sockaddr_storage peeraddr;
 720        struct socket *newsock;
 721        int len;
 722        int nodeid;
 723        struct connection *newcon;
 724        struct connection *addcon;
 725
 726        mutex_lock(&connections_lock);
 727        if (!dlm_allow_conn) {
 728                mutex_unlock(&connections_lock);
 729                return -1;
 730        }
 731        mutex_unlock(&connections_lock);
 732
 733        memset(&peeraddr, 0, sizeof(peeraddr));
 734        result = sock_create_lite(dlm_local_addr[0]->ss_family, SOCK_STREAM,
 735                                  IPPROTO_TCP, &newsock);
 736        if (result < 0)
 737                return -ENOMEM;
 738
 739        mutex_lock_nested(&con->sock_mutex, 0);
 740
 741        result = -ENOTCONN;
 742        if (con->sock == NULL)
 743                goto accept_err;
 744
 745        newsock->type = con->sock->type;
 746        newsock->ops = con->sock->ops;
 747
 748        result = con->sock->ops->accept(con->sock, newsock, O_NONBLOCK);
 749        if (result < 0)
 750                goto accept_err;
 751
 752        /* Get the connected socket's peer */
 753        memset(&peeraddr, 0, sizeof(peeraddr));
 754        if (newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr,
 755                                  &len, 2)) {
 756                result = -ECONNABORTED;
 757                goto accept_err;
 758        }
 759
 760        /* Get the new node's NODEID */
 761        make_sockaddr(&peeraddr, 0, &len);
 762        if (addr_to_nodeid(&peeraddr, &nodeid)) {
 763                unsigned char *b=(unsigned char *)&peeraddr;
 764                log_print("connect from non cluster node");
 765                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
 766                                     b, sizeof(struct sockaddr_storage));
 767                sock_release(newsock);
 768                mutex_unlock(&con->sock_mutex);
 769                return -1;
 770        }
 771
 772        log_print("got connection from %d", nodeid);
 773
 774        /*  Check to see if we already have a connection to this node. This
 775         *  could happen if the two nodes initiate a connection at roughly
 776         *  the same time and the connections cross on the wire.
 777         *  In this case we store the incoming one in "othercon"
 778         */
 779        newcon = nodeid2con(nodeid, GFP_NOFS);
 780        if (!newcon) {
 781                result = -ENOMEM;
 782                goto accept_err;
 783        }
 784        mutex_lock_nested(&newcon->sock_mutex, 1);
 785        if (newcon->sock) {
 786                struct connection *othercon = newcon->othercon;
 787
 788                if (!othercon) {
 789                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
 790                        if (!othercon) {
 791                                log_print("failed to allocate incoming socket");
 792                                mutex_unlock(&newcon->sock_mutex);
 793                                result = -ENOMEM;
 794                                goto accept_err;
 795                        }
 796                        othercon->nodeid = nodeid;
 797                        othercon->rx_action = receive_from_sock;
 798                        mutex_init(&othercon->sock_mutex);
 799                        INIT_WORK(&othercon->swork, process_send_sockets);
 800                        INIT_WORK(&othercon->rwork, process_recv_sockets);
 801                        set_bit(CF_IS_OTHERCON, &othercon->flags);
 802                }
 803                if (!othercon->sock) {
 804                        newcon->othercon = othercon;
 805                        othercon->sock = newsock;
 806                        newsock->sk->sk_user_data = othercon;
 807                        add_sock(newsock, othercon);
 808                        addcon = othercon;
 809                }
 810                else {
 811                        printk("Extra connection from node %d attempted\n", nodeid);
 812                        result = -EAGAIN;
 813                        mutex_unlock(&newcon->sock_mutex);
 814                        goto accept_err;
 815                }
 816        }
 817        else {
 818                newsock->sk->sk_user_data = newcon;
 819                newcon->rx_action = receive_from_sock;
 820                add_sock(newsock, newcon);
 821                addcon = newcon;
 822        }
 823
 824        mutex_unlock(&newcon->sock_mutex);
 825
 826        /*
 827         * Add it to the active queue in case we got data
 828         * between processing the accept adding the socket
 829         * to the read_sockets list
 830         */
 831        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
 832                queue_work(recv_workqueue, &addcon->rwork);
 833        mutex_unlock(&con->sock_mutex);
 834
 835        return 0;
 836
 837accept_err:
 838        mutex_unlock(&con->sock_mutex);
 839        sock_release(newsock);
 840
 841        if (result != -EAGAIN)
 842                log_print("error accepting connection from node: %d", result);
 843        return result;
 844}
 845
 846static int sctp_accept_from_sock(struct connection *con)
 847{
 848        /* Check that the new node is in the lockspace */
 849        struct sctp_prim prim;
 850        int nodeid;
 851        int prim_len, ret;
 852        int addr_len;
 853        struct connection *newcon;
 854        struct connection *addcon;
 855        struct socket *newsock;
 856
 857        mutex_lock(&connections_lock);
 858        if (!dlm_allow_conn) {
 859                mutex_unlock(&connections_lock);
 860                return -1;
 861        }
 862        mutex_unlock(&connections_lock);
 863
 864        mutex_lock_nested(&con->sock_mutex, 0);
 865
 866        ret = kernel_accept(con->sock, &newsock, O_NONBLOCK);
 867        if (ret < 0)
 868                goto accept_err;
 869
 870        memset(&prim, 0, sizeof(struct sctp_prim));
 871        prim_len = sizeof(struct sctp_prim);
 872
 873        ret = kernel_getsockopt(newsock, IPPROTO_SCTP, SCTP_PRIMARY_ADDR,
 874                                (char *)&prim, &prim_len);
 875        if (ret < 0) {
 876                log_print("getsockopt/sctp_primary_addr failed: %d", ret);
 877                goto accept_err;
 878        }
 879
 880        make_sockaddr(&prim.ssp_addr, 0, &addr_len);
 881        if (addr_to_nodeid(&prim.ssp_addr, &nodeid)) {
 882                unsigned char *b = (unsigned char *)&prim.ssp_addr;
 883
 884                log_print("reject connect from unknown addr");
 885                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE,
 886                                     b, sizeof(struct sockaddr_storage));
 887                goto accept_err;
 888        }
 889
 890        newcon = nodeid2con(nodeid, GFP_NOFS);
 891        if (!newcon) {
 892                ret = -ENOMEM;
 893                goto accept_err;
 894        }
 895
 896        mutex_lock_nested(&newcon->sock_mutex, 1);
 897
 898        if (newcon->sock) {
 899                struct connection *othercon = newcon->othercon;
 900
 901                if (!othercon) {
 902                        othercon = kmem_cache_zalloc(con_cache, GFP_NOFS);
 903                        if (!othercon) {
 904                                log_print("failed to allocate incoming socket");
 905                                mutex_unlock(&newcon->sock_mutex);
 906                                ret = -ENOMEM;
 907                                goto accept_err;
 908                        }
 909                        othercon->nodeid = nodeid;
 910                        othercon->rx_action = receive_from_sock;
 911                        mutex_init(&othercon->sock_mutex);
 912                        INIT_WORK(&othercon->swork, process_send_sockets);
 913                        INIT_WORK(&othercon->rwork, process_recv_sockets);
 914                        set_bit(CF_IS_OTHERCON, &othercon->flags);
 915                }
 916                if (!othercon->sock) {
 917                        newcon->othercon = othercon;
 918                        othercon->sock = newsock;
 919                        newsock->sk->sk_user_data = othercon;
 920                        add_sock(newsock, othercon);
 921                        addcon = othercon;
 922                } else {
 923                        printk("Extra connection from node %d attempted\n", nodeid);
 924                        ret = -EAGAIN;
 925                        mutex_unlock(&newcon->sock_mutex);
 926                        goto accept_err;
 927                }
 928        } else {
 929                newsock->sk->sk_user_data = newcon;
 930                newcon->rx_action = receive_from_sock;
 931                add_sock(newsock, newcon);
 932                addcon = newcon;
 933        }
 934
 935        log_print("connected to %d", nodeid);
 936
 937        mutex_unlock(&newcon->sock_mutex);
 938
 939        /*
 940         * Add it to the active queue in case we got data
 941         * between processing the accept adding the socket
 942         * to the read_sockets list
 943         */
 944        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
 945                queue_work(recv_workqueue, &addcon->rwork);
 946        mutex_unlock(&con->sock_mutex);
 947
 948        return 0;
 949
 950accept_err:
 951        mutex_unlock(&con->sock_mutex);
 952        if (newsock)
 953                sock_release(newsock);
 954        if (ret != -EAGAIN)
 955                log_print("error accepting connection from node: %d", ret);
 956
 957        return ret;
 958}
 959
 960static void free_entry(struct writequeue_entry *e)
 961{
 962        __free_page(e->page);
 963        kfree(e);
 964}
 965
 966/*
 967 * writequeue_entry_complete - try to delete and free write queue entry
 968 * @e: write queue entry to try to delete
 969 * @completed: bytes completed
 970 *
 971 * writequeue_lock must be held.
 972 */
 973static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
 974{
 975        e->offset += completed;
 976        e->len -= completed;
 977
 978        if (e->len == 0 && e->users == 0) {
 979                list_del(&e->list);
 980                free_entry(e);
 981        }
 982}
 983
 984/*
 985 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 986 */
 987static int sctp_bind_addrs(struct connection *con, uint16_t port)
 988{
 989        struct sockaddr_storage localaddr;
 990        int i, addr_len, result = 0;
 991
 992        for (i = 0; i < dlm_local_count; i++) {
 993                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
 994                make_sockaddr(&localaddr, port, &addr_len);
 995
 996                if (!i)
 997                        result = kernel_bind(con->sock,
 998                                             (struct sockaddr *)&localaddr,
 999                                             addr_len);
1000                else
1001                        result = kernel_setsockopt(con->sock, SOL_SCTP,
1002                                                   SCTP_SOCKOPT_BINDX_ADD,
1003                                                   (char *)&localaddr, addr_len);
1004
1005                if (result < 0) {
1006                        log_print("Can't bind to %d addr number %d, %d.\n",
1007                                  port, i + 1, result);
1008                        break;
1009                }
1010        }
1011        return result;
1012}
1013
1014/* Initiate an SCTP association.
1015   This is a special case of send_to_sock() in that we don't yet have a
1016   peeled-off socket for this association, so we use the listening socket
1017   and add the primary IP address of the remote node.
1018 */
1019static void sctp_connect_to_sock(struct connection *con)
1020{
1021        struct sockaddr_storage daddr;
1022        int one = 1;
1023        int result;
1024        int addr_len;
1025        struct socket *sock;
1026
1027        if (con->nodeid == 0) {
1028                log_print("attempt to connect sock 0 foiled");
1029                return;
1030        }
1031
1032        mutex_lock(&con->sock_mutex);
1033
1034        /* Some odd races can cause double-connects, ignore them */
1035        if (con->retries++ > MAX_CONNECT_RETRIES)
1036                goto out;
1037
1038        if (con->sock) {
1039                log_print("node %d already connected.", con->nodeid);
1040                goto out;
1041        }
1042
1043        memset(&daddr, 0, sizeof(daddr));
1044        result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
1045        if (result < 0) {
1046                log_print("no address for nodeid %d", con->nodeid);
1047                goto out;
1048        }
1049
1050        /* Create a socket to communicate with */
1051        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1052                                  IPPROTO_SCTP, &sock);
1053        if (result < 0)
1054                goto socket_err;
1055
1056        sock->sk->sk_user_data = con;
1057        con->rx_action = receive_from_sock;
1058        con->connect_action = sctp_connect_to_sock;
1059        add_sock(sock, con);
1060
1061        /* Bind to all addresses. */
1062        if (sctp_bind_addrs(con, 0))
1063                goto bind_err;
1064
1065        make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
1066
1067        log_print("connecting to %d", con->nodeid);
1068
1069        /* Turn off Nagle's algorithm */
1070        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1071                          sizeof(one));
1072
1073        result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
1074                                   O_NONBLOCK);
1075        if (result == -EINPROGRESS)
1076                result = 0;
1077        if (result == 0)
1078                goto out;
1079
1080
1081bind_err:
1082        con->sock = NULL;
1083        sock_release(sock);
1084
1085socket_err:
1086        /*
1087         * Some errors are fatal and this list might need adjusting. For other
1088         * errors we try again until the max number of retries is reached.
1089         */
1090        if (result != -EHOSTUNREACH &&
1091            result != -ENETUNREACH &&
1092            result != -ENETDOWN &&
1093            result != -EINVAL &&
1094            result != -EPROTONOSUPPORT) {
1095                log_print("connect %d try %d error %d", con->nodeid,
1096                          con->retries, result);
1097                mutex_unlock(&con->sock_mutex);
1098                msleep(1000);
1099                lowcomms_connect_sock(con);
1100                return;
1101        }
1102
1103out:
1104        mutex_unlock(&con->sock_mutex);
1105        set_bit(CF_WRITE_PENDING, &con->flags);
1106}
1107
1108/* Connect a new socket to its peer */
1109static void tcp_connect_to_sock(struct connection *con)
1110{
1111        struct sockaddr_storage saddr, src_addr;
1112        int addr_len;
1113        struct socket *sock = NULL;
1114        int one = 1;
1115        int result;
1116
1117        if (con->nodeid == 0) {
1118                log_print("attempt to connect sock 0 foiled");
1119                return;
1120        }
1121
1122        mutex_lock(&con->sock_mutex);
1123        if (con->retries++ > MAX_CONNECT_RETRIES)
1124                goto out;
1125
1126        /* Some odd races can cause double-connects, ignore them */
1127        if (con->sock)
1128                goto out;
1129
1130        /* Create a socket to communicate with */
1131        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1132                                  IPPROTO_TCP, &sock);
1133        if (result < 0)
1134                goto out_err;
1135
1136        memset(&saddr, 0, sizeof(saddr));
1137        result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
1138        if (result < 0) {
1139                log_print("no address for nodeid %d", con->nodeid);
1140                goto out_err;
1141        }
1142
1143        sock->sk->sk_user_data = con;
1144        con->rx_action = receive_from_sock;
1145        con->connect_action = tcp_connect_to_sock;
1146        add_sock(sock, con);
1147
1148        /* Bind to our cluster-known address connecting to avoid
1149           routing problems */
1150        memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
1151        make_sockaddr(&src_addr, 0, &addr_len);
1152        result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
1153                                 addr_len);
1154        if (result < 0) {
1155                log_print("could not bind for connect: %d", result);
1156                /* This *may* not indicate a critical error */
1157        }
1158
1159        make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);
1160
1161        log_print("connecting to %d", con->nodeid);
1162
1163        /* Turn off Nagle's algorithm */
1164        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1165                          sizeof(one));
1166
1167        result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
1168                                   O_NONBLOCK);
1169        if (result == -EINPROGRESS)
1170                result = 0;
1171        if (result == 0)
1172                goto out;
1173
1174out_err:
1175        if (con->sock) {
1176                sock_release(con->sock);
1177                con->sock = NULL;
1178        } else if (sock) {
1179                sock_release(sock);
1180        }
1181        /*
1182         * Some errors are fatal and this list might need adjusting. For other
1183         * errors we try again until the max number of retries is reached.
1184         */
1185        if (result != -EHOSTUNREACH &&
1186            result != -ENETUNREACH &&
1187            result != -ENETDOWN && 
1188            result != -EINVAL &&
1189            result != -EPROTONOSUPPORT) {
1190                log_print("connect %d try %d error %d", con->nodeid,
1191                          con->retries, result);
1192                mutex_unlock(&con->sock_mutex);
1193                msleep(1000);
1194                lowcomms_connect_sock(con);
1195                return;
1196        }
1197out:
1198        mutex_unlock(&con->sock_mutex);
1199        set_bit(CF_WRITE_PENDING, &con->flags);
1200        return;
1201}
1202
1203static struct socket *tcp_create_listen_sock(struct connection *con,
1204                                             struct sockaddr_storage *saddr)
1205{
1206        struct socket *sock = NULL;
1207        int result = 0;
1208        int one = 1;
1209        int addr_len;
1210
1211        if (dlm_local_addr[0]->ss_family == AF_INET)
1212                addr_len = sizeof(struct sockaddr_in);
1213        else
1214                addr_len = sizeof(struct sockaddr_in6);
1215
1216        /* Create a socket to communicate with */
1217        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1218                                  IPPROTO_TCP, &sock);
1219        if (result < 0) {
1220                log_print("Can't create listening comms socket");
1221                goto create_out;
1222        }
1223
1224        /* Turn off Nagle's algorithm */
1225        kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY, (char *)&one,
1226                          sizeof(one));
1227
1228        result = kernel_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
1229                                   (char *)&one, sizeof(one));
1230
1231        if (result < 0) {
1232                log_print("Failed to set SO_REUSEADDR on socket: %d", result);
1233        }
1234        sock->sk->sk_user_data = con;
1235        save_listen_callbacks(sock);
1236        con->rx_action = tcp_accept_from_sock;
1237        con->connect_action = tcp_connect_to_sock;
1238
1239        /* Bind to our port */
1240        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1241        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1242        if (result < 0) {
1243                log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1244                sock_release(sock);
1245                sock = NULL;
1246                con->sock = NULL;
1247                goto create_out;
1248        }
1249        result = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
1250                                 (char *)&one, sizeof(one));
1251        if (result < 0) {
1252                log_print("Set keepalive failed: %d", result);
1253        }
1254
1255        result = sock->ops->listen(sock, 5);
1256        if (result < 0) {
1257                log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1258                sock_release(sock);
1259                sock = NULL;
1260                goto create_out;
1261        }
1262
1263create_out:
1264        return sock;
1265}
1266
1267/* Get local addresses */
1268static void init_local(void)
1269{
1270        struct sockaddr_storage sas, *addr;
1271        int i;
1272
1273        dlm_local_count = 0;
1274        for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1275                if (dlm_our_addr(&sas, i))
1276                        break;
1277
1278                addr = kmalloc(sizeof(*addr), GFP_NOFS);
1279                if (!addr)
1280                        break;
1281                memcpy(addr, &sas, sizeof(*addr));
1282                dlm_local_addr[dlm_local_count++] = addr;
1283        }
1284}
1285
1286/* Initialise SCTP socket and bind to all interfaces */
1287static int sctp_listen_for_all(void)
1288{
1289        struct socket *sock = NULL;
1290        int result = -EINVAL;
1291        struct connection *con = nodeid2con(0, GFP_NOFS);
1292        int bufsize = NEEDED_RMEM;
1293        int one = 1;
1294
1295        if (!con)
1296                return -ENOMEM;
1297
1298        log_print("Using SCTP for communications");
1299
1300        result = sock_create_kern(dlm_local_addr[0]->ss_family, SOCK_STREAM,
1301                                  IPPROTO_SCTP, &sock);
1302        if (result < 0) {
1303                log_print("Can't create comms socket, check SCTP is loaded");
1304                goto out;
1305        }
1306
1307        result = kernel_setsockopt(sock, SOL_SOCKET, SO_RCVBUFFORCE,
1308                                 (char *)&bufsize, sizeof(bufsize));
1309        if (result)
1310                log_print("Error increasing buffer space on socket %d", result);
1311
1312        result = kernel_setsockopt(sock, SOL_SCTP, SCTP_NODELAY, (char *)&one,
1313                                   sizeof(one));
1314        if (result < 0)
1315                log_print("Could not set SCTP NODELAY error %d\n", result);
1316
1317        write_lock_bh(&sock->sk->sk_callback_lock);
1318        /* Init con struct */
1319        sock->sk->sk_user_data = con;
1320        save_listen_callbacks(sock);
1321        con->sock = sock;
1322        con->sock->sk->sk_data_ready = lowcomms_data_ready;
1323        con->rx_action = sctp_accept_from_sock;
1324        con->connect_action = sctp_connect_to_sock;
1325        write_unlock_bh(&sock->sk->sk_callback_lock);
1326
1327        /* Bind to all addresses. */
1328        if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
1329                goto create_delsock;
1330
1331        result = sock->ops->listen(sock, 5);
1332        if (result < 0) {
1333                log_print("Can't set socket listening");
1334                goto create_delsock;
1335        }
1336
1337        return 0;
1338
1339create_delsock:
1340        sock_release(sock);
1341        con->sock = NULL;
1342out:
1343        return result;
1344}
1345
1346static int tcp_listen_for_all(void)
1347{
1348        struct socket *sock = NULL;
1349        struct connection *con = nodeid2con(0, GFP_NOFS);
1350        int result = -EINVAL;
1351
1352        if (!con)
1353                return -ENOMEM;
1354
1355        /* We don't support multi-homed hosts */
1356        if (dlm_local_addr[1] != NULL) {
1357                log_print("TCP protocol can't handle multi-homed hosts, "
1358                          "try SCTP");
1359                return -EINVAL;
1360        }
1361
1362        log_print("Using TCP for communications");
1363
1364        sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1365        if (sock) {
1366                add_sock(sock, con);
1367                result = 0;
1368        }
1369        else {
1370                result = -EADDRINUSE;
1371        }
1372
1373        return result;
1374}
1375
1376
1377
1378static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1379                                                     gfp_t allocation)
1380{
1381        struct writequeue_entry *entry;
1382
1383        entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1384        if (!entry)
1385                return NULL;
1386
1387        entry->page = alloc_page(allocation);
1388        if (!entry->page) {
1389                kfree(entry);
1390                return NULL;
1391        }
1392
1393        entry->offset = 0;
1394        entry->len = 0;
1395        entry->end = 0;
1396        entry->users = 0;
1397        entry->con = con;
1398
1399        return entry;
1400}
1401
1402void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1403{
1404        struct connection *con;
1405        struct writequeue_entry *e;
1406        int offset = 0;
1407
1408        con = nodeid2con(nodeid, allocation);
1409        if (!con)
1410                return NULL;
1411
1412        spin_lock(&con->writequeue_lock);
1413        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1414        if ((&e->list == &con->writequeue) ||
1415            (PAGE_CACHE_SIZE - e->end < len)) {
1416                e = NULL;
1417        } else {
1418                offset = e->end;
1419                e->end += len;
1420                e->users++;
1421        }
1422        spin_unlock(&con->writequeue_lock);
1423
1424        if (e) {
1425        got_one:
1426                *ppc = page_address(e->page) + offset;
1427                return e;
1428        }
1429
1430        e = new_writequeue_entry(con, allocation);
1431        if (e) {
1432                spin_lock(&con->writequeue_lock);
1433                offset = e->end;
1434                e->end += len;
1435                e->users++;
1436                list_add_tail(&e->list, &con->writequeue);
1437                spin_unlock(&con->writequeue_lock);
1438                goto got_one;
1439        }
1440        return NULL;
1441}
1442
1443void dlm_lowcomms_commit_buffer(void *mh)
1444{
1445        struct writequeue_entry *e = (struct writequeue_entry *)mh;
1446        struct connection *con = e->con;
1447        int users;
1448
1449        spin_lock(&con->writequeue_lock);
1450        users = --e->users;
1451        if (users)
1452                goto out;
1453        e->len = e->end - e->offset;
1454        spin_unlock(&con->writequeue_lock);
1455
1456        if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
1457                queue_work(send_workqueue, &con->swork);
1458        }
1459        return;
1460
1461out:
1462        spin_unlock(&con->writequeue_lock);
1463        return;
1464}
1465
1466/* Send a message */
1467static void send_to_sock(struct connection *con)
1468{
1469        int ret = 0;
1470        const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
1471        struct writequeue_entry *e;
1472        int len, offset;
1473        int count = 0;
1474
1475        mutex_lock(&con->sock_mutex);
1476        if (con->sock == NULL)
1477                goto out_connect;
1478
1479        spin_lock(&con->writequeue_lock);
1480        for (;;) {
1481                e = list_entry(con->writequeue.next, struct writequeue_entry,
1482                               list);
1483                if ((struct list_head *) e == &con->writequeue)
1484                        break;
1485
1486                len = e->len;
1487                offset = e->offset;
1488                BUG_ON(len == 0 && e->users == 0);
1489                spin_unlock(&con->writequeue_lock);
1490
1491                ret = 0;
1492                if (len) {
1493                        ret = kernel_sendpage(con->sock, e->page, offset, len,
1494                                              msg_flags);
1495                        if (ret == -EAGAIN || ret == 0) {
1496                                if (ret == -EAGAIN &&
1497                                    test_bit(SOCK_ASYNC_NOSPACE, &con->sock->flags) &&
1498                                    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
1499                                        /* Notify TCP that we're limited by the
1500                                         * application window size.
1501                                         */
1502                                        set_bit(SOCK_NOSPACE, &con->sock->flags);
1503                                        con->sock->sk->sk_write_pending++;
1504                                }
1505                                cond_resched();
1506                                goto out;
1507                        } else if (ret < 0)
1508                                goto send_error;
1509                }
1510
1511                /* Don't starve people filling buffers */
1512                if (++count >= MAX_SEND_MSG_COUNT) {
1513                        cond_resched();
1514                        count = 0;
1515                }
1516
1517                spin_lock(&con->writequeue_lock);
1518                writequeue_entry_complete(e, ret);
1519        }
1520        spin_unlock(&con->writequeue_lock);
1521out:
1522        mutex_unlock(&con->sock_mutex);
1523        return;
1524
1525send_error:
1526        mutex_unlock(&con->sock_mutex);
1527        close_connection(con, false, false, true);
1528        lowcomms_connect_sock(con);
1529        return;
1530
1531out_connect:
1532        mutex_unlock(&con->sock_mutex);
1533        lowcomms_connect_sock(con);
1534}
1535
1536static void clean_one_writequeue(struct connection *con)
1537{
1538        struct writequeue_entry *e, *safe;
1539
1540        spin_lock(&con->writequeue_lock);
1541        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1542                list_del(&e->list);
1543                free_entry(e);
1544        }
1545        spin_unlock(&con->writequeue_lock);
1546}
1547
1548/* Called from recovery when it knows that a node has
1549   left the cluster */
1550int dlm_lowcomms_close(int nodeid)
1551{
1552        struct connection *con;
1553        struct dlm_node_addr *na;
1554
1555        log_print("closing connection to node %d", nodeid);
1556        con = nodeid2con(nodeid, 0);
1557        if (con) {
1558                set_bit(CF_CLOSE, &con->flags);
1559                close_connection(con, true, true, true);
1560                clean_one_writequeue(con);
1561        }
1562
1563        spin_lock(&dlm_node_addrs_spin);
1564        na = find_node_addr(nodeid);
1565        if (na) {
1566                list_del(&na->list);
1567                while (na->addr_count--)
1568                        kfree(na->addr[na->addr_count]);
1569                kfree(na);
1570        }
1571        spin_unlock(&dlm_node_addrs_spin);
1572
1573        return 0;
1574}
1575
1576/* Receive workqueue function */
1577static void process_recv_sockets(struct work_struct *work)
1578{
1579        struct connection *con = container_of(work, struct connection, rwork);
1580        int err;
1581
1582        clear_bit(CF_READ_PENDING, &con->flags);
1583        do {
1584                err = con->rx_action(con);
1585        } while (!err);
1586}
1587
1588/* Send workqueue function */
1589static void process_send_sockets(struct work_struct *work)
1590{
1591        struct connection *con = container_of(work, struct connection, swork);
1592
1593        if (con->sock == NULL) /* not mutex protected so check it inside too */
1594                con->connect_action(con);
1595        if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags))
1596                send_to_sock(con);
1597}
1598
1599
/*
 * Discard all entries on the write queues: applies clean_one_writequeue()
 * to every connection visited by foreach_conn().
 */
static void clean_writequeues(void)
{
	foreach_conn(clean_one_writequeue);
}
1605
/*
 * work_stop - destroy the dlm_recv and dlm_send workqueues
 *
 * The receive queue is destroyed first, then the send queue.
 * NOTE(review): presumably this order matters because receive processing
 * can queue send work; confirm before reordering.
 */
static void work_stop(void)
{
	destroy_workqueue(recv_workqueue);
	destroy_workqueue(send_workqueue);
}
1611
1612static int work_start(void)
1613{
1614        recv_workqueue = alloc_workqueue("dlm_recv",
1615                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1616        if (!recv_workqueue) {
1617                log_print("can't start dlm_recv");
1618                return -ENOMEM;
1619        }
1620
1621        send_workqueue = alloc_workqueue("dlm_send",
1622                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1623        if (!send_workqueue) {
1624                log_print("can't start dlm_send");
1625                destroy_workqueue(recv_workqueue);
1626                return -ENOMEM;
1627        }
1628
1629        return 0;
1630}
1631
1632static void stop_conn(struct connection *con)
1633{
1634        con->flags |= 0x0F;
1635        if (con->sock && con->sock->sk)
1636                con->sock->sk->sk_user_data = NULL;
1637}
1638
1639static void free_conn(struct connection *con)
1640{
1641        close_connection(con, true, true, true);
1642        if (con->othercon)
1643                kmem_cache_free(con_cache, con->othercon);
1644        hlist_del(&con->list);
1645        kmem_cache_free(con_cache, con);
1646}
1647
/*
 * dlm_lowcomms_stop - shut down the lowcomms layer
 *
 * With connections_lock held, this refuses further accepts
 * (dlm_allow_conn = 0), runs stop_conn() on every connection, discards all
 * queued writes, and then closes and frees every connection. After the lock
 * is dropped, the workqueues and the connection cache are destroyed.
 *
 * NOTE(review): connections are freed before work_stop() destroys the
 * workqueues. A work item still queued at that point would run on a freed
 * connection. The flags set by stop_conn() presumably prevent new work from
 * being queued, but they do not cancel work that is already queued.
 */
void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	mutex_lock(&connections_lock);
	dlm_allow_conn = 0;
	foreach_conn(stop_conn);
	clean_writequeues();
	foreach_conn(free_conn);
	mutex_unlock(&connections_lock);

	work_stop();

	kmem_cache_destroy(con_cache);
}
1664
1665int dlm_lowcomms_start(void)
1666{
1667        int error = -EINVAL;
1668        struct connection *con;
1669        int i;
1670
1671        for (i = 0; i < CONN_HASH_SIZE; i++)
1672                INIT_HLIST_HEAD(&connection_hash[i]);
1673
1674        init_local();
1675        if (!dlm_local_count) {
1676                error = -ENOTCONN;
1677                log_print("no local IP address has been set");
1678                goto fail;
1679        }
1680
1681        error = -ENOMEM;
1682        con_cache = kmem_cache_create("dlm_conn", sizeof(struct connection),
1683                                      __alignof__(struct connection), 0,
1684                                      NULL);
1685        if (!con_cache)
1686                goto fail;
1687
1688        error = work_start();
1689        if (error)
1690                goto fail_destroy;
1691
1692        dlm_allow_conn = 1;
1693
1694        /* Start listening */
1695        if (dlm_config.ci_protocol == 0)
1696                error = tcp_listen_for_all();
1697        else
1698                error = sctp_listen_for_all();
1699        if (error)
1700                goto fail_unlisten;
1701
1702        return 0;
1703
1704fail_unlisten:
1705        dlm_allow_conn = 0;
1706        con = nodeid2con(0,0);
1707        if (con) {
1708                close_connection(con, false, true, true);
1709                kmem_cache_free(con_cache, con);
1710        }
1711fail_destroy:
1712        kmem_cache_destroy(con_cache);
1713fail:
1714        return error;
1715}
1716
1717void dlm_lowcomms_exit(void)
1718{
1719        struct dlm_node_addr *na, *safe;
1720
1721        spin_lock(&dlm_node_addrs_spin);
1722        list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1723                list_del(&na->list);
1724                while (na->addr_count--)
1725                        kfree(na->addr[na->addr_count]);
1726                kfree(na);
1727        }
1728        spin_unlock(&dlm_node_addrs_spin);
1729}
1730