linux/net/rds/connection.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/list.h>
  35#include <linux/slab.h>
  36#include <linux/export.h>
  37#include <net/ipv6.h>
  38#include <net/inet6_hashtables.h>
  39#include <net/addrconf.h>
  40
  41#include "rds.h"
  42#include "loop.h"
  43
  44#define RDS_CONNECTION_HASH_BITS 12 /* log2 of the number of hash buckets */
  45#define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS) /* number of buckets in rds_conn_hash */
  46#define RDS_CONNECTION_HASH_MASK (RDS_CONNECTION_HASH_ENTRIES - 1) /* reduces a hash value to a bucket index */
  47
  48/* Taken in this file around hash insertion and removal, the c_passive hand-off and rds_conn_count updates; lookups walk the buckets with the RCU list iterators. */
  49static DEFINE_SPINLOCK(rds_conn_lock);
  50static unsigned long rds_conn_count; /* bumped when a conn is published, dropped in rds_conn_destroy() */
  51static struct hlist_head rds_conn_hash[RDS_CONNECTION_HASH_ENTRIES]; /* buckets chosen by rds_conn_bucket() */
  52static struct kmem_cache *rds_conn_slab; /* cache that struct rds_connection objects are allocated from */
  53
  54static struct hlist_head *rds_conn_bucket(const struct in6_addr *laddr,
  55                                          const struct in6_addr *faddr)
  56{
  57        static u32 rds6_hash_secret __read_mostly;
  58        static u32 rds_hash_secret __read_mostly;
  59
  60        u32 lhash, fhash, hash;
  61
  62        net_get_random_once(&rds_hash_secret, sizeof(rds_hash_secret));
  63        net_get_random_once(&rds6_hash_secret, sizeof(rds6_hash_secret));
  64
  65        lhash = (__force u32)laddr->s6_addr32[3];
  66#if IS_ENABLED(CONFIG_IPV6)
  67        fhash = __ipv6_addr_jhash(faddr, rds6_hash_secret);
  68#else
  69        fhash = (__force u32)faddr->s6_addr32[3];
  70#endif
  71        hash = __inet_ehashfn(lhash, 0, fhash, 0, rds_hash_secret);
  72
  73        return &rds_conn_hash[hash & RDS_CONNECTION_HASH_MASK];
  74}
  75
  76#define rds_conn_info_set(var, test, suffix) do {               \
  77        if (test)                                               \
  78                var |= RDS_INFO_CONNECTION_FLAG_##suffix;       \
  79} while (0)
  80
  81/* rcu read lock must be held or the connection spinlock */
  82static struct rds_connection *rds_conn_lookup(struct net *net,
  83                                              struct hlist_head *head,
  84                                              const struct in6_addr *laddr,
  85                                              const struct in6_addr *faddr,
  86                                              struct rds_transport *trans,
  87                                              u8 tos, int dev_if)
  88{
  89        struct rds_connection *conn, *ret = NULL;
  90
  91        hlist_for_each_entry_rcu(conn, head, c_hash_node) {
  92                if (ipv6_addr_equal(&conn->c_faddr, faddr) &&
  93                    ipv6_addr_equal(&conn->c_laddr, laddr) &&
  94                    conn->c_trans == trans &&
  95                    conn->c_tos == tos &&
  96                    net == rds_conn_net(conn) &&
  97                    conn->c_dev_if == dev_if) {
  98                        ret = conn;
  99                        break;
 100                }
 101        }
 102        rdsdebug("returning conn %p for %pI6c -> %pI6c\n", ret,
 103                 laddr, faddr);
 104        return ret;
 105}
 106
 107/*
 108 * This is called by transports as they're bringing down a connection.
 109 * It clears partial message state so that the transport can start sending
 110 * and receiving over this connection again in the future.  It is up to
 111 * the transport to have serialized this call with its send and recv.
 112 */
 113static void rds_conn_path_reset(struct rds_conn_path *cp)
 114{
 115        struct rds_connection *conn = cp->cp_conn;
 116
 117        rdsdebug("connection %pI6c to %pI6c reset\n",
 118                 &conn->c_laddr, &conn->c_faddr);
 119
 120        rds_stats_inc(s_conn_reset);
 121        rds_send_path_reset(cp);
 122        cp->cp_flags = 0;
 123
 124        /* Do not clear next_rx_seq here, else we cannot distinguish
 125         * retransmitted packets from new packets, and will hand all
 126         * of them to the application. That is not consistent with the
 127         * reliability guarantees of RDS. */
 128}
 129
 130static void __rds_conn_path_init(struct rds_connection *conn,
 131                                 struct rds_conn_path *cp, bool is_outgoing)
 132{
 133        spin_lock_init(&cp->cp_lock);
 134        cp->cp_next_tx_seq = 1;
 135        init_waitqueue_head(&cp->cp_waitq);
 136        INIT_LIST_HEAD(&cp->cp_send_queue);
 137        INIT_LIST_HEAD(&cp->cp_retrans);
 138
 139        cp->cp_conn = conn;
 140        atomic_set(&cp->cp_state, RDS_CONN_DOWN);
 141        cp->cp_send_gen = 0;
 142        cp->cp_reconnect_jiffies = 0;
 143        cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
 144        INIT_DELAYED_WORK(&cp->cp_send_w, rds_send_worker);
 145        INIT_DELAYED_WORK(&cp->cp_recv_w, rds_recv_worker);
 146        INIT_DELAYED_WORK(&cp->cp_conn_w, rds_connect_worker);
 147        INIT_WORK(&cp->cp_down_w, rds_shutdown_worker);
 148        mutex_init(&cp->cp_cm_lock);
 149        cp->cp_flags = 0;
 150}
 151
 152/*
 153 * There is only every one 'conn' for a given pair of addresses in the
 154 * system at a time.  They contain messages to be retransmitted and so
 155 * span the lifetime of the actual underlying transport connections.
 156 *
 157 * For now they are not garbage collected once they're created.  They
 158 * are torn down as the module is removed, if ever.
 159 */
 160static struct rds_connection *__rds_conn_create(struct net *net,
 161                                                const struct in6_addr *laddr,
 162                                                const struct in6_addr *faddr,
 163                                                struct rds_transport *trans,
 164                                                gfp_t gfp, u8 tos,
 165                                                int is_outgoing,
 166                                                int dev_if)
 167{
 168        struct rds_connection *conn, *parent = NULL;
 169        struct hlist_head *head = rds_conn_bucket(laddr, faddr);
 170        struct rds_transport *loop_trans;
 171        unsigned long flags;
 172        int ret, i;
 173        int npaths = (trans->t_mp_capable ? RDS_MPATH_WORKERS : 1);
 174
 175        rcu_read_lock();
 176        conn = rds_conn_lookup(net, head, laddr, faddr, trans, tos, dev_if);
 177        if (conn &&
 178            conn->c_loopback &&
 179            conn->c_trans != &rds_loop_transport &&
 180            ipv6_addr_equal(laddr, faddr) &&
 181            !is_outgoing) {
 182                /* This is a looped back IB connection, and we're
 183                 * called by the code handling the incoming connect.
 184                 * We need a second connection object into which we
 185                 * can stick the other QP. */
 186                parent = conn;
 187                conn = parent->c_passive;
 188        }
 189        rcu_read_unlock();
 190        if (conn)
 191                goto out;
 192
 193        conn = kmem_cache_zalloc(rds_conn_slab, gfp);
 194        if (!conn) {
 195                conn = ERR_PTR(-ENOMEM);
 196                goto out;
 197        }
 198        conn->c_path = kcalloc(npaths, sizeof(struct rds_conn_path), gfp);
 199        if (!conn->c_path) {
 200                kmem_cache_free(rds_conn_slab, conn);
 201                conn = ERR_PTR(-ENOMEM);
 202                goto out;
 203        }
 204
 205        INIT_HLIST_NODE(&conn->c_hash_node);
 206        conn->c_laddr = *laddr;
 207        conn->c_isv6 = !ipv6_addr_v4mapped(laddr);
 208        conn->c_faddr = *faddr;
 209        conn->c_dev_if = dev_if;
 210        conn->c_tos = tos;
 211
 212#if IS_ENABLED(CONFIG_IPV6)
 213        /* If the local address is link local, set c_bound_if to be the
 214         * index used for this connection.  Otherwise, set it to 0 as
 215         * the socket is not bound to an interface.  c_bound_if is used
 216         * to look up a socket when a packet is received
 217         */
 218        if (ipv6_addr_type(laddr) & IPV6_ADDR_LINKLOCAL)
 219                conn->c_bound_if = dev_if;
 220        else
 221#endif
 222                conn->c_bound_if = 0;
 223
 224        rds_conn_net_set(conn, net);
 225
 226        ret = rds_cong_get_maps(conn);
 227        if (ret) {
 228                kfree(conn->c_path);
 229                kmem_cache_free(rds_conn_slab, conn);
 230                conn = ERR_PTR(ret);
 231                goto out;
 232        }
 233
 234        /*
 235         * This is where a connection becomes loopback.  If *any* RDS sockets
 236         * can bind to the destination address then we'd rather the messages
 237         * flow through loopback rather than either transport.
 238         */
 239        loop_trans = rds_trans_get_preferred(net, faddr, conn->c_dev_if);
 240        if (loop_trans) {
 241                rds_trans_put(loop_trans);
 242                conn->c_loopback = 1;
 243                if (is_outgoing && trans->t_prefer_loopback) {
 244                        /* "outgoing" connection - and the transport
 245                         * says it wants the connection handled by the
 246                         * loopback transport. This is what TCP does.
 247                         */
 248                        trans = &rds_loop_transport;
 249                }
 250        }
 251
 252        conn->c_trans = trans;
 253
 254        init_waitqueue_head(&conn->c_hs_waitq);
 255        for (i = 0; i < npaths; i++) {
 256                __rds_conn_path_init(conn, &conn->c_path[i],
 257                                     is_outgoing);
 258                conn->c_path[i].cp_index = i;
 259        }
 260        rcu_read_lock();
 261        if (rds_destroy_pending(conn))
 262                ret = -ENETDOWN;
 263        else
 264                ret = trans->conn_alloc(conn, GFP_ATOMIC);
 265        if (ret) {
 266                rcu_read_unlock();
 267                kfree(conn->c_path);
 268                kmem_cache_free(rds_conn_slab, conn);
 269                conn = ERR_PTR(ret);
 270                goto out;
 271        }
 272
 273        rdsdebug("allocated conn %p for %pI6c -> %pI6c over %s %s\n",
 274                 conn, laddr, faddr,
 275                 strnlen(trans->t_name, sizeof(trans->t_name)) ?
 276                 trans->t_name : "[unknown]", is_outgoing ? "(outgoing)" : "");
 277
 278        /*
 279         * Since we ran without holding the conn lock, someone could
 280         * have created the same conn (either normal or passive) in the
 281         * interim. We check while holding the lock. If we won, we complete
 282         * init and return our conn. If we lost, we rollback and return the
 283         * other one.
 284         */
 285        spin_lock_irqsave(&rds_conn_lock, flags);
 286        if (parent) {
 287                /* Creating passive conn */
 288                if (parent->c_passive) {
 289                        trans->conn_free(conn->c_path[0].cp_transport_data);
 290                        kfree(conn->c_path);
 291                        kmem_cache_free(rds_conn_slab, conn);
 292                        conn = parent->c_passive;
 293                } else {
 294                        parent->c_passive = conn;
 295                        rds_cong_add_conn(conn);
 296                        rds_conn_count++;
 297                }
 298        } else {
 299                /* Creating normal conn */
 300                struct rds_connection *found;
 301
 302                found = rds_conn_lookup(net, head, laddr, faddr, trans,
 303                                        tos, dev_if);
 304                if (found) {
 305                        struct rds_conn_path *cp;
 306                        int i;
 307
 308                        for (i = 0; i < npaths; i++) {
 309                                cp = &conn->c_path[i];
 310                                /* The ->conn_alloc invocation may have
 311                                 * allocated resource for all paths, so all
 312                                 * of them may have to be freed here.
 313                                 */
 314                                if (cp->cp_transport_data)
 315                                        trans->conn_free(cp->cp_transport_data);
 316                        }
 317                        kfree(conn->c_path);
 318                        kmem_cache_free(rds_conn_slab, conn);
 319                        conn = found;
 320                } else {
 321                        conn->c_my_gen_num = rds_gen_num;
 322                        conn->c_peer_gen_num = 0;
 323                        hlist_add_head_rcu(&conn->c_hash_node, head);
 324                        rds_cong_add_conn(conn);
 325                        rds_conn_count++;
 326                }
 327        }
 328        spin_unlock_irqrestore(&rds_conn_lock, flags);
 329        rcu_read_unlock();
 330
 331out:
 332        return conn;
 333}
 334
 335struct rds_connection *rds_conn_create(struct net *net,
 336                                       const struct in6_addr *laddr,
 337                                       const struct in6_addr *faddr,
 338                                       struct rds_transport *trans, u8 tos,
 339                                       gfp_t gfp, int dev_if)
 340{
 341        return __rds_conn_create(net, laddr, faddr, trans, gfp, tos, 0, dev_if);
 342}
 343EXPORT_SYMBOL_GPL(rds_conn_create);
 344
 345struct rds_connection *rds_conn_create_outgoing(struct net *net,
 346                                                const struct in6_addr *laddr,
 347                                                const struct in6_addr *faddr,
 348                                                struct rds_transport *trans,
 349                                                u8 tos, gfp_t gfp, int dev_if)
 350{
 351        return __rds_conn_create(net, laddr, faddr, trans, gfp, tos, 1, dev_if);
 352}
 353EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
 354
 355void rds_conn_shutdown(struct rds_conn_path *cp)
 356{
 357        struct rds_connection *conn = cp->cp_conn;
 358
 359        /* shut it down unless it's down already */
 360        if (!rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
 361                /*
 362                 * Quiesce the connection mgmt handlers before we start tearing
 363                 * things down. We don't hold the mutex for the entire
 364                 * duration of the shutdown operation, else we may be
 365                 * deadlocking with the CM handler. Instead, the CM event
 366                 * handler is supposed to check for state DISCONNECTING
 367                 */
 368                mutex_lock(&cp->cp_cm_lock);
 369                if (!rds_conn_path_transition(cp, RDS_CONN_UP,
 370                                              RDS_CONN_DISCONNECTING) &&
 371                    !rds_conn_path_transition(cp, RDS_CONN_ERROR,
 372                                              RDS_CONN_DISCONNECTING)) {
 373                        rds_conn_path_error(cp,
 374                                            "shutdown called in state %d\n",
 375                                            atomic_read(&cp->cp_state));
 376                        mutex_unlock(&cp->cp_cm_lock);
 377                        return;
 378                }
 379                mutex_unlock(&cp->cp_cm_lock);
 380
 381                wait_event(cp->cp_waitq,
 382                           !test_bit(RDS_IN_XMIT, &cp->cp_flags));
 383                wait_event(cp->cp_waitq,
 384                           !test_bit(RDS_RECV_REFILL, &cp->cp_flags));
 385
 386                conn->c_trans->conn_path_shutdown(cp);
 387                rds_conn_path_reset(cp);
 388
 389                if (!rds_conn_path_transition(cp, RDS_CONN_DISCONNECTING,
 390                                              RDS_CONN_DOWN) &&
 391                    !rds_conn_path_transition(cp, RDS_CONN_ERROR,
 392                                              RDS_CONN_DOWN)) {
 393                        /* This can happen - eg when we're in the middle of tearing
 394                         * down the connection, and someone unloads the rds module.
 395                         * Quite reproducible with loopback connections.
 396                         * Mostly harmless.
 397                         *
 398                         * Note that this also happens with rds-tcp because
 399                         * we could have triggered rds_conn_path_drop in irq
 400                         * mode from rds_tcp_state change on the receipt of
 401                         * a FIN, thus we need to recheck for RDS_CONN_ERROR
 402                         * here.
 403                         */
 404                        rds_conn_path_error(cp, "%s: failed to transition "
 405                                            "to state DOWN, current state "
 406                                            "is %d\n", __func__,
 407                                            atomic_read(&cp->cp_state));
 408                        return;
 409                }
 410        }
 411
 412        /* Then reconnect if it's still live.
 413         * The passive side of an IB loopback connection is never added
 414         * to the conn hash, so we never trigger a reconnect on this
 415         * conn - the reconnect is always triggered by the active peer. */
 416        cancel_delayed_work_sync(&cp->cp_conn_w);
 417        rcu_read_lock();
 418        if (!hlist_unhashed(&conn->c_hash_node)) {
 419                rcu_read_unlock();
 420                rds_queue_reconnect(cp);
 421        } else {
 422                rcu_read_unlock();
 423        }
 424}
 425
 426/* destroy a single rds_conn_path. rds_conn_destroy() iterates over
 427 * all paths using rds_conn_path_destroy()
 428 */
 429static void rds_conn_path_destroy(struct rds_conn_path *cp)
 430{
 431        struct rds_message *rm, *rtmp;
 432
 433        if (!cp->cp_transport_data)
 434                return;
 435
 436        /* make sure lingering queued work won't try to ref the conn */
 437        cancel_delayed_work_sync(&cp->cp_send_w);
 438        cancel_delayed_work_sync(&cp->cp_recv_w);
 439
 440        rds_conn_path_drop(cp, true);
 441        flush_work(&cp->cp_down_w);
 442
 443        /* tear down queued messages */
 444        list_for_each_entry_safe(rm, rtmp,
 445                                 &cp->cp_send_queue,
 446                                 m_conn_item) {
 447                list_del_init(&rm->m_conn_item);
 448                BUG_ON(!list_empty(&rm->m_sock_item));
 449                rds_message_put(rm);
 450        }
 451        if (cp->cp_xmit_rm)
 452                rds_message_put(cp->cp_xmit_rm);
 453
 454        WARN_ON(delayed_work_pending(&cp->cp_send_w));
 455        WARN_ON(delayed_work_pending(&cp->cp_recv_w));
 456        WARN_ON(delayed_work_pending(&cp->cp_conn_w));
 457        WARN_ON(work_pending(&cp->cp_down_w));
 458
 459        cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
 460}
 461
 462/*
 463 * Stop and free a connection.
 464 *
 465 * This can only be used in very limited circumstances.  It assumes that once
 466 * the conn has been shutdown that no one else is referencing the connection.
 467 * We can only ensure this in the rmmod path in the current code.
 468 */
 469void rds_conn_destroy(struct rds_connection *conn)
 470{
 471        unsigned long flags;
 472        int i;
 473        struct rds_conn_path *cp;
 474        int npaths = (conn->c_trans->t_mp_capable ? RDS_MPATH_WORKERS : 1);
 475
 476        rdsdebug("freeing conn %p for %pI4 -> "
 477                 "%pI4\n", conn, &conn->c_laddr,
 478                 &conn->c_faddr);
 479
 480        /* Ensure conn will not be scheduled for reconnect */
 481        spin_lock_irq(&rds_conn_lock);
 482        hlist_del_init_rcu(&conn->c_hash_node);
 483        spin_unlock_irq(&rds_conn_lock);
 484        synchronize_rcu();
 485
 486        /* shut the connection down */
 487        for (i = 0; i < npaths; i++) {
 488                cp = &conn->c_path[i];
 489                rds_conn_path_destroy(cp);
 490                BUG_ON(!list_empty(&cp->cp_retrans));
 491        }
 492
 493        /*
 494         * The congestion maps aren't freed up here.  They're
 495         * freed by rds_cong_exit() after all the connections
 496         * have been freed.
 497         */
 498        rds_cong_remove_conn(conn);
 499
 500        kfree(conn->c_path);
 501        kmem_cache_free(rds_conn_slab, conn);
 502
 503        spin_lock_irqsave(&rds_conn_lock, flags);
 504        rds_conn_count--;
 505        spin_unlock_irqrestore(&rds_conn_lock, flags);
 506}
 507EXPORT_SYMBOL_GPL(rds_conn_destroy);
 508
 509static void __rds_inc_msg_cp(struct rds_incoming *inc,
 510                             struct rds_info_iterator *iter,
 511                             void *saddr, void *daddr, int flip, bool isv6)
 512{
 513#if IS_ENABLED(CONFIG_IPV6)
 514        if (isv6)
 515                rds6_inc_info_copy(inc, iter, saddr, daddr, flip);
 516        else
 517#endif
 518                rds_inc_info_copy(inc, iter, *(__be32 *)saddr,
 519                                  *(__be32 *)daddr, flip);
 520}
 521
 522static void rds_conn_message_info_cmn(struct socket *sock, unsigned int len,
 523                                      struct rds_info_iterator *iter,
 524                                      struct rds_info_lengths *lens,
 525                                      int want_send, bool isv6)
 526{
 527        struct hlist_head *head;
 528        struct list_head *list;
 529        struct rds_connection *conn;
 530        struct rds_message *rm;
 531        unsigned int total = 0;
 532        unsigned long flags;
 533        size_t i;
 534        int j;
 535
 536        if (isv6)
 537                len /= sizeof(struct rds6_info_message);
 538        else
 539                len /= sizeof(struct rds_info_message);
 540
 541        rcu_read_lock();
 542
 543        for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 544             i++, head++) {
 545                hlist_for_each_entry_rcu(conn, head, c_hash_node) {
 546                        struct rds_conn_path *cp;
 547                        int npaths;
 548
 549                        if (!isv6 && conn->c_isv6)
 550                                continue;
 551
 552                        npaths = (conn->c_trans->t_mp_capable ?
 553                                 RDS_MPATH_WORKERS : 1);
 554
 555                        for (j = 0; j < npaths; j++) {
 556                                cp = &conn->c_path[j];
 557                                if (want_send)
 558                                        list = &cp->cp_send_queue;
 559                                else
 560                                        list = &cp->cp_retrans;
 561
 562                                spin_lock_irqsave(&cp->cp_lock, flags);
 563
 564                                /* XXX too lazy to maintain counts.. */
 565                                list_for_each_entry(rm, list, m_conn_item) {
 566                                        total++;
 567                                        if (total <= len)
 568                                                __rds_inc_msg_cp(&rm->m_inc,
 569                                                                 iter,
 570                                                                 &conn->c_laddr,
 571                                                                 &conn->c_faddr,
 572                                                                 0, isv6);
 573                                }
 574
 575                                spin_unlock_irqrestore(&cp->cp_lock, flags);
 576                        }
 577                }
 578        }
 579        rcu_read_unlock();
 580
 581        lens->nr = total;
 582        if (isv6)
 583                lens->each = sizeof(struct rds6_info_message);
 584        else
 585                lens->each = sizeof(struct rds_info_message);
 586}
 587
 588static void rds_conn_message_info(struct socket *sock, unsigned int len,
 589                                  struct rds_info_iterator *iter,
 590                                  struct rds_info_lengths *lens,
 591                                  int want_send)
 592{
 593        rds_conn_message_info_cmn(sock, len, iter, lens, want_send, false);
 594}
 595
 596#if IS_ENABLED(CONFIG_IPV6)
 597static void rds6_conn_message_info(struct socket *sock, unsigned int len,
 598                                   struct rds_info_iterator *iter,
 599                                   struct rds_info_lengths *lens,
 600                                   int want_send)
 601{
 602        rds_conn_message_info_cmn(sock, len, iter, lens, want_send, true);
 603}
 604#endif
 605
 606static void rds_conn_message_info_send(struct socket *sock, unsigned int len,
 607                                       struct rds_info_iterator *iter,
 608                                       struct rds_info_lengths *lens)
 609{
 610        rds_conn_message_info(sock, len, iter, lens, 1);
 611}
 612
 613#if IS_ENABLED(CONFIG_IPV6)
 614static void rds6_conn_message_info_send(struct socket *sock, unsigned int len,
 615                                        struct rds_info_iterator *iter,
 616                                        struct rds_info_lengths *lens)
 617{
 618        rds6_conn_message_info(sock, len, iter, lens, 1);
 619}
 620#endif
 621
 622static void rds_conn_message_info_retrans(struct socket *sock,
 623                                          unsigned int len,
 624                                          struct rds_info_iterator *iter,
 625                                          struct rds_info_lengths *lens)
 626{
 627        rds_conn_message_info(sock, len, iter, lens, 0);
 628}
 629
 630#if IS_ENABLED(CONFIG_IPV6)
 631static void rds6_conn_message_info_retrans(struct socket *sock,
 632                                           unsigned int len,
 633                                           struct rds_info_iterator *iter,
 634                                           struct rds_info_lengths *lens)
 635{
 636        rds6_conn_message_info(sock, len, iter, lens, 0);
 637}
 638#endif
 639
 640void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 641                          struct rds_info_iterator *iter,
 642                          struct rds_info_lengths *lens,
 643                          int (*visitor)(struct rds_connection *, void *),
 644                          u64 *buffer,
 645                          size_t item_len)
 646{
 647        struct hlist_head *head;
 648        struct rds_connection *conn;
 649        size_t i;
 650
 651        rcu_read_lock();
 652
 653        lens->nr = 0;
 654        lens->each = item_len;
 655
 656        for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 657             i++, head++) {
 658                hlist_for_each_entry_rcu(conn, head, c_hash_node) {
 659
 660                        /* XXX no c_lock usage.. */
 661                        if (!visitor(conn, buffer))
 662                                continue;
 663
 664                        /* We copy as much as we can fit in the buffer,
 665                         * but we count all items so that the caller
 666                         * can resize the buffer. */
 667                        if (len >= item_len) {
 668                                rds_info_copy(iter, buffer, item_len);
 669                                len -= item_len;
 670                        }
 671                        lens->nr++;
 672                }
 673        }
 674        rcu_read_unlock();
 675}
 676EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
 677
 678static void rds_walk_conn_path_info(struct socket *sock, unsigned int len,
 679                                    struct rds_info_iterator *iter,
 680                                    struct rds_info_lengths *lens,
 681                                    int (*visitor)(struct rds_conn_path *, void *),
 682                                    u64 *buffer,
 683                                    size_t item_len)
 684{
 685        struct hlist_head *head;
 686        struct rds_connection *conn;
 687        size_t i;
 688
 689        rcu_read_lock();
 690
 691        lens->nr = 0;
 692        lens->each = item_len;
 693
 694        for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 695             i++, head++) {
 696                hlist_for_each_entry_rcu(conn, head, c_hash_node) {
 697                        struct rds_conn_path *cp;
 698
 699                        /* XXX We only copy the information from the first
 700                         * path for now.  The problem is that if there are
 701                         * more than one underlying paths, we cannot report
 702                         * information of all of them using the existing
 703                         * API.  For example, there is only one next_tx_seq,
 704                         * which path's next_tx_seq should we report?  It is
 705                         * a bug in the design of MPRDS.
 706                         */
 707                        cp = conn->c_path;
 708
 709                        /* XXX no cp_lock usage.. */
 710                        if (!visitor(cp, buffer))
 711                                continue;
 712
 713                        /* We copy as much as we can fit in the buffer,
 714                         * but we count all items so that the caller
 715                         * can resize the buffer.
 716                         */
 717                        if (len >= item_len) {
 718                                rds_info_copy(iter, buffer, item_len);
 719                                len -= item_len;
 720                        }
 721                        lens->nr++;
 722                }
 723        }
 724        rcu_read_unlock();
 725}
 726
 727static int rds_conn_info_visitor(struct rds_conn_path *cp, void *buffer)
 728{
 729        struct rds_info_connection *cinfo = buffer;
 730        struct rds_connection *conn = cp->cp_conn;
 731
 732        if (conn->c_isv6)
 733                return 0;
 734
 735        cinfo->next_tx_seq = cp->cp_next_tx_seq;
 736        cinfo->next_rx_seq = cp->cp_next_rx_seq;
 737        cinfo->laddr = conn->c_laddr.s6_addr32[3];
 738        cinfo->faddr = conn->c_faddr.s6_addr32[3];
 739        cinfo->tos = conn->c_tos;
 740        strncpy(cinfo->transport, conn->c_trans->t_name,
 741                sizeof(cinfo->transport));
 742        cinfo->flags = 0;
 743
 744        rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags),
 745                          SENDING);
 746        /* XXX Future: return the state rather than these funky bits */
 747        rds_conn_info_set(cinfo->flags,
 748                          atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING,
 749                          CONNECTING);
 750        rds_conn_info_set(cinfo->flags,
 751                          atomic_read(&cp->cp_state) == RDS_CONN_UP,
 752                          CONNECTED);
 753        return 1;
 754}
 755
 756#if IS_ENABLED(CONFIG_IPV6)
 757static int rds6_conn_info_visitor(struct rds_conn_path *cp, void *buffer)
 758{
 759        struct rds6_info_connection *cinfo6 = buffer;
 760        struct rds_connection *conn = cp->cp_conn;
 761
 762        cinfo6->next_tx_seq = cp->cp_next_tx_seq;
 763        cinfo6->next_rx_seq = cp->cp_next_rx_seq;
 764        cinfo6->laddr = conn->c_laddr;
 765        cinfo6->faddr = conn->c_faddr;
 766        strncpy(cinfo6->transport, conn->c_trans->t_name,
 767                sizeof(cinfo6->transport));
 768        cinfo6->flags = 0;
 769
 770        rds_conn_info_set(cinfo6->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags),
 771                          SENDING);
 772        /* XXX Future: return the state rather than these funky bits */
 773        rds_conn_info_set(cinfo6->flags,
 774                          atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING,
 775                          CONNECTING);
 776        rds_conn_info_set(cinfo6->flags,
 777                          atomic_read(&cp->cp_state) == RDS_CONN_UP,
 778                          CONNECTED);
 779        /* Just return 1 as there is no error case. This is a helper function
 780         * for rds_walk_conn_path_info() and it wants a return value.
 781         */
 782        return 1;
 783}
 784#endif
 785
 786static void rds_conn_info(struct socket *sock, unsigned int len,
 787                          struct rds_info_iterator *iter,
 788                          struct rds_info_lengths *lens)
 789{
 790        u64 buffer[(sizeof(struct rds_info_connection) + 7) / 8];
 791
 792        rds_walk_conn_path_info(sock, len, iter, lens,
 793                                rds_conn_info_visitor,
 794                                buffer,
 795                                sizeof(struct rds_info_connection));
 796}
 797
 798#if IS_ENABLED(CONFIG_IPV6)
 799static void rds6_conn_info(struct socket *sock, unsigned int len,
 800                           struct rds_info_iterator *iter,
 801                           struct rds_info_lengths *lens)
 802{
 803        u64 buffer[(sizeof(struct rds6_info_connection) + 7) / 8];
 804
 805        rds_walk_conn_path_info(sock, len, iter, lens,
 806                                rds6_conn_info_visitor,
 807                                buffer,
 808                                sizeof(struct rds6_info_connection));
 809}
 810#endif
 811
 812int rds_conn_init(void)
 813{
 814        int ret;
 815
 816        ret = rds_loop_net_init(); /* register pernet callback */
 817        if (ret)
 818                return ret;
 819
 820        rds_conn_slab = kmem_cache_create("rds_connection",
 821                                          sizeof(struct rds_connection),
 822                                          0, 0, NULL);
 823        if (!rds_conn_slab) {
 824                rds_loop_net_exit();
 825                return -ENOMEM;
 826        }
 827
 828        rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
 829        rds_info_register_func(RDS_INFO_SEND_MESSAGES,
 830                               rds_conn_message_info_send);
 831        rds_info_register_func(RDS_INFO_RETRANS_MESSAGES,
 832                               rds_conn_message_info_retrans);
 833#if IS_ENABLED(CONFIG_IPV6)
 834        rds_info_register_func(RDS6_INFO_CONNECTIONS, rds6_conn_info);
 835        rds_info_register_func(RDS6_INFO_SEND_MESSAGES,
 836                               rds6_conn_message_info_send);
 837        rds_info_register_func(RDS6_INFO_RETRANS_MESSAGES,
 838                               rds6_conn_message_info_retrans);
 839#endif
 840        return 0;
 841}
 842
 843void rds_conn_exit(void)
 844{
 845        rds_loop_net_exit(); /* unregister pernet callback */
 846        rds_loop_exit();
 847
 848        WARN_ON(!hlist_empty(rds_conn_hash));
 849
 850        kmem_cache_destroy(rds_conn_slab);
 851
 852        rds_info_deregister_func(RDS_INFO_CONNECTIONS, rds_conn_info);
 853        rds_info_deregister_func(RDS_INFO_SEND_MESSAGES,
 854                                 rds_conn_message_info_send);
 855        rds_info_deregister_func(RDS_INFO_RETRANS_MESSAGES,
 856                                 rds_conn_message_info_retrans);
 857#if IS_ENABLED(CONFIG_IPV6)
 858        rds_info_deregister_func(RDS6_INFO_CONNECTIONS, rds6_conn_info);
 859        rds_info_deregister_func(RDS6_INFO_SEND_MESSAGES,
 860                                 rds6_conn_message_info_send);
 861        rds_info_deregister_func(RDS6_INFO_RETRANS_MESSAGES,
 862                                 rds6_conn_message_info_retrans);
 863#endif
 864}
 865
 866/*
 867 * Force a disconnect
 868 */
 869void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
 870{
 871        atomic_set(&cp->cp_state, RDS_CONN_ERROR);
 872
 873        rcu_read_lock();
 874        if (!destroy && rds_destroy_pending(cp->cp_conn)) {
 875                rcu_read_unlock();
 876                return;
 877        }
 878        queue_work(rds_wq, &cp->cp_down_w);
 879        rcu_read_unlock();
 880}
 881EXPORT_SYMBOL_GPL(rds_conn_path_drop);
 882
 883void rds_conn_drop(struct rds_connection *conn)
 884{
 885        WARN_ON(conn->c_trans->t_mp_capable);
 886        rds_conn_path_drop(&conn->c_path[0], false);
 887}
 888EXPORT_SYMBOL_GPL(rds_conn_drop);
 889
 890/*
 891 * If the connection is down, trigger a connect. We may have scheduled a
 892 * delayed reconnect however - in this case we should not interfere.
 893 */
 894void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
 895{
 896        rcu_read_lock();
 897        if (rds_destroy_pending(cp->cp_conn)) {
 898                rcu_read_unlock();
 899                return;
 900        }
 901        if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
 902            !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
 903                queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
 904        rcu_read_unlock();
 905}
 906EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
 907
 908void rds_conn_connect_if_down(struct rds_connection *conn)
 909{
 910        WARN_ON(conn->c_trans->t_mp_capable);
 911        rds_conn_path_connect_if_down(&conn->c_path[0]);
 912}
 913EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
 914
 915void
 916__rds_conn_path_error(struct rds_conn_path *cp, const char *fmt, ...)
 917{
 918        va_list ap;
 919
 920        va_start(ap, fmt);
 921        vprintk(fmt, ap);
 922        va_end(ap);
 923
 924        rds_conn_path_drop(cp, false);
 925}
 926