linux/fs/dlm/lowcomms.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-only
   2/******************************************************************************
   3*******************************************************************************
   4**
   5**  Copyright (C) Sistina Software, Inc.  1997-2003  All rights reserved.
   6**  Copyright (C) 2004-2009 Red Hat, Inc.  All rights reserved.
   7**
   8**
   9*******************************************************************************
  10******************************************************************************/
  11
  12/*
  13 * lowcomms.c
  14 *
  15 * This is the "low-level" comms layer.
  16 *
  17 * It is responsible for sending/receiving messages
  18 * from other nodes in the cluster.
  19 *
  20 * Cluster nodes are referred to by their nodeids. nodeids are
  21 * simply 32 bit numbers to the locking module - if they need to
  22 * be expanded for the cluster infrastructure then that is its
  23 * responsibility. It is this layer's
  24 * responsibility to resolve these into IP address or
  25 * whatever it needs for inter-node communication.
  26 *
 * The comms level runs on two workqueues: a receive workqueue that
 * reads messages from other nodes and passes them up to the mid-level
 * comms layer (which understands the message format) for execution by
 * the locking core, and a send workqueue which does all the setting up
 * of connections to remote nodes and the sending of data. Other threads
 * are not allowed to send their own data because it may cause them to
 * wait in times of high load. Also, this way, the send worker can collect
 * together messages bound for one node and send them in one block.
  36 *
  37 * lowcomms will choose to use either TCP or SCTP as its transport layer
  38 * depending on the configuration variable 'protocol'. This should be set
  39 * to 0 (default) for TCP or 1 for SCTP. It should be configured using a
  40 * cluster-wide mechanism as it must be the same on all nodes of the cluster
  41 * for the DLM to function.
  42 *
  43 */
  44
  45#include <asm/ioctls.h>
  46#include <net/sock.h>
  47#include <net/tcp.h>
  48#include <linux/pagemap.h>
  49#include <linux/file.h>
  50#include <linux/mutex.h>
  51#include <linux/sctp.h>
  52#include <linux/slab.h>
  53#include <net/sctp/sctp.h>
  54#include <net/ipv6.h>
  55
  56#include "dlm_internal.h"
  57#include "lowcomms.h"
  58#include "midcomms.h"
  59#include "config.h"
  60
/* 4 MiB; presumably the receive buffer size requested for DLM sockets
 * elsewhere in this file — TODO confirm against its users. */
#define NEEDED_RMEM (4*1024*1024)
/* Buckets in connection_hash.  Must stay a power of two: nodeid_hash()
 * reduces a nodeid by masking with CONN_HASH_SIZE-1. */
#define CONN_HASH_SIZE 32

/* Number of messages to send before rescheduling */
#define MAX_SEND_MSG_COUNT 25
/* How long shutdown_connection() waits for CF_SHUTDOWN to clear before
 * it force-closes the connection. */
#define DLM_SHUTDOWN_WAIT_TIMEOUT msecs_to_jiffies(10000)
  67
/*
 * Per-node connection state, kept in connection_hash and looked up by nodeid.
 * An incoming connection that crosses an existing outgoing one is stored in
 * a second struct hung off ->othercon (see accept_from_sock()).
 */
struct connection {
	struct socket *sock;	/* NULL if not connected */
	uint32_t nodeid;	/* So we know who we are in the list */
	/* held in this file while ->sock and the rx_* buffer state are used */
	struct mutex sock_mutex;
	/* CF_* values below are bit numbers for test_bit()/set_bit(), not masks */
	unsigned long flags;
#define CF_READ_PENDING 1	/* rwork queued (set before queue_work on recv_workqueue) */
#define CF_WRITE_PENDING 2	/* cleared when queued swork is cancelled */
#define CF_INIT_PENDING 4
#define CF_IS_OTHERCON 5	/* this struct is some connection's ->othercon */
#define CF_CLOSE 6		/* lowcomms_connect_sock() will not queue swork */
#define CF_APP_LIMITED 7	/* cleared (and sk_write_pending undone) in write_space */
#define CF_CLOSING 8		/* close_connection() in progress */
#define CF_SHUTDOWN 9		/* graceful shutdown waiting for EOF from the peer */
	struct list_head writequeue;  /* List of outgoing writequeue_entries */
	spinlock_t writequeue_lock;
	int (*rx_action) (struct connection *); /* What to do when active */
	void (*connect_action) (struct connection *);   /* What to do to connect */
	void (*shutdown_action)(struct connection *con); /* What to do to shutdown */
	int retries;		/* reset to 0 by close_connection() */
#define MAX_CONNECT_RETRIES 3
	struct hlist_node list;	/* link in connection_hash[nodeid_hash(nodeid)] */
	struct connection *othercon;
	struct work_struct rwork; /* Receive workqueue */
	struct work_struct swork; /* Send workqueue */
	wait_queue_head_t shutdown_wait; /* wait for graceful shutdown */
	unsigned char *rx_buf;	/* receive buffer of rx_buflen bytes */
	int rx_buflen;
	/* bytes of a partial message kept at the start of rx_buf */
	int rx_leftover;
	/* presumably for deferred (RCU) freeing of the connection — TODO confirm */
	struct rcu_head rcu;
};
/* The connection attached to a struct sock (sk_user_data is set by add_sock()) */
#define sock2con(x) ((struct connection *)(x)->sk_user_data)
  99
/*
 * An entry waiting to be sent, queued on connection::writequeue.
 * NOTE(review): the code that fills and drains these entries is not in
 * this part of the file; the field notes below are assumptions to confirm.
 */
struct writequeue_entry {
	struct list_head list;	/* link in con->writequeue */
	struct page *page;	/* presumably the page holding queued data */
	int offset;		/* presumably start of unsent data within page */
	int len;		/* presumably number of bytes still to send */
	int end;		/* presumably end of data written into page */
	int users;		/* presumably number of writers still filling the page */
	struct connection *con;	/* connection this entry belongs to */
};
 110
/*
 * Addresses registered for one cluster node (see dlm_lowcomms_addr()).
 * Entries live on dlm_node_addrs and are accessed under dlm_node_addrs_spin.
 */
struct dlm_node_addr {
	struct list_head list;	/* link in dlm_node_addrs */
	int nodeid;
	int addr_count;		/* number of valid entries in addr[] */
	/* next address to use; rotated by nodeid_to_addr(..., try_new_addr) */
	int curr_addr_index;
	struct sockaddr_storage *addr[DLM_MAX_ADDR_COUNT];
};
 118
/*
 * Socket callbacks as they were before lowcomms replaced them: filled in by
 * save_listen_callbacks(), reinstalled by restore_callbacks(), and chained to
 * from lowcomms_error_report().
 */
static struct listen_sock_callbacks {
	void (*sk_error_report)(struct sock *);
	void (*sk_data_ready)(struct sock *);
	void (*sk_state_change)(struct sock *);
	void (*sk_write_space)(struct sock *);
} listen_sock;

/* Registered node addresses (struct dlm_node_addr), guarded by the spinlock */
static LIST_HEAD(dlm_node_addrs);
static DEFINE_SPINLOCK(dlm_node_addrs_spin);

/*
 * Local addresses.  The family of dlm_local_addr[0] selects IPv4 vs IPv6 in
 * make_sockaddr() and nodeid_to_addr(); nodeid_to_addr() fails while
 * dlm_local_count is zero.
 */
static struct sockaddr_storage *dlm_local_addr[DLM_MAX_ADDR_COUNT];
static int dlm_local_count;
/* accept_from_sock() refuses incoming connections while this is zero */
static int dlm_allow_conn;

/* Work queues */
static struct workqueue_struct *recv_workqueue;	/* runs con->rwork */
static struct workqueue_struct *send_workqueue;	/* runs con->swork */

/*
 * All connections, bucketed by nodeid_hash().  New entries are added under
 * connections_lock; lookups walk the buckets inside connections_srcu
 * read-side sections.
 */
static struct hlist_head connection_hash[CONN_HASH_SIZE];
static DEFINE_SPINLOCK(connections_lock);
DEFINE_STATIC_SRCU(connections_srcu);

/* Work handlers installed on every connection by INIT_WORK() */
static void process_recv_sockets(struct work_struct *work);
static void process_send_sockets(struct work_struct *work);
 143
 144
 145/* This is deliberately very simple because most clusters have simple
 146   sequential nodeids, so we should be able to go straight to a connection
 147   struct in the array */
 148static inline int nodeid_hash(int nodeid)
 149{
 150        return nodeid & (CONN_HASH_SIZE-1);
 151}
 152
 153static struct connection *__find_con(int nodeid)
 154{
 155        int r, idx;
 156        struct connection *con;
 157
 158        r = nodeid_hash(nodeid);
 159
 160        idx = srcu_read_lock(&connections_srcu);
 161        hlist_for_each_entry_rcu(con, &connection_hash[r], list) {
 162                if (con->nodeid == nodeid) {
 163                        srcu_read_unlock(&connections_srcu, idx);
 164                        return con;
 165                }
 166        }
 167        srcu_read_unlock(&connections_srcu, idx);
 168
 169        return NULL;
 170}
 171
 172/*
 173 * If 'allocation' is zero then we don't attempt to create a new
 174 * connection structure for this node.
 175 */
 176static struct connection *nodeid2con(int nodeid, gfp_t alloc)
 177{
 178        struct connection *con, *tmp;
 179        int r;
 180
 181        con = __find_con(nodeid);
 182        if (con || !alloc)
 183                return con;
 184
 185        con = kzalloc(sizeof(*con), alloc);
 186        if (!con)
 187                return NULL;
 188
 189        con->rx_buflen = dlm_config.ci_buffer_size;
 190        con->rx_buf = kmalloc(con->rx_buflen, GFP_NOFS);
 191        if (!con->rx_buf) {
 192                kfree(con);
 193                return NULL;
 194        }
 195
 196        con->nodeid = nodeid;
 197        mutex_init(&con->sock_mutex);
 198        INIT_LIST_HEAD(&con->writequeue);
 199        spin_lock_init(&con->writequeue_lock);
 200        INIT_WORK(&con->swork, process_send_sockets);
 201        INIT_WORK(&con->rwork, process_recv_sockets);
 202        init_waitqueue_head(&con->shutdown_wait);
 203
 204        /* Setup action pointers for child sockets */
 205        if (con->nodeid) {
 206                struct connection *zerocon = __find_con(0);
 207
 208                con->connect_action = zerocon->connect_action;
 209                if (!con->rx_action)
 210                        con->rx_action = zerocon->rx_action;
 211        }
 212
 213        r = nodeid_hash(nodeid);
 214
 215        spin_lock(&connections_lock);
 216        /* Because multiple workqueues/threads calls this function it can
 217         * race on multiple cpu's. Instead of locking hot path __find_con()
 218         * we just check in rare cases of recently added nodes again
 219         * under protection of connections_lock. If this is the case we
 220         * abort our connection creation and return the existing connection.
 221         */
 222        tmp = __find_con(nodeid);
 223        if (tmp) {
 224                spin_unlock(&connections_lock);
 225                kfree(con->rx_buf);
 226                kfree(con);
 227                return tmp;
 228        }
 229
 230        hlist_add_head_rcu(&con->list, &connection_hash[r]);
 231        spin_unlock(&connections_lock);
 232
 233        return con;
 234}
 235
 236/* Loop round all connections */
 237static void foreach_conn(void (*conn_func)(struct connection *c))
 238{
 239        int i, idx;
 240        struct connection *con;
 241
 242        idx = srcu_read_lock(&connections_srcu);
 243        for (i = 0; i < CONN_HASH_SIZE; i++) {
 244                hlist_for_each_entry_rcu(con, &connection_hash[i], list)
 245                        conn_func(con);
 246        }
 247        srcu_read_unlock(&connections_srcu, idx);
 248}
 249
 250static struct dlm_node_addr *find_node_addr(int nodeid)
 251{
 252        struct dlm_node_addr *na;
 253
 254        list_for_each_entry(na, &dlm_node_addrs, list) {
 255                if (na->nodeid == nodeid)
 256                        return na;
 257        }
 258        return NULL;
 259}
 260
 261static int addr_compare(struct sockaddr_storage *x, struct sockaddr_storage *y)
 262{
 263        switch (x->ss_family) {
 264        case AF_INET: {
 265                struct sockaddr_in *sinx = (struct sockaddr_in *)x;
 266                struct sockaddr_in *siny = (struct sockaddr_in *)y;
 267                if (sinx->sin_addr.s_addr != siny->sin_addr.s_addr)
 268                        return 0;
 269                if (sinx->sin_port != siny->sin_port)
 270                        return 0;
 271                break;
 272        }
 273        case AF_INET6: {
 274                struct sockaddr_in6 *sinx = (struct sockaddr_in6 *)x;
 275                struct sockaddr_in6 *siny = (struct sockaddr_in6 *)y;
 276                if (!ipv6_addr_equal(&sinx->sin6_addr, &siny->sin6_addr))
 277                        return 0;
 278                if (sinx->sin6_port != siny->sin6_port)
 279                        return 0;
 280                break;
 281        }
 282        default:
 283                return 0;
 284        }
 285        return 1;
 286}
 287
 288static int nodeid_to_addr(int nodeid, struct sockaddr_storage *sas_out,
 289                          struct sockaddr *sa_out, bool try_new_addr)
 290{
 291        struct sockaddr_storage sas;
 292        struct dlm_node_addr *na;
 293
 294        if (!dlm_local_count)
 295                return -1;
 296
 297        spin_lock(&dlm_node_addrs_spin);
 298        na = find_node_addr(nodeid);
 299        if (na && na->addr_count) {
 300                memcpy(&sas, na->addr[na->curr_addr_index],
 301                       sizeof(struct sockaddr_storage));
 302
 303                if (try_new_addr) {
 304                        na->curr_addr_index++;
 305                        if (na->curr_addr_index == na->addr_count)
 306                                na->curr_addr_index = 0;
 307                }
 308        }
 309        spin_unlock(&dlm_node_addrs_spin);
 310
 311        if (!na)
 312                return -EEXIST;
 313
 314        if (!na->addr_count)
 315                return -ENOENT;
 316
 317        if (sas_out)
 318                memcpy(sas_out, &sas, sizeof(struct sockaddr_storage));
 319
 320        if (!sa_out)
 321                return 0;
 322
 323        if (dlm_local_addr[0]->ss_family == AF_INET) {
 324                struct sockaddr_in *in4  = (struct sockaddr_in *) &sas;
 325                struct sockaddr_in *ret4 = (struct sockaddr_in *) sa_out;
 326                ret4->sin_addr.s_addr = in4->sin_addr.s_addr;
 327        } else {
 328                struct sockaddr_in6 *in6  = (struct sockaddr_in6 *) &sas;
 329                struct sockaddr_in6 *ret6 = (struct sockaddr_in6 *) sa_out;
 330                ret6->sin6_addr = in6->sin6_addr;
 331        }
 332
 333        return 0;
 334}
 335
 336static int addr_to_nodeid(struct sockaddr_storage *addr, int *nodeid)
 337{
 338        struct dlm_node_addr *na;
 339        int rv = -EEXIST;
 340        int addr_i;
 341
 342        spin_lock(&dlm_node_addrs_spin);
 343        list_for_each_entry(na, &dlm_node_addrs, list) {
 344                if (!na->addr_count)
 345                        continue;
 346
 347                for (addr_i = 0; addr_i < na->addr_count; addr_i++) {
 348                        if (addr_compare(na->addr[addr_i], addr)) {
 349                                *nodeid = na->nodeid;
 350                                rv = 0;
 351                                goto unlock;
 352                        }
 353                }
 354        }
 355unlock:
 356        spin_unlock(&dlm_node_addrs_spin);
 357        return rv;
 358}
 359
 360int dlm_lowcomms_addr(int nodeid, struct sockaddr_storage *addr, int len)
 361{
 362        struct sockaddr_storage *new_addr;
 363        struct dlm_node_addr *new_node, *na;
 364
 365        new_node = kzalloc(sizeof(struct dlm_node_addr), GFP_NOFS);
 366        if (!new_node)
 367                return -ENOMEM;
 368
 369        new_addr = kzalloc(sizeof(struct sockaddr_storage), GFP_NOFS);
 370        if (!new_addr) {
 371                kfree(new_node);
 372                return -ENOMEM;
 373        }
 374
 375        memcpy(new_addr, addr, len);
 376
 377        spin_lock(&dlm_node_addrs_spin);
 378        na = find_node_addr(nodeid);
 379        if (!na) {
 380                new_node->nodeid = nodeid;
 381                new_node->addr[0] = new_addr;
 382                new_node->addr_count = 1;
 383                list_add(&new_node->list, &dlm_node_addrs);
 384                spin_unlock(&dlm_node_addrs_spin);
 385                return 0;
 386        }
 387
 388        if (na->addr_count >= DLM_MAX_ADDR_COUNT) {
 389                spin_unlock(&dlm_node_addrs_spin);
 390                kfree(new_addr);
 391                kfree(new_node);
 392                return -ENOSPC;
 393        }
 394
 395        na->addr[na->addr_count++] = new_addr;
 396        spin_unlock(&dlm_node_addrs_spin);
 397        kfree(new_node);
 398        return 0;
 399}
 400
 401/* Data available on socket or listen socket received a connect */
 402static void lowcomms_data_ready(struct sock *sk)
 403{
 404        struct connection *con;
 405
 406        read_lock_bh(&sk->sk_callback_lock);
 407        con = sock2con(sk);
 408        if (con && !test_and_set_bit(CF_READ_PENDING, &con->flags))
 409                queue_work(recv_workqueue, &con->rwork);
 410        read_unlock_bh(&sk->sk_callback_lock);
 411}
 412
 413static void lowcomms_write_space(struct sock *sk)
 414{
 415        struct connection *con;
 416
 417        read_lock_bh(&sk->sk_callback_lock);
 418        con = sock2con(sk);
 419        if (!con)
 420                goto out;
 421
 422        clear_bit(SOCK_NOSPACE, &con->sock->flags);
 423
 424        if (test_and_clear_bit(CF_APP_LIMITED, &con->flags)) {
 425                con->sock->sk->sk_write_pending--;
 426                clear_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags);
 427        }
 428
 429        queue_work(send_workqueue, &con->swork);
 430out:
 431        read_unlock_bh(&sk->sk_callback_lock);
 432}
 433
 434static inline void lowcomms_connect_sock(struct connection *con)
 435{
 436        if (test_bit(CF_CLOSE, &con->flags))
 437                return;
 438        queue_work(send_workqueue, &con->swork);
 439        cond_resched();
 440}
 441
 442static void lowcomms_state_change(struct sock *sk)
 443{
 444        /* SCTP layer is not calling sk_data_ready when the connection
 445         * is done, so we catch the signal through here. Also, it
 446         * doesn't switch socket state when entering shutdown, so we
 447         * skip the write in that case.
 448         */
 449        if (sk->sk_shutdown) {
 450                if (sk->sk_shutdown == RCV_SHUTDOWN)
 451                        lowcomms_data_ready(sk);
 452        } else if (sk->sk_state == TCP_ESTABLISHED) {
 453                lowcomms_write_space(sk);
 454        }
 455}
 456
 457int dlm_lowcomms_connect_node(int nodeid)
 458{
 459        struct connection *con;
 460
 461        if (nodeid == dlm_our_nodeid())
 462                return 0;
 463
 464        con = nodeid2con(nodeid, GFP_NOFS);
 465        if (!con)
 466                return -ENOMEM;
 467        lowcomms_connect_sock(con);
 468        return 0;
 469}
 470
 471static void lowcomms_error_report(struct sock *sk)
 472{
 473        struct connection *con;
 474        struct sockaddr_storage saddr;
 475        void (*orig_report)(struct sock *) = NULL;
 476
 477        read_lock_bh(&sk->sk_callback_lock);
 478        con = sock2con(sk);
 479        if (con == NULL)
 480                goto out;
 481
 482        orig_report = listen_sock.sk_error_report;
 483        if (con->sock == NULL ||
 484            kernel_getpeername(con->sock, (struct sockaddr *)&saddr) < 0) {
 485                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 486                                   "sending to node %d, port %d, "
 487                                   "sk_err=%d/%d\n", dlm_our_nodeid(),
 488                                   con->nodeid, dlm_config.ci_tcp_port,
 489                                   sk->sk_err, sk->sk_err_soft);
 490        } else if (saddr.ss_family == AF_INET) {
 491                struct sockaddr_in *sin4 = (struct sockaddr_in *)&saddr;
 492
 493                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 494                                   "sending to node %d at %pI4, port %d, "
 495                                   "sk_err=%d/%d\n", dlm_our_nodeid(),
 496                                   con->nodeid, &sin4->sin_addr.s_addr,
 497                                   dlm_config.ci_tcp_port, sk->sk_err,
 498                                   sk->sk_err_soft);
 499        } else {
 500                struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)&saddr;
 501
 502                printk_ratelimited(KERN_ERR "dlm: node %d: socket error "
 503                                   "sending to node %d at %u.%u.%u.%u, "
 504                                   "port %d, sk_err=%d/%d\n", dlm_our_nodeid(),
 505                                   con->nodeid, sin6->sin6_addr.s6_addr32[0],
 506                                   sin6->sin6_addr.s6_addr32[1],
 507                                   sin6->sin6_addr.s6_addr32[2],
 508                                   sin6->sin6_addr.s6_addr32[3],
 509                                   dlm_config.ci_tcp_port, sk->sk_err,
 510                                   sk->sk_err_soft);
 511        }
 512out:
 513        read_unlock_bh(&sk->sk_callback_lock);
 514        if (orig_report)
 515                orig_report(sk);
 516}
 517
 518/* Note: sk_callback_lock must be locked before calling this function. */
 519static void save_listen_callbacks(struct socket *sock)
 520{
 521        struct sock *sk = sock->sk;
 522
 523        listen_sock.sk_data_ready = sk->sk_data_ready;
 524        listen_sock.sk_state_change = sk->sk_state_change;
 525        listen_sock.sk_write_space = sk->sk_write_space;
 526        listen_sock.sk_error_report = sk->sk_error_report;
 527}
 528
 529static void restore_callbacks(struct socket *sock)
 530{
 531        struct sock *sk = sock->sk;
 532
 533        write_lock_bh(&sk->sk_callback_lock);
 534        sk->sk_user_data = NULL;
 535        sk->sk_data_ready = listen_sock.sk_data_ready;
 536        sk->sk_state_change = listen_sock.sk_state_change;
 537        sk->sk_write_space = listen_sock.sk_write_space;
 538        sk->sk_error_report = listen_sock.sk_error_report;
 539        write_unlock_bh(&sk->sk_callback_lock);
 540}
 541
 542/* Make a socket active */
 543static void add_sock(struct socket *sock, struct connection *con)
 544{
 545        struct sock *sk = sock->sk;
 546
 547        write_lock_bh(&sk->sk_callback_lock);
 548        con->sock = sock;
 549
 550        sk->sk_user_data = con;
 551        /* Install a data_ready callback */
 552        sk->sk_data_ready = lowcomms_data_ready;
 553        sk->sk_write_space = lowcomms_write_space;
 554        sk->sk_state_change = lowcomms_state_change;
 555        sk->sk_allocation = GFP_NOFS;
 556        sk->sk_error_report = lowcomms_error_report;
 557        write_unlock_bh(&sk->sk_callback_lock);
 558}
 559
 560/* Add the port number to an IPv6 or 4 sockaddr and return the address
 561   length */
 562static void make_sockaddr(struct sockaddr_storage *saddr, uint16_t port,
 563                          int *addr_len)
 564{
 565        saddr->ss_family =  dlm_local_addr[0]->ss_family;
 566        if (saddr->ss_family == AF_INET) {
 567                struct sockaddr_in *in4_addr = (struct sockaddr_in *)saddr;
 568                in4_addr->sin_port = cpu_to_be16(port);
 569                *addr_len = sizeof(struct sockaddr_in);
 570                memset(&in4_addr->sin_zero, 0, sizeof(in4_addr->sin_zero));
 571        } else {
 572                struct sockaddr_in6 *in6_addr = (struct sockaddr_in6 *)saddr;
 573                in6_addr->sin6_port = cpu_to_be16(port);
 574                *addr_len = sizeof(struct sockaddr_in6);
 575        }
 576        memset((char *)saddr + *addr_len, 0, sizeof(struct sockaddr_storage) - *addr_len);
 577}
 578
/*
 * Close a remote connection and tidy up.
 *
 * @con:       connection to close.
 * @and_other: also close con->othercon, if any.
 * @tx:        cancel (and wait for) pending send work first.
 * @rx:        cancel (and wait for) pending receive work first.
 *
 * Work is only cancelled by the caller that set CF_CLOSING.  After that the
 * socket (if any) has its original callbacks restored and is released, and
 * rx_leftover/retries are reset, all under sock_mutex.
 *
 * NOTE(review): CF_CLOSING is cleared unconditionally at the end, even by a
 * caller that found it already set — confirm that overlapping closes are
 * acceptable.
 */
static void close_connection(struct connection *con, bool and_other,
			     bool tx, bool rx)
{
	/* true if some other close is already in progress */
	bool closing = test_and_set_bit(CF_CLOSING, &con->flags);

	if (tx && !closing && cancel_work_sync(&con->swork)) {
		log_print("canceled swork for node %d", con->nodeid);
		clear_bit(CF_WRITE_PENDING, &con->flags);
	}
	if (rx && !closing && cancel_work_sync(&con->rwork)) {
		log_print("canceled rwork for node %d", con->nodeid);
		clear_bit(CF_READ_PENDING, &con->flags);
	}

	mutex_lock(&con->sock_mutex);
	if (con->sock) {
		/* detach callbacks before the socket goes away */
		restore_callbacks(con->sock);
		sock_release(con->sock);
		con->sock = NULL;
	}
	if (con->othercon && and_other) {
		/* Will only re-enter once: the nested call passes and_other=false. */
		close_connection(con->othercon, false, true, true);
	}

	/* drop any partially received message */
	con->rx_leftover = 0;
	con->retries = 0;
	mutex_unlock(&con->sock_mutex);
	clear_bit(CF_CLOSING, &con->flags);
}
 610
 611static void shutdown_connection(struct connection *con)
 612{
 613        int ret;
 614
 615        if (cancel_work_sync(&con->swork)) {
 616                log_print("canceled swork for node %d", con->nodeid);
 617                clear_bit(CF_WRITE_PENDING, &con->flags);
 618        }
 619
 620        mutex_lock(&con->sock_mutex);
 621        /* nothing to shutdown */
 622        if (!con->sock) {
 623                mutex_unlock(&con->sock_mutex);
 624                return;
 625        }
 626
 627        set_bit(CF_SHUTDOWN, &con->flags);
 628        ret = kernel_sock_shutdown(con->sock, SHUT_WR);
 629        mutex_unlock(&con->sock_mutex);
 630        if (ret) {
 631                log_print("Connection %p failed to shutdown: %d will force close",
 632                          con, ret);
 633                goto force_close;
 634        } else {
 635                ret = wait_event_timeout(con->shutdown_wait,
 636                                         !test_bit(CF_SHUTDOWN, &con->flags),
 637                                         DLM_SHUTDOWN_WAIT_TIMEOUT);
 638                if (ret == 0) {
 639                        log_print("Connection %p shutdown timed out, will force close",
 640                                  con);
 641                        goto force_close;
 642                }
 643        }
 644
 645        return;
 646
 647force_close:
 648        clear_bit(CF_SHUTDOWN, &con->flags);
 649        close_connection(con, false, true, true);
 650}
 651
 652static void dlm_tcp_shutdown(struct connection *con)
 653{
 654        if (con->othercon)
 655                shutdown_connection(con->othercon);
 656        shutdown_connection(con);
 657}
 658
 659static int con_realloc_receive_buf(struct connection *con, int newlen)
 660{
 661        unsigned char *newbuf;
 662
 663        newbuf = kmalloc(newlen, GFP_NOFS);
 664        if (!newbuf)
 665                return -ENOMEM;
 666
 667        /* copy any leftover from last receive */
 668        if (con->rx_leftover)
 669                memmove(newbuf, con->rx_buf, con->rx_leftover);
 670
 671        /* swap to new buffer space */
 672        kfree(con->rx_buf);
 673        con->rx_buflen = newlen;
 674        con->rx_buf = newbuf;
 675
 676        return 0;
 677}
 678
/*
 * Data received from remote end: the rx_action for node connections.
 *
 * Reads as much as fits after any leftover bytes in con->rx_buf and hands
 * the buffer to dlm_process_incoming_buffer().  Unconsumed bytes (a partial
 * message) are moved to the start of rx_buf for the next call.
 *
 * Returns:
 *   0        - data processed, nothing more expected right now;
 *   -EAGAIN  - no socket, no data, or more work remains; in the last two
 *              cases the receive work may have been requeued;
 *   -1       - the peer sent EOF; the connection was closed and a pending
 *              graceful shutdown was woken;
 *   other <0 - error; the connection was closed.
 */
static int receive_from_sock(struct connection *con)
{
	int call_again_soon = 0;
	struct msghdr msg;
	struct kvec iov;
	int ret, buflen;

	mutex_lock(&con->sock_mutex);

	if (con->sock == NULL) {
		ret = -EAGAIN;
		goto out_close;
	}

	/* nodeid 0 is not a peer connection; nothing to receive here */
	if (con->nodeid == 0) {
		ret = -EINVAL;
		goto out_close;
	}

	/* realloc if the configured buffer size changed and the leftover fits */
	buflen = dlm_config.ci_buffer_size;
	if (con->rx_buflen != buflen && con->rx_leftover <= buflen) {
		ret = con_realloc_receive_buf(con, buflen);
		if (ret < 0)
			goto out_resched;
	}

	/* receive into the space after any leftover bytes from the
	 * previous call
	 */
	iov.iov_base = con->rx_buf + con->rx_leftover;
	iov.iov_len = con->rx_buflen - con->rx_leftover;

	memset(&msg, 0, sizeof(msg));
	msg.msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	ret = kernel_recvmsg(con->sock, &msg, &iov, 1, iov.iov_len,
			     msg.msg_flags);
	if (ret <= 0)
		goto out_close;
	else if (ret == iov.iov_len)
		call_again_soon = 1;	/* buffer filled: the socket may hold more */

	/* total valid bytes = bytes just read + leftover from last receive */
	buflen = ret + con->rx_leftover;
	/* on success ret is the number of bytes consumed from rx_buf */
	ret = dlm_process_incoming_buffer(con->nodeid, con->rx_buf, buflen);
	if (ret < 0)
		goto out_close;

	/* move the unconsumed bytes to the start of the receive buffer, so
	 * that on the next receive the full message begins at the start
	 * address of the buffer.
	 */
	con->rx_leftover = buflen - ret;
	if (con->rx_leftover) {
		memmove(con->rx_buf, con->rx_buf + ret,
			con->rx_leftover);
		call_again_soon = true;
	}

	if (call_again_soon)
		goto out_resched;

	mutex_unlock(&con->sock_mutex);
	return 0;

out_resched:
	/* requeue the receive work unless it is already pending */
	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
		queue_work(recv_workqueue, &con->rwork);
	mutex_unlock(&con->sock_mutex);
	return -EAGAIN;

out_close:
	mutex_unlock(&con->sock_mutex);
	if (ret != -EAGAIN) {
		/* Reconnect when there is something to send */
		close_connection(con, false, true, false);
		if (ret == 0) {
			log_print("connection %p got EOF from %d",
				  con, con->nodeid);
			/* handling for tcp shutdown: wake shutdown_connection() */
			clear_bit(CF_SHUTDOWN, &con->flags);
			wake_up(&con->shutdown_wait);
			/* signal the receive worker to stop calling us */
			ret = -1;
		}
	}
	return ret;
}
 768
 769/* Listening socket is busy, accept a connection */
 770static int accept_from_sock(struct connection *con)
 771{
 772        int result;
 773        struct sockaddr_storage peeraddr;
 774        struct socket *newsock;
 775        int len;
 776        int nodeid;
 777        struct connection *newcon;
 778        struct connection *addcon;
 779        unsigned int mark;
 780
 781        if (!dlm_allow_conn) {
 782                return -1;
 783        }
 784
 785        mutex_lock_nested(&con->sock_mutex, 0);
 786
 787        if (!con->sock) {
 788                mutex_unlock(&con->sock_mutex);
 789                return -ENOTCONN;
 790        }
 791
 792        result = kernel_accept(con->sock, &newsock, O_NONBLOCK);
 793        if (result < 0)
 794                goto accept_err;
 795
 796        /* Get the connected socket's peer */
 797        memset(&peeraddr, 0, sizeof(peeraddr));
 798        len = newsock->ops->getname(newsock, (struct sockaddr *)&peeraddr, 2);
 799        if (len < 0) {
 800                result = -ECONNABORTED;
 801                goto accept_err;
 802        }
 803
 804        /* Get the new node's NODEID */
 805        make_sockaddr(&peeraddr, 0, &len);
 806        if (addr_to_nodeid(&peeraddr, &nodeid)) {
 807                unsigned char *b=(unsigned char *)&peeraddr;
 808                log_print("connect from non cluster node");
 809                print_hex_dump_bytes("ss: ", DUMP_PREFIX_NONE, 
 810                                     b, sizeof(struct sockaddr_storage));
 811                sock_release(newsock);
 812                mutex_unlock(&con->sock_mutex);
 813                return -1;
 814        }
 815
 816        dlm_comm_mark(nodeid, &mark);
 817        sock_set_mark(newsock->sk, mark);
 818
 819        log_print("got connection from %d", nodeid);
 820
 821        /*  Check to see if we already have a connection to this node. This
 822         *  could happen if the two nodes initiate a connection at roughly
 823         *  the same time and the connections cross on the wire.
 824         *  In this case we store the incoming one in "othercon"
 825         */
 826        newcon = nodeid2con(nodeid, GFP_NOFS);
 827        if (!newcon) {
 828                result = -ENOMEM;
 829                goto accept_err;
 830        }
 831        mutex_lock_nested(&newcon->sock_mutex, 1);
 832        if (newcon->sock) {
 833                struct connection *othercon = newcon->othercon;
 834
 835                if (!othercon) {
 836                        othercon = kzalloc(sizeof(*othercon), GFP_NOFS);
 837                        if (!othercon) {
 838                                log_print("failed to allocate incoming socket");
 839                                mutex_unlock(&newcon->sock_mutex);
 840                                result = -ENOMEM;
 841                                goto accept_err;
 842                        }
 843
 844                        othercon->rx_buflen = dlm_config.ci_buffer_size;
 845                        othercon->rx_buf = kmalloc(othercon->rx_buflen, GFP_NOFS);
 846                        if (!othercon->rx_buf) {
 847                                mutex_unlock(&newcon->sock_mutex);
 848                                kfree(othercon);
 849                                log_print("failed to allocate incoming socket receive buffer");
 850                                result = -ENOMEM;
 851                                goto accept_err;
 852                        }
 853
 854                        othercon->nodeid = nodeid;
 855                        othercon->rx_action = receive_from_sock;
 856                        mutex_init(&othercon->sock_mutex);
 857                        INIT_LIST_HEAD(&othercon->writequeue);
 858                        spin_lock_init(&othercon->writequeue_lock);
 859                        INIT_WORK(&othercon->swork, process_send_sockets);
 860                        INIT_WORK(&othercon->rwork, process_recv_sockets);
 861                        init_waitqueue_head(&othercon->shutdown_wait);
 862                        set_bit(CF_IS_OTHERCON, &othercon->flags);
 863                } else {
 864                        /* close other sock con if we have something new */
 865                        close_connection(othercon, false, true, false);
 866                }
 867
 868                mutex_lock_nested(&othercon->sock_mutex, 2);
 869                newcon->othercon = othercon;
 870                add_sock(newsock, othercon);
 871                addcon = othercon;
 872                mutex_unlock(&othercon->sock_mutex);
 873        }
 874        else {
 875                newcon->rx_action = receive_from_sock;
 876                /* accept copies the sk after we've saved the callbacks, so we
 877                   don't want to save them a second time or comm errors will
 878                   result in calling sk_error_report recursively. */
 879                add_sock(newsock, newcon);
 880                addcon = newcon;
 881        }
 882
 883        mutex_unlock(&newcon->sock_mutex);
 884
 885        /*
 886         * Add it to the active queue in case we got data
 887         * between processing the accept adding the socket
 888         * to the read_sockets list
 889         */
 890        if (!test_and_set_bit(CF_READ_PENDING, &addcon->flags))
 891                queue_work(recv_workqueue, &addcon->rwork);
 892        mutex_unlock(&con->sock_mutex);
 893
 894        return 0;
 895
 896accept_err:
 897        mutex_unlock(&con->sock_mutex);
 898        if (newsock)
 899                sock_release(newsock);
 900
 901        if (result != -EAGAIN)
 902                log_print("error accepting connection from node: %d", result);
 903        return result;
 904}
 905
 906static void free_entry(struct writequeue_entry *e)
 907{
 908        __free_page(e->page);
 909        kfree(e);
 910}
 911
 912/*
 913 * writequeue_entry_complete - try to delete and free write queue entry
 914 * @e: write queue entry to try to delete
 915 * @completed: bytes completed
 916 *
 917 * writequeue_lock must be held.
 918 */
 919static void writequeue_entry_complete(struct writequeue_entry *e, int completed)
 920{
 921        e->offset += completed;
 922        e->len -= completed;
 923
 924        if (e->len == 0 && e->users == 0) {
 925                list_del(&e->list);
 926                free_entry(e);
 927        }
 928}
 929
 930/*
 931 * sctp_bind_addrs - bind a SCTP socket to all our addresses
 932 */
 933static int sctp_bind_addrs(struct connection *con, uint16_t port)
 934{
 935        struct sockaddr_storage localaddr;
 936        struct sockaddr *addr = (struct sockaddr *)&localaddr;
 937        int i, addr_len, result = 0;
 938
 939        for (i = 0; i < dlm_local_count; i++) {
 940                memcpy(&localaddr, dlm_local_addr[i], sizeof(localaddr));
 941                make_sockaddr(&localaddr, port, &addr_len);
 942
 943                if (!i)
 944                        result = kernel_bind(con->sock, addr, addr_len);
 945                else
 946                        result = sock_bind_add(con->sock->sk, addr, addr_len);
 947
 948                if (result < 0) {
 949                        log_print("Can't bind to %d addr number %d, %d.\n",
 950                                  port, i + 1, result);
 951                        break;
 952                }
 953        }
 954        return result;
 955}
 956
 957/* Initiate an SCTP association.
 958   This is a special case of send_to_sock() in that we don't yet have a
 959   peeled-off socket for this association, so we use the listening socket
 960   and add the primary IP address of the remote node.
 961 */
 962static void sctp_connect_to_sock(struct connection *con)
 963{
 964        struct sockaddr_storage daddr;
 965        int result;
 966        int addr_len;
 967        struct socket *sock;
 968        unsigned int mark;
 969
 970        if (con->nodeid == 0) {
 971                log_print("attempt to connect sock 0 foiled");
 972                return;
 973        }
 974
 975        dlm_comm_mark(con->nodeid, &mark);
 976
 977        mutex_lock(&con->sock_mutex);
 978
 979        /* Some odd races can cause double-connects, ignore them */
 980        if (con->retries++ > MAX_CONNECT_RETRIES)
 981                goto out;
 982
 983        if (con->sock) {
 984                log_print("node %d already connected.", con->nodeid);
 985                goto out;
 986        }
 987
 988        memset(&daddr, 0, sizeof(daddr));
 989        result = nodeid_to_addr(con->nodeid, &daddr, NULL, true);
 990        if (result < 0) {
 991                log_print("no address for nodeid %d", con->nodeid);
 992                goto out;
 993        }
 994
 995        /* Create a socket to communicate with */
 996        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
 997                                  SOCK_STREAM, IPPROTO_SCTP, &sock);
 998        if (result < 0)
 999                goto socket_err;
1000
1001        sock_set_mark(sock->sk, mark);
1002
1003        con->rx_action = receive_from_sock;
1004        con->connect_action = sctp_connect_to_sock;
1005        add_sock(sock, con);
1006
1007        /* Bind to all addresses. */
1008        if (sctp_bind_addrs(con, 0))
1009                goto bind_err;
1010
1011        make_sockaddr(&daddr, dlm_config.ci_tcp_port, &addr_len);
1012
1013        log_print("connecting to %d", con->nodeid);
1014
1015        /* Turn off Nagle's algorithm */
1016        sctp_sock_set_nodelay(sock->sk);
1017
1018        /*
1019         * Make sock->ops->connect() function return in specified time,
1020         * since O_NONBLOCK argument in connect() function does not work here,
1021         * then, we should restore the default value of this attribute.
1022         */
1023        sock_set_sndtimeo(sock->sk, 5);
1024        result = sock->ops->connect(sock, (struct sockaddr *)&daddr, addr_len,
1025                                   0);
1026        sock_set_sndtimeo(sock->sk, 0);
1027
1028        if (result == -EINPROGRESS)
1029                result = 0;
1030        if (result == 0)
1031                goto out;
1032
1033bind_err:
1034        con->sock = NULL;
1035        sock_release(sock);
1036
1037socket_err:
1038        /*
1039         * Some errors are fatal and this list might need adjusting. For other
1040         * errors we try again until the max number of retries is reached.
1041         */
1042        if (result != -EHOSTUNREACH &&
1043            result != -ENETUNREACH &&
1044            result != -ENETDOWN &&
1045            result != -EINVAL &&
1046            result != -EPROTONOSUPPORT) {
1047                log_print("connect %d try %d error %d", con->nodeid,
1048                          con->retries, result);
1049                mutex_unlock(&con->sock_mutex);
1050                msleep(1000);
1051                lowcomms_connect_sock(con);
1052                return;
1053        }
1054
1055out:
1056        mutex_unlock(&con->sock_mutex);
1057}
1058
/*
 * tcp_connect_to_sock - connect a new TCP socket to con->nodeid
 * @con: connection to connect; the nodeid 0 connection is refused
 *
 * Runs with con->sock_mutex held from the retry check onwards. The connect
 * is issued with O_NONBLOCK, so -EINPROGRESS counts as success. On any other
 * failure the socket is released. Unless the error is one of the fatal ones
 * listed below, the function sleeps for one second and calls
 * lowcomms_connect_sock() (presumably to requeue the connect; TODO confirm).
 * Every attempt increments con->retries; once it exceeds
 * MAX_CONNECT_RETRIES the function returns without connecting.
 */
static void tcp_connect_to_sock(struct connection *con)
{
	struct sockaddr_storage saddr, src_addr;
	int addr_len;
	struct socket *sock = NULL;
	unsigned int mark;
	int result;

	/* nodeid 0 is the listening connection; it is never connected out */
	if (con->nodeid == 0) {
		log_print("attempt to connect sock 0 foiled");
		return;
	}

	dlm_comm_mark(con->nodeid, &mark);

	mutex_lock(&con->sock_mutex);
	/* Give up silently once the retry budget is exhausted */
	if (con->retries++ > MAX_CONNECT_RETRIES)
		goto out;

	/* Some odd races can cause double-connects, ignore them */
	if (con->sock)
		goto out;

	/* Create a socket to communicate with */
	result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
				  SOCK_STREAM, IPPROTO_TCP, &sock);
	if (result < 0)
		goto out_err;

	sock_set_mark(sock->sk, mark);

	memset(&saddr, 0, sizeof(saddr));
	result = nodeid_to_addr(con->nodeid, &saddr, NULL, false);
	if (result < 0) {
		log_print("no address for nodeid %d", con->nodeid);
		goto out_err;
	}

	con->rx_action = receive_from_sock;
	con->connect_action = tcp_connect_to_sock;
	con->shutdown_action = dlm_tcp_shutdown;
	/* From here on the socket is attached to con (see out_err) */
	add_sock(sock, con);

	/* Bind to our cluster-known address connecting to avoid
	   routing problems */
	memcpy(&src_addr, dlm_local_addr[0], sizeof(src_addr));
	make_sockaddr(&src_addr, 0, &addr_len);
	result = sock->ops->bind(sock, (struct sockaddr *) &src_addr,
				 addr_len);
	if (result < 0) {
		log_print("could not bind for connect: %d", result);
		/* This *may* not indicate a critical error */
	}

	make_sockaddr(&saddr, dlm_config.ci_tcp_port, &addr_len);

	log_print("connecting to %d", con->nodeid);

	/* Turn off Nagle's algorithm */
	tcp_sock_set_nodelay(sock->sk);

	result = sock->ops->connect(sock, (struct sockaddr *)&saddr, addr_len,
				   O_NONBLOCK);
	/* Non-blocking connect: "in progress" is the expected outcome */
	if (result == -EINPROGRESS)
		result = 0;
	if (result == 0)
		goto out;

out_err:
	/*
	 * If add_sock() already ran, the socket is released through con->sock
	 * (add_sock() presumably stores it there) and con->sock is cleared;
	 * otherwise the bare socket, if one was created, is released.
	 */
	if (con->sock) {
		sock_release(con->sock);
		con->sock = NULL;
	} else if (sock) {
		sock_release(sock);
	}
	/*
	 * Some errors are fatal and this list might need adjusting. For other
	 * errors we try again until the max number of retries is reached.
	 */
	if (result != -EHOSTUNREACH &&
	    result != -ENETUNREACH &&
	    result != -ENETDOWN && 
	    result != -EINVAL &&
	    result != -EPROTONOSUPPORT) {
		log_print("connect %d try %d error %d", con->nodeid,
			  con->retries, result);
		/* Drop the mutex before sleeping and retrying */
		mutex_unlock(&con->sock_mutex);
		msleep(1000);
		lowcomms_connect_sock(con);
		return;
	}
out:
	mutex_unlock(&con->sock_mutex);
	return;
}
1155
1156static struct socket *tcp_create_listen_sock(struct connection *con,
1157                                             struct sockaddr_storage *saddr)
1158{
1159        struct socket *sock = NULL;
1160        int result = 0;
1161        int addr_len;
1162
1163        if (dlm_local_addr[0]->ss_family == AF_INET)
1164                addr_len = sizeof(struct sockaddr_in);
1165        else
1166                addr_len = sizeof(struct sockaddr_in6);
1167
1168        /* Create a socket to communicate with */
1169        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1170                                  SOCK_STREAM, IPPROTO_TCP, &sock);
1171        if (result < 0) {
1172                log_print("Can't create listening comms socket");
1173                goto create_out;
1174        }
1175
1176        sock_set_mark(sock->sk, dlm_config.ci_mark);
1177
1178        /* Turn off Nagle's algorithm */
1179        tcp_sock_set_nodelay(sock->sk);
1180
1181        sock_set_reuseaddr(sock->sk);
1182
1183        write_lock_bh(&sock->sk->sk_callback_lock);
1184        sock->sk->sk_user_data = con;
1185        save_listen_callbacks(sock);
1186        con->rx_action = accept_from_sock;
1187        con->connect_action = tcp_connect_to_sock;
1188        write_unlock_bh(&sock->sk->sk_callback_lock);
1189
1190        /* Bind to our port */
1191        make_sockaddr(saddr, dlm_config.ci_tcp_port, &addr_len);
1192        result = sock->ops->bind(sock, (struct sockaddr *) saddr, addr_len);
1193        if (result < 0) {
1194                log_print("Can't bind to port %d", dlm_config.ci_tcp_port);
1195                sock_release(sock);
1196                sock = NULL;
1197                con->sock = NULL;
1198                goto create_out;
1199        }
1200        sock_set_keepalive(sock->sk);
1201
1202        result = sock->ops->listen(sock, 5);
1203        if (result < 0) {
1204                log_print("Can't listen on port %d", dlm_config.ci_tcp_port);
1205                sock_release(sock);
1206                sock = NULL;
1207                goto create_out;
1208        }
1209
1210create_out:
1211        return sock;
1212}
1213
1214/* Get local addresses */
1215static void init_local(void)
1216{
1217        struct sockaddr_storage sas, *addr;
1218        int i;
1219
1220        dlm_local_count = 0;
1221        for (i = 0; i < DLM_MAX_ADDR_COUNT; i++) {
1222                if (dlm_our_addr(&sas, i))
1223                        break;
1224
1225                addr = kmemdup(&sas, sizeof(*addr), GFP_NOFS);
1226                if (!addr)
1227                        break;
1228                dlm_local_addr[dlm_local_count++] = addr;
1229        }
1230}
1231
1232static void deinit_local(void)
1233{
1234        int i;
1235
1236        for (i = 0; i < dlm_local_count; i++)
1237                kfree(dlm_local_addr[i]);
1238}
1239
1240/* Initialise SCTP socket and bind to all interfaces */
1241static int sctp_listen_for_all(void)
1242{
1243        struct socket *sock = NULL;
1244        int result = -EINVAL;
1245        struct connection *con = nodeid2con(0, GFP_NOFS);
1246
1247        if (!con)
1248                return -ENOMEM;
1249
1250        log_print("Using SCTP for communications");
1251
1252        result = sock_create_kern(&init_net, dlm_local_addr[0]->ss_family,
1253                                  SOCK_STREAM, IPPROTO_SCTP, &sock);
1254        if (result < 0) {
1255                log_print("Can't create comms socket, check SCTP is loaded");
1256                goto out;
1257        }
1258
1259        sock_set_rcvbuf(sock->sk, NEEDED_RMEM);
1260        sock_set_mark(sock->sk, dlm_config.ci_mark);
1261        sctp_sock_set_nodelay(sock->sk);
1262
1263        write_lock_bh(&sock->sk->sk_callback_lock);
1264        /* Init con struct */
1265        sock->sk->sk_user_data = con;
1266        save_listen_callbacks(sock);
1267        con->sock = sock;
1268        con->sock->sk->sk_data_ready = lowcomms_data_ready;
1269        con->rx_action = accept_from_sock;
1270        con->connect_action = sctp_connect_to_sock;
1271
1272        write_unlock_bh(&sock->sk->sk_callback_lock);
1273
1274        /* Bind to all addresses. */
1275        if (sctp_bind_addrs(con, dlm_config.ci_tcp_port))
1276                goto create_delsock;
1277
1278        result = sock->ops->listen(sock, 5);
1279        if (result < 0) {
1280                log_print("Can't set socket listening");
1281                goto create_delsock;
1282        }
1283
1284        return 0;
1285
1286create_delsock:
1287        sock_release(sock);
1288        con->sock = NULL;
1289out:
1290        return result;
1291}
1292
1293static int tcp_listen_for_all(void)
1294{
1295        struct socket *sock = NULL;
1296        struct connection *con = nodeid2con(0, GFP_NOFS);
1297        int result = -EINVAL;
1298
1299        if (!con)
1300                return -ENOMEM;
1301
1302        /* We don't support multi-homed hosts */
1303        if (dlm_local_addr[1] != NULL) {
1304                log_print("TCP protocol can't handle multi-homed hosts, "
1305                          "try SCTP");
1306                return -EINVAL;
1307        }
1308
1309        log_print("Using TCP for communications");
1310
1311        sock = tcp_create_listen_sock(con, dlm_local_addr[0]);
1312        if (sock) {
1313                add_sock(sock, con);
1314                result = 0;
1315        }
1316        else {
1317                result = -EADDRINUSE;
1318        }
1319
1320        return result;
1321}
1322
1323
1324
1325static struct writequeue_entry *new_writequeue_entry(struct connection *con,
1326                                                     gfp_t allocation)
1327{
1328        struct writequeue_entry *entry;
1329
1330        entry = kmalloc(sizeof(struct writequeue_entry), allocation);
1331        if (!entry)
1332                return NULL;
1333
1334        entry->page = alloc_page(allocation);
1335        if (!entry->page) {
1336                kfree(entry);
1337                return NULL;
1338        }
1339
1340        entry->offset = 0;
1341        entry->len = 0;
1342        entry->end = 0;
1343        entry->users = 0;
1344        entry->con = con;
1345
1346        return entry;
1347}
1348
1349void *dlm_lowcomms_get_buffer(int nodeid, int len, gfp_t allocation, char **ppc)
1350{
1351        struct connection *con;
1352        struct writequeue_entry *e;
1353        int offset = 0;
1354
1355        con = nodeid2con(nodeid, allocation);
1356        if (!con)
1357                return NULL;
1358
1359        spin_lock(&con->writequeue_lock);
1360        e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
1361        if ((&e->list == &con->writequeue) ||
1362            (PAGE_SIZE - e->end < len)) {
1363                e = NULL;
1364        } else {
1365                offset = e->end;
1366                e->end += len;
1367                e->users++;
1368        }
1369        spin_unlock(&con->writequeue_lock);
1370
1371        if (e) {
1372        got_one:
1373                *ppc = page_address(e->page) + offset;
1374                return e;
1375        }
1376
1377        e = new_writequeue_entry(con, allocation);
1378        if (e) {
1379                spin_lock(&con->writequeue_lock);
1380                offset = e->end;
1381                e->end += len;
1382                e->users++;
1383                list_add_tail(&e->list, &con->writequeue);
1384                spin_unlock(&con->writequeue_lock);
1385                goto got_one;
1386        }
1387        return NULL;
1388}
1389
1390void dlm_lowcomms_commit_buffer(void *mh)
1391{
1392        struct writequeue_entry *e = (struct writequeue_entry *)mh;
1393        struct connection *con = e->con;
1394        int users;
1395
1396        spin_lock(&con->writequeue_lock);
1397        users = --e->users;
1398        if (users)
1399                goto out;
1400        e->len = e->end - e->offset;
1401        spin_unlock(&con->writequeue_lock);
1402
1403        queue_work(send_workqueue, &con->swork);
1404        return;
1405
1406out:
1407        spin_unlock(&con->writequeue_lock);
1408        return;
1409}
1410
/*
 * send_to_sock - push queued write queue entries out on con->sock
 * @con: connection whose write queue is drained
 *
 * Holds con->sock_mutex throughout. Entries are sent in list order with
 * non-blocking kernel_sendpage(), and writequeue_lock is dropped around each
 * send. A partial send only advances the entry, and the loop sends the rest
 * on the next pass.
 *  - -EAGAIN or 0 bytes: stop for now; on -EAGAIN with SOCKWQ_ASYNC_NOSPACE
 *    set, the socket is flagged as application-limited once.
 *  - other error: the connection is closed and the send work is requeued
 *    so that the next run reconnects.
 *  - no socket: the send work is just requeued.
 */
static void send_to_sock(struct connection *con)
{
	int ret = 0;
	const int msg_flags = MSG_DONTWAIT | MSG_NOSIGNAL;
	struct writequeue_entry *e;
	int len, offset;
	int count = 0;

	mutex_lock(&con->sock_mutex);
	if (con->sock == NULL)
		goto out_connect;

	spin_lock(&con->writequeue_lock);
	for (;;) {
		e = list_entry(con->writequeue.next, struct writequeue_entry,
			       list);
		/* Head pointing back at the list itself: queue is empty */
		if ((struct list_head *) e == &con->writequeue)
			break;

		len = e->len;
		offset = e->offset;
		/* A drained, unused entry should already have been freed */
		BUG_ON(len == 0 && e->users == 0);
		spin_unlock(&con->writequeue_lock);

		/*
		 * NOTE(review): with len == 0 and users != 0 (head entry not
		 * yet committed) nothing is sent and nothing is freed, so the
		 * loop keeps revisiting the same entry until
		 * dlm_lowcomms_commit_buffer() sets len.
		 */
		ret = 0;
		if (len) {
			ret = kernel_sendpage(con->sock, e->page, offset, len,
					      msg_flags);
			if (ret == -EAGAIN || ret == 0) {
				if (ret == -EAGAIN &&
				    test_bit(SOCKWQ_ASYNC_NOSPACE, &con->sock->flags) &&
				    !test_and_set_bit(CF_APP_LIMITED, &con->flags)) {
					/* Notify TCP that we're limited by the
					 * application window size.
					 */
					set_bit(SOCK_NOSPACE, &con->sock->flags);
					con->sock->sk->sk_write_pending++;
				}
				cond_resched();
				goto out;
			} else if (ret < 0)
				goto send_error;
		}

		/* Don't starve people filling buffers */
		if (++count >= MAX_SEND_MSG_COUNT) {
			cond_resched();
			count = 0;
		}

		/* Account the ret bytes sent; frees the entry once drained */
		spin_lock(&con->writequeue_lock);
		writequeue_entry_complete(e, ret);
	}
	spin_unlock(&con->writequeue_lock);
out:
	mutex_unlock(&con->sock_mutex);
	return;

send_error:
	mutex_unlock(&con->sock_mutex);
	close_connection(con, false, false, true);
	/* Requeue the send work. When the work daemon runs again, it will try
	   a new connection, then call this function again. */
	queue_work(send_workqueue, &con->swork);
	return;

out_connect:
	mutex_unlock(&con->sock_mutex);
	queue_work(send_workqueue, &con->swork);
	cond_resched();
}
1483
1484static void clean_one_writequeue(struct connection *con)
1485{
1486        struct writequeue_entry *e, *safe;
1487
1488        spin_lock(&con->writequeue_lock);
1489        list_for_each_entry_safe(e, safe, &con->writequeue, list) {
1490                list_del(&e->list);
1491                free_entry(e);
1492        }
1493        spin_unlock(&con->writequeue_lock);
1494}
1495
1496/* Called from recovery when it knows that a node has
1497   left the cluster */
1498int dlm_lowcomms_close(int nodeid)
1499{
1500        struct connection *con;
1501        struct dlm_node_addr *na;
1502
1503        log_print("closing connection to node %d", nodeid);
1504        con = nodeid2con(nodeid, 0);
1505        if (con) {
1506                set_bit(CF_CLOSE, &con->flags);
1507                close_connection(con, true, true, true);
1508                clean_one_writequeue(con);
1509        }
1510
1511        spin_lock(&dlm_node_addrs_spin);
1512        na = find_node_addr(nodeid);
1513        if (na) {
1514                list_del(&na->list);
1515                while (na->addr_count--)
1516                        kfree(na->addr[na->addr_count]);
1517                kfree(na);
1518        }
1519        spin_unlock(&dlm_node_addrs_spin);
1520
1521        return 0;
1522}
1523
1524/* Receive workqueue function */
1525static void process_recv_sockets(struct work_struct *work)
1526{
1527        struct connection *con = container_of(work, struct connection, rwork);
1528        int err;
1529
1530        clear_bit(CF_READ_PENDING, &con->flags);
1531        do {
1532                err = con->rx_action(con);
1533        } while (!err);
1534}
1535
1536/* Send workqueue function */
1537static void process_send_sockets(struct work_struct *work)
1538{
1539        struct connection *con = container_of(work, struct connection, swork);
1540
1541        clear_bit(CF_WRITE_PENDING, &con->flags);
1542        if (con->sock == NULL) /* not mutex protected so check it inside too */
1543                con->connect_action(con);
1544        if (!list_empty(&con->writequeue))
1545                send_to_sock(con);
1546}
1547
1548static void work_stop(void)
1549{
1550        if (recv_workqueue)
1551                destroy_workqueue(recv_workqueue);
1552        if (send_workqueue)
1553                destroy_workqueue(send_workqueue);
1554}
1555
1556static int work_start(void)
1557{
1558        recv_workqueue = alloc_workqueue("dlm_recv",
1559                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1560        if (!recv_workqueue) {
1561                log_print("can't start dlm_recv");
1562                return -ENOMEM;
1563        }
1564
1565        send_workqueue = alloc_workqueue("dlm_send",
1566                                         WQ_UNBOUND | WQ_MEM_RECLAIM, 1);
1567        if (!send_workqueue) {
1568                log_print("can't start dlm_send");
1569                destroy_workqueue(recv_workqueue);
1570                return -ENOMEM;
1571        }
1572
1573        return 0;
1574}
1575
1576static void _stop_conn(struct connection *con, bool and_other)
1577{
1578        mutex_lock(&con->sock_mutex);
1579        set_bit(CF_CLOSE, &con->flags);
1580        set_bit(CF_READ_PENDING, &con->flags);
1581        set_bit(CF_WRITE_PENDING, &con->flags);
1582        if (con->sock && con->sock->sk) {
1583                write_lock_bh(&con->sock->sk->sk_callback_lock);
1584                con->sock->sk->sk_user_data = NULL;
1585                write_unlock_bh(&con->sock->sk->sk_callback_lock);
1586        }
1587        if (con->othercon && and_other)
1588                _stop_conn(con->othercon, false);
1589        mutex_unlock(&con->sock_mutex);
1590}
1591
1592static void stop_conn(struct connection *con)
1593{
1594        _stop_conn(con, true);
1595}
1596
1597static void shutdown_conn(struct connection *con)
1598{
1599        if (con->shutdown_action)
1600                con->shutdown_action(con);
1601}
1602
1603static void connection_release(struct rcu_head *rcu)
1604{
1605        struct connection *con = container_of(rcu, struct connection, rcu);
1606
1607        kfree(con->rx_buf);
1608        kfree(con);
1609}
1610
1611static void free_conn(struct connection *con)
1612{
1613        close_connection(con, true, true, true);
1614        spin_lock(&connections_lock);
1615        hlist_del_rcu(&con->list);
1616        spin_unlock(&connections_lock);
1617        if (con->othercon) {
1618                clean_one_writequeue(con->othercon);
1619                call_rcu(&con->othercon->rcu, connection_release);
1620        }
1621        clean_one_writequeue(con);
1622        call_rcu(&con->rcu, connection_release);
1623}
1624
/*
 * work_flush - stop every connection and wait until no work runs any more
 *
 * Each pass marks all connections stopped (foreach_conn(stop_conn)), which
 * sets CF_READ_PENDING and CF_WRITE_PENDING. It then flushes both
 * workqueues and checks, under connections_srcu, that every connection (and
 * its othercon) still has both bits set. process_recv_sockets() and
 * process_send_sockets() clear these bits when they run, so a cleared bit
 * means work ran after the stop. In that case the pass is repeated.
 */
static void work_flush(void)
{
	int ok, idx;
	int i;
	struct connection *con;

	do {
		ok = 1;
		foreach_conn(stop_conn);
		if (recv_workqueue)
			flush_workqueue(recv_workqueue);
		if (send_workqueue)
			flush_workqueue(send_workqueue);
		idx = srcu_read_lock(&connections_srcu);
		/* Stop scanning early once any connection fails the check */
		for (i = 0; i < CONN_HASH_SIZE && ok; i++) {
			hlist_for_each_entry_rcu(con, &connection_hash[i],
						 list) {
				ok &= test_bit(CF_READ_PENDING, &con->flags);
				ok &= test_bit(CF_WRITE_PENDING, &con->flags);
				if (con->othercon) {
					ok &= test_bit(CF_READ_PENDING,
						       &con->othercon->flags);
					ok &= test_bit(CF_WRITE_PENDING,
						       &con->othercon->flags);
				}
			}
		}
		srcu_read_unlock(&connections_srcu, idx);
	} while (!ok);
}
1655
/*
 * dlm_lowcomms_stop - tear down the low-level comms layer
 *
 * The order matters:
 *  1. dlm_allow_conn is cleared (presumably so no new connections are
 *     accepted; TODO confirm against its readers).
 *  2. Pending work is flushed.
 *  3. Each connection's shutdown hook runs.
 *  4. work_flush() quiesces all connections.
 *  5. Only then are the connections freed, the workqueues destroyed and
 *     the local addresses released.
 */
void dlm_lowcomms_stop(void)
{
	/* Set all the flags to prevent any
	   socket activity.
	*/
	dlm_allow_conn = 0;

	if (recv_workqueue)
		flush_workqueue(recv_workqueue);
	if (send_workqueue)
		flush_workqueue(send_workqueue);

	foreach_conn(shutdown_conn);
	work_flush();
	foreach_conn(free_conn);
	work_stop();
	deinit_local();
}
1674
1675int dlm_lowcomms_start(void)
1676{
1677        int error = -EINVAL;
1678        struct connection *con;
1679        int i;
1680
1681        for (i = 0; i < CONN_HASH_SIZE; i++)
1682                INIT_HLIST_HEAD(&connection_hash[i]);
1683
1684        init_local();
1685        if (!dlm_local_count) {
1686                error = -ENOTCONN;
1687                log_print("no local IP address has been set");
1688                goto fail;
1689        }
1690
1691        error = work_start();
1692        if (error)
1693                goto fail;
1694
1695        dlm_allow_conn = 1;
1696
1697        /* Start listening */
1698        if (dlm_config.ci_protocol == 0)
1699                error = tcp_listen_for_all();
1700        else
1701                error = sctp_listen_for_all();
1702        if (error)
1703                goto fail_unlisten;
1704
1705        return 0;
1706
1707fail_unlisten:
1708        dlm_allow_conn = 0;
1709        con = nodeid2con(0,0);
1710        if (con)
1711                free_conn(con);
1712fail:
1713        return error;
1714}
1715
1716void dlm_lowcomms_exit(void)
1717{
1718        struct dlm_node_addr *na, *safe;
1719
1720        spin_lock(&dlm_node_addrs_spin);
1721        list_for_each_entry_safe(na, safe, &dlm_node_addrs, list) {
1722                list_del(&na->list);
1723                while (na->addr_count--)
1724                        kfree(na->addr[na->addr_count]);
1725                kfree(na);
1726        }
1727        spin_unlock(&dlm_node_addrs_spin);
1728}
1729