linux/net/rds/connection.c
<<
>>
Prefs
   1/*
   2 * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/list.h>
  35#include <linux/slab.h>
  36#include <linux/export.h>
  37#include <net/ipv6.h>
  38#include <net/inet6_hashtables.h>
  39#include <net/addrconf.h>
  40
  41#include "rds.h"
  42#include "loop.h"
  43
/* Number of bits used to index the connection hash table. */
#define RDS_CONNECTION_HASH_BITS 12
/* Number of buckets in rds_conn_hash (1 << RDS_CONNECTION_HASH_BITS). */
#define RDS_CONNECTION_HASH_ENTRIES (1 << RDS_CONNECTION_HASH_BITS)
/* Mask applied to a hash value to obtain a bucket index. */
#define RDS_CONNECTION_HASH_MASK (RDS_CONNECTION_HASH_ENTRIES - 1)

/* converting this to RCU is a chore for another day.. */
/*
 * NOTE(review): readers in this file already walk the hash with
 * hlist_for_each_entry_rcu(); the remark above may be stale - confirm.
 * rds_conn_lock is taken around hash insertion/removal and around updates
 * of rds_conn_count.
 */
static DEFINE_SPINLOCK(rds_conn_lock);
/* Incremented when a connection is published, decremented on destroy. */
static unsigned long rds_conn_count;
/* Hash table of connections, linked through c_hash_node. */
static struct hlist_head rds_conn_hash[RDS_CONNECTION_HASH_ENTRIES];
/* Slab cache from which struct rds_connection objects are allocated. */
static struct kmem_cache *rds_conn_slab;
  53
  54static struct hlist_head *rds_conn_bucket(const struct in6_addr *laddr,
  55                                          const struct in6_addr *faddr)
  56{
  57        static u32 rds6_hash_secret __read_mostly;
  58        static u32 rds_hash_secret __read_mostly;
  59
  60        u32 lhash, fhash, hash;
  61
  62        net_get_random_once(&rds_hash_secret, sizeof(rds_hash_secret));
  63        net_get_random_once(&rds6_hash_secret, sizeof(rds6_hash_secret));
  64
  65        lhash = (__force u32)laddr->s6_addr32[3];
  66#if IS_ENABLED(CONFIG_IPV6)
  67        fhash = __ipv6_addr_jhash(faddr, rds6_hash_secret);
  68#else
  69        fhash = (__force u32)faddr->s6_addr32[3];
  70#endif
  71        hash = __inet_ehashfn(lhash, 0, fhash, 0, rds_hash_secret);
  72
  73        return &rds_conn_hash[hash & RDS_CONNECTION_HASH_MASK];
  74}
  75
  76#define rds_conn_info_set(var, test, suffix) do {               \
  77        if (test)                                               \
  78                var |= RDS_INFO_CONNECTION_FLAG_##suffix;       \
  79} while (0)
  80
  81/* rcu read lock must be held or the connection spinlock */
  82static struct rds_connection *rds_conn_lookup(struct net *net,
  83                                              struct hlist_head *head,
  84                                              const struct in6_addr *laddr,
  85                                              const struct in6_addr *faddr,
  86                                              struct rds_transport *trans,
  87                                              u8 tos, int dev_if)
  88{
  89        struct rds_connection *conn, *ret = NULL;
  90
  91        hlist_for_each_entry_rcu(conn, head, c_hash_node) {
  92                if (ipv6_addr_equal(&conn->c_faddr, faddr) &&
  93                    ipv6_addr_equal(&conn->c_laddr, laddr) &&
  94                    conn->c_trans == trans &&
  95                    conn->c_tos == tos &&
  96                    net == rds_conn_net(conn) &&
  97                    conn->c_dev_if == dev_if) {
  98                        ret = conn;
  99                        break;
 100                }
 101        }
 102        rdsdebug("returning conn %p for %pI6c -> %pI6c\n", ret,
 103                 laddr, faddr);
 104        return ret;
 105}
 106
 107/*
 108 * This is called by transports as they're bringing down a connection.
 109 * It clears partial message state so that the transport can start sending
 110 * and receiving over this connection again in the future.  It is up to
 111 * the transport to have serialized this call with its send and recv.
 112 */
 113static void rds_conn_path_reset(struct rds_conn_path *cp)
 114{
 115        struct rds_connection *conn = cp->cp_conn;
 116
 117        rdsdebug("connection %pI6c to %pI6c reset\n",
 118                 &conn->c_laddr, &conn->c_faddr);
 119
 120        rds_stats_inc(s_conn_reset);
 121        rds_send_path_reset(cp);
 122        cp->cp_flags = 0;
 123
 124        /* Do not clear next_rx_seq here, else we cannot distinguish
 125         * retransmitted packets from new packets, and will hand all
 126         * of them to the application. That is not consistent with the
 127         * reliability guarantees of RDS. */
 128}
 129
 130static void __rds_conn_path_init(struct rds_connection *conn,
 131                                 struct rds_conn_path *cp, bool is_outgoing)
 132{
 133        spin_lock_init(&cp->cp_lock);
 134        cp->cp_next_tx_seq = 1;
 135        init_waitqueue_head(&cp->cp_waitq);
 136        INIT_LIST_HEAD(&cp->cp_send_queue);
 137        INIT_LIST_HEAD(&cp->cp_retrans);
 138
 139        cp->cp_conn = conn;
 140        atomic_set(&cp->cp_state, RDS_CONN_DOWN);
 141        cp->cp_send_gen = 0;
 142        cp->cp_reconnect_jiffies = 0;
 143        cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
 144        INIT_DELAYED_WORK(&cp->cp_send_w, rds_send_worker);
 145        INIT_DELAYED_WORK(&cp->cp_recv_w, rds_recv_worker);
 146        INIT_DELAYED_WORK(&cp->cp_conn_w, rds_connect_worker);
 147        INIT_WORK(&cp->cp_down_w, rds_shutdown_worker);
 148        mutex_init(&cp->cp_cm_lock);
 149        cp->cp_flags = 0;
 150}
 151
 152/*
 153 * There is only every one 'conn' for a given pair of addresses in the
 154 * system at a time.  They contain messages to be retransmitted and so
 155 * span the lifetime of the actual underlying transport connections.
 156 *
 157 * For now they are not garbage collected once they're created.  They
 158 * are torn down as the module is removed, if ever.
 159 */
 160static struct rds_connection *__rds_conn_create(struct net *net,
 161                                                const struct in6_addr *laddr,
 162                                                const struct in6_addr *faddr,
 163                                                struct rds_transport *trans,
 164                                                gfp_t gfp, u8 tos,
 165                                                int is_outgoing,
 166                                                int dev_if)
 167{
 168        struct rds_connection *conn, *parent = NULL;
 169        struct hlist_head *head = rds_conn_bucket(laddr, faddr);
 170        struct rds_transport *loop_trans;
 171        unsigned long flags;
 172        int ret, i;
 173        int npaths = (trans->t_mp_capable ? RDS_MPATH_WORKERS : 1);
 174
 175        rcu_read_lock();
 176        conn = rds_conn_lookup(net, head, laddr, faddr, trans, tos, dev_if);
 177        if (conn &&
 178            conn->c_loopback &&
 179            conn->c_trans != &rds_loop_transport &&
 180            ipv6_addr_equal(laddr, faddr) &&
 181            !is_outgoing) {
 182                /* This is a looped back IB connection, and we're
 183                 * called by the code handling the incoming connect.
 184                 * We need a second connection object into which we
 185                 * can stick the other QP. */
 186                parent = conn;
 187                conn = parent->c_passive;
 188        }
 189        rcu_read_unlock();
 190        if (conn)
 191                goto out;
 192
 193        conn = kmem_cache_zalloc(rds_conn_slab, gfp);
 194        if (!conn) {
 195                conn = ERR_PTR(-ENOMEM);
 196                goto out;
 197        }
 198        conn->c_path = kcalloc(npaths, sizeof(struct rds_conn_path), gfp);
 199        if (!conn->c_path) {
 200                kmem_cache_free(rds_conn_slab, conn);
 201                conn = ERR_PTR(-ENOMEM);
 202                goto out;
 203        }
 204
 205        INIT_HLIST_NODE(&conn->c_hash_node);
 206        conn->c_laddr = *laddr;
 207        conn->c_isv6 = !ipv6_addr_v4mapped(laddr);
 208        conn->c_faddr = *faddr;
 209        conn->c_dev_if = dev_if;
 210        conn->c_tos = tos;
 211
 212#if IS_ENABLED(CONFIG_IPV6)
 213        /* If the local address is link local, set c_bound_if to be the
 214         * index used for this connection.  Otherwise, set it to 0 as
 215         * the socket is not bound to an interface.  c_bound_if is used
 216         * to look up a socket when a packet is received
 217         */
 218        if (ipv6_addr_type(laddr) & IPV6_ADDR_LINKLOCAL)
 219                conn->c_bound_if = dev_if;
 220        else
 221#endif
 222                conn->c_bound_if = 0;
 223
 224        rds_conn_net_set(conn, net);
 225
 226        ret = rds_cong_get_maps(conn);
 227        if (ret) {
 228                kfree(conn->c_path);
 229                kmem_cache_free(rds_conn_slab, conn);
 230                conn = ERR_PTR(ret);
 231                goto out;
 232        }
 233
 234        /*
 235         * This is where a connection becomes loopback.  If *any* RDS sockets
 236         * can bind to the destination address then we'd rather the messages
 237         * flow through loopback rather than either transport.
 238         */
 239        loop_trans = rds_trans_get_preferred(net, faddr, conn->c_dev_if);
 240        if (loop_trans) {
 241                rds_trans_put(loop_trans);
 242                conn->c_loopback = 1;
 243                if (trans->t_prefer_loopback) {
 244                        if (likely(is_outgoing)) {
 245                                /* "outgoing" connection to local address.
 246                                 * Protocol says it wants the connection
 247                                 * handled by the loopback transport.
 248                                 * This is what TCP does.
 249                                 */
 250                                trans = &rds_loop_transport;
 251                        } else {
 252                                /* No transport currently in use
 253                                 * should end up here, but if it
 254                                 * does, reset/destroy the connection.
 255                                 */
 256                                kmem_cache_free(rds_conn_slab, conn);
 257                                conn = ERR_PTR(-EOPNOTSUPP);
 258                                goto out;
 259                        }
 260                }
 261        }
 262
 263        conn->c_trans = trans;
 264
 265        init_waitqueue_head(&conn->c_hs_waitq);
 266        for (i = 0; i < npaths; i++) {
 267                __rds_conn_path_init(conn, &conn->c_path[i],
 268                                     is_outgoing);
 269                conn->c_path[i].cp_index = i;
 270        }
 271        rcu_read_lock();
 272        if (rds_destroy_pending(conn))
 273                ret = -ENETDOWN;
 274        else
 275                ret = trans->conn_alloc(conn, GFP_ATOMIC);
 276        if (ret) {
 277                rcu_read_unlock();
 278                kfree(conn->c_path);
 279                kmem_cache_free(rds_conn_slab, conn);
 280                conn = ERR_PTR(ret);
 281                goto out;
 282        }
 283
 284        rdsdebug("allocated conn %p for %pI6c -> %pI6c over %s %s\n",
 285                 conn, laddr, faddr,
 286                 strnlen(trans->t_name, sizeof(trans->t_name)) ?
 287                 trans->t_name : "[unknown]", is_outgoing ? "(outgoing)" : "");
 288
 289        /*
 290         * Since we ran without holding the conn lock, someone could
 291         * have created the same conn (either normal or passive) in the
 292         * interim. We check while holding the lock. If we won, we complete
 293         * init and return our conn. If we lost, we rollback and return the
 294         * other one.
 295         */
 296        spin_lock_irqsave(&rds_conn_lock, flags);
 297        if (parent) {
 298                /* Creating passive conn */
 299                if (parent->c_passive) {
 300                        trans->conn_free(conn->c_path[0].cp_transport_data);
 301                        kfree(conn->c_path);
 302                        kmem_cache_free(rds_conn_slab, conn);
 303                        conn = parent->c_passive;
 304                } else {
 305                        parent->c_passive = conn;
 306                        rds_cong_add_conn(conn);
 307                        rds_conn_count++;
 308                }
 309        } else {
 310                /* Creating normal conn */
 311                struct rds_connection *found;
 312
 313                found = rds_conn_lookup(net, head, laddr, faddr, trans,
 314                                        tos, dev_if);
 315                if (found) {
 316                        struct rds_conn_path *cp;
 317                        int i;
 318
 319                        for (i = 0; i < npaths; i++) {
 320                                cp = &conn->c_path[i];
 321                                /* The ->conn_alloc invocation may have
 322                                 * allocated resource for all paths, so all
 323                                 * of them may have to be freed here.
 324                                 */
 325                                if (cp->cp_transport_data)
 326                                        trans->conn_free(cp->cp_transport_data);
 327                        }
 328                        kfree(conn->c_path);
 329                        kmem_cache_free(rds_conn_slab, conn);
 330                        conn = found;
 331                } else {
 332                        conn->c_my_gen_num = rds_gen_num;
 333                        conn->c_peer_gen_num = 0;
 334                        hlist_add_head_rcu(&conn->c_hash_node, head);
 335                        rds_cong_add_conn(conn);
 336                        rds_conn_count++;
 337                }
 338        }
 339        spin_unlock_irqrestore(&rds_conn_lock, flags);
 340        rcu_read_unlock();
 341
 342out:
 343        return conn;
 344}
 345
 346struct rds_connection *rds_conn_create(struct net *net,
 347                                       const struct in6_addr *laddr,
 348                                       const struct in6_addr *faddr,
 349                                       struct rds_transport *trans, u8 tos,
 350                                       gfp_t gfp, int dev_if)
 351{
 352        return __rds_conn_create(net, laddr, faddr, trans, gfp, tos, 0, dev_if);
 353}
 354EXPORT_SYMBOL_GPL(rds_conn_create);
 355
 356struct rds_connection *rds_conn_create_outgoing(struct net *net,
 357                                                const struct in6_addr *laddr,
 358                                                const struct in6_addr *faddr,
 359                                                struct rds_transport *trans,
 360                                                u8 tos, gfp_t gfp, int dev_if)
 361{
 362        return __rds_conn_create(net, laddr, faddr, trans, gfp, tos, 1, dev_if);
 363}
 364EXPORT_SYMBOL_GPL(rds_conn_create_outgoing);
 365
 366void rds_conn_shutdown(struct rds_conn_path *cp)
 367{
 368        struct rds_connection *conn = cp->cp_conn;
 369
 370        /* shut it down unless it's down already */
 371        if (!rds_conn_path_transition(cp, RDS_CONN_DOWN, RDS_CONN_DOWN)) {
 372                /*
 373                 * Quiesce the connection mgmt handlers before we start tearing
 374                 * things down. We don't hold the mutex for the entire
 375                 * duration of the shutdown operation, else we may be
 376                 * deadlocking with the CM handler. Instead, the CM event
 377                 * handler is supposed to check for state DISCONNECTING
 378                 */
 379                mutex_lock(&cp->cp_cm_lock);
 380                if (!rds_conn_path_transition(cp, RDS_CONN_UP,
 381                                              RDS_CONN_DISCONNECTING) &&
 382                    !rds_conn_path_transition(cp, RDS_CONN_ERROR,
 383                                              RDS_CONN_DISCONNECTING)) {
 384                        rds_conn_path_error(cp,
 385                                            "shutdown called in state %d\n",
 386                                            atomic_read(&cp->cp_state));
 387                        mutex_unlock(&cp->cp_cm_lock);
 388                        return;
 389                }
 390                mutex_unlock(&cp->cp_cm_lock);
 391
 392                wait_event(cp->cp_waitq,
 393                           !test_bit(RDS_IN_XMIT, &cp->cp_flags));
 394                wait_event(cp->cp_waitq,
 395                           !test_bit(RDS_RECV_REFILL, &cp->cp_flags));
 396
 397                conn->c_trans->conn_path_shutdown(cp);
 398                rds_conn_path_reset(cp);
 399
 400                if (!rds_conn_path_transition(cp, RDS_CONN_DISCONNECTING,
 401                                              RDS_CONN_DOWN) &&
 402                    !rds_conn_path_transition(cp, RDS_CONN_ERROR,
 403                                              RDS_CONN_DOWN)) {
 404                        /* This can happen - eg when we're in the middle of tearing
 405                         * down the connection, and someone unloads the rds module.
 406                         * Quite reproducible with loopback connections.
 407                         * Mostly harmless.
 408                         *
 409                         * Note that this also happens with rds-tcp because
 410                         * we could have triggered rds_conn_path_drop in irq
 411                         * mode from rds_tcp_state change on the receipt of
 412                         * a FIN, thus we need to recheck for RDS_CONN_ERROR
 413                         * here.
 414                         */
 415                        rds_conn_path_error(cp, "%s: failed to transition "
 416                                            "to state DOWN, current state "
 417                                            "is %d\n", __func__,
 418                                            atomic_read(&cp->cp_state));
 419                        return;
 420                }
 421        }
 422
 423        /* Then reconnect if it's still live.
 424         * The passive side of an IB loopback connection is never added
 425         * to the conn hash, so we never trigger a reconnect on this
 426         * conn - the reconnect is always triggered by the active peer. */
 427        cancel_delayed_work_sync(&cp->cp_conn_w);
 428        rcu_read_lock();
 429        if (!hlist_unhashed(&conn->c_hash_node)) {
 430                rcu_read_unlock();
 431                rds_queue_reconnect(cp);
 432        } else {
 433                rcu_read_unlock();
 434        }
 435}
 436
 437/* destroy a single rds_conn_path. rds_conn_destroy() iterates over
 438 * all paths using rds_conn_path_destroy()
 439 */
 440static void rds_conn_path_destroy(struct rds_conn_path *cp)
 441{
 442        struct rds_message *rm, *rtmp;
 443
 444        if (!cp->cp_transport_data)
 445                return;
 446
 447        /* make sure lingering queued work won't try to ref the conn */
 448        cancel_delayed_work_sync(&cp->cp_send_w);
 449        cancel_delayed_work_sync(&cp->cp_recv_w);
 450
 451        rds_conn_path_drop(cp, true);
 452        flush_work(&cp->cp_down_w);
 453
 454        /* tear down queued messages */
 455        list_for_each_entry_safe(rm, rtmp,
 456                                 &cp->cp_send_queue,
 457                                 m_conn_item) {
 458                list_del_init(&rm->m_conn_item);
 459                BUG_ON(!list_empty(&rm->m_sock_item));
 460                rds_message_put(rm);
 461        }
 462        if (cp->cp_xmit_rm)
 463                rds_message_put(cp->cp_xmit_rm);
 464
 465        WARN_ON(delayed_work_pending(&cp->cp_send_w));
 466        WARN_ON(delayed_work_pending(&cp->cp_recv_w));
 467        WARN_ON(delayed_work_pending(&cp->cp_conn_w));
 468        WARN_ON(work_pending(&cp->cp_down_w));
 469
 470        cp->cp_conn->c_trans->conn_free(cp->cp_transport_data);
 471}
 472
 473/*
 474 * Stop and free a connection.
 475 *
 476 * This can only be used in very limited circumstances.  It assumes that once
 477 * the conn has been shutdown that no one else is referencing the connection.
 478 * We can only ensure this in the rmmod path in the current code.
 479 */
 480void rds_conn_destroy(struct rds_connection *conn)
 481{
 482        unsigned long flags;
 483        int i;
 484        struct rds_conn_path *cp;
 485        int npaths = (conn->c_trans->t_mp_capable ? RDS_MPATH_WORKERS : 1);
 486
 487        rdsdebug("freeing conn %p for %pI4 -> "
 488                 "%pI4\n", conn, &conn->c_laddr,
 489                 &conn->c_faddr);
 490
 491        /* Ensure conn will not be scheduled for reconnect */
 492        spin_lock_irq(&rds_conn_lock);
 493        hlist_del_init_rcu(&conn->c_hash_node);
 494        spin_unlock_irq(&rds_conn_lock);
 495        synchronize_rcu();
 496
 497        /* shut the connection down */
 498        for (i = 0; i < npaths; i++) {
 499                cp = &conn->c_path[i];
 500                rds_conn_path_destroy(cp);
 501                BUG_ON(!list_empty(&cp->cp_retrans));
 502        }
 503
 504        /*
 505         * The congestion maps aren't freed up here.  They're
 506         * freed by rds_cong_exit() after all the connections
 507         * have been freed.
 508         */
 509        rds_cong_remove_conn(conn);
 510
 511        kfree(conn->c_path);
 512        kmem_cache_free(rds_conn_slab, conn);
 513
 514        spin_lock_irqsave(&rds_conn_lock, flags);
 515        rds_conn_count--;
 516        spin_unlock_irqrestore(&rds_conn_lock, flags);
 517}
 518EXPORT_SYMBOL_GPL(rds_conn_destroy);
 519
 520static void __rds_inc_msg_cp(struct rds_incoming *inc,
 521                             struct rds_info_iterator *iter,
 522                             void *saddr, void *daddr, int flip, bool isv6)
 523{
 524#if IS_ENABLED(CONFIG_IPV6)
 525        if (isv6)
 526                rds6_inc_info_copy(inc, iter, saddr, daddr, flip);
 527        else
 528#endif
 529                rds_inc_info_copy(inc, iter, *(__be32 *)saddr,
 530                                  *(__be32 *)daddr, flip);
 531}
 532
 533static void rds_conn_message_info_cmn(struct socket *sock, unsigned int len,
 534                                      struct rds_info_iterator *iter,
 535                                      struct rds_info_lengths *lens,
 536                                      int want_send, bool isv6)
 537{
 538        struct hlist_head *head;
 539        struct list_head *list;
 540        struct rds_connection *conn;
 541        struct rds_message *rm;
 542        unsigned int total = 0;
 543        unsigned long flags;
 544        size_t i;
 545        int j;
 546
 547        if (isv6)
 548                len /= sizeof(struct rds6_info_message);
 549        else
 550                len /= sizeof(struct rds_info_message);
 551
 552        rcu_read_lock();
 553
 554        for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 555             i++, head++) {
 556                hlist_for_each_entry_rcu(conn, head, c_hash_node) {
 557                        struct rds_conn_path *cp;
 558                        int npaths;
 559
 560                        if (!isv6 && conn->c_isv6)
 561                                continue;
 562
 563                        npaths = (conn->c_trans->t_mp_capable ?
 564                                 RDS_MPATH_WORKERS : 1);
 565
 566                        for (j = 0; j < npaths; j++) {
 567                                cp = &conn->c_path[j];
 568                                if (want_send)
 569                                        list = &cp->cp_send_queue;
 570                                else
 571                                        list = &cp->cp_retrans;
 572
 573                                spin_lock_irqsave(&cp->cp_lock, flags);
 574
 575                                /* XXX too lazy to maintain counts.. */
 576                                list_for_each_entry(rm, list, m_conn_item) {
 577                                        total++;
 578                                        if (total <= len)
 579                                                __rds_inc_msg_cp(&rm->m_inc,
 580                                                                 iter,
 581                                                                 &conn->c_laddr,
 582                                                                 &conn->c_faddr,
 583                                                                 0, isv6);
 584                                }
 585
 586                                spin_unlock_irqrestore(&cp->cp_lock, flags);
 587                        }
 588                }
 589        }
 590        rcu_read_unlock();
 591
 592        lens->nr = total;
 593        if (isv6)
 594                lens->each = sizeof(struct rds6_info_message);
 595        else
 596                lens->each = sizeof(struct rds_info_message);
 597}
 598
 599static void rds_conn_message_info(struct socket *sock, unsigned int len,
 600                                  struct rds_info_iterator *iter,
 601                                  struct rds_info_lengths *lens,
 602                                  int want_send)
 603{
 604        rds_conn_message_info_cmn(sock, len, iter, lens, want_send, false);
 605}
 606
 607#if IS_ENABLED(CONFIG_IPV6)
 608static void rds6_conn_message_info(struct socket *sock, unsigned int len,
 609                                   struct rds_info_iterator *iter,
 610                                   struct rds_info_lengths *lens,
 611                                   int want_send)
 612{
 613        rds_conn_message_info_cmn(sock, len, iter, lens, want_send, true);
 614}
 615#endif
 616
 617static void rds_conn_message_info_send(struct socket *sock, unsigned int len,
 618                                       struct rds_info_iterator *iter,
 619                                       struct rds_info_lengths *lens)
 620{
 621        rds_conn_message_info(sock, len, iter, lens, 1);
 622}
 623
 624#if IS_ENABLED(CONFIG_IPV6)
 625static void rds6_conn_message_info_send(struct socket *sock, unsigned int len,
 626                                        struct rds_info_iterator *iter,
 627                                        struct rds_info_lengths *lens)
 628{
 629        rds6_conn_message_info(sock, len, iter, lens, 1);
 630}
 631#endif
 632
 633static void rds_conn_message_info_retrans(struct socket *sock,
 634                                          unsigned int len,
 635                                          struct rds_info_iterator *iter,
 636                                          struct rds_info_lengths *lens)
 637{
 638        rds_conn_message_info(sock, len, iter, lens, 0);
 639}
 640
 641#if IS_ENABLED(CONFIG_IPV6)
 642static void rds6_conn_message_info_retrans(struct socket *sock,
 643                                           unsigned int len,
 644                                           struct rds_info_iterator *iter,
 645                                           struct rds_info_lengths *lens)
 646{
 647        rds6_conn_message_info(sock, len, iter, lens, 0);
 648}
 649#endif
 650
 651void rds_for_each_conn_info(struct socket *sock, unsigned int len,
 652                          struct rds_info_iterator *iter,
 653                          struct rds_info_lengths *lens,
 654                          int (*visitor)(struct rds_connection *, void *),
 655                          u64 *buffer,
 656                          size_t item_len)
 657{
 658        struct hlist_head *head;
 659        struct rds_connection *conn;
 660        size_t i;
 661
 662        rcu_read_lock();
 663
 664        lens->nr = 0;
 665        lens->each = item_len;
 666
 667        for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 668             i++, head++) {
 669                hlist_for_each_entry_rcu(conn, head, c_hash_node) {
 670
 671                        /* XXX no c_lock usage.. */
 672                        if (!visitor(conn, buffer))
 673                                continue;
 674
 675                        /* We copy as much as we can fit in the buffer,
 676                         * but we count all items so that the caller
 677                         * can resize the buffer. */
 678                        if (len >= item_len) {
 679                                rds_info_copy(iter, buffer, item_len);
 680                                len -= item_len;
 681                        }
 682                        lens->nr++;
 683                }
 684        }
 685        rcu_read_unlock();
 686}
 687EXPORT_SYMBOL_GPL(rds_for_each_conn_info);
 688
 689static void rds_walk_conn_path_info(struct socket *sock, unsigned int len,
 690                                    struct rds_info_iterator *iter,
 691                                    struct rds_info_lengths *lens,
 692                                    int (*visitor)(struct rds_conn_path *, void *),
 693                                    u64 *buffer,
 694                                    size_t item_len)
 695{
 696        struct hlist_head *head;
 697        struct rds_connection *conn;
 698        size_t i;
 699
 700        rcu_read_lock();
 701
 702        lens->nr = 0;
 703        lens->each = item_len;
 704
 705        for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
 706             i++, head++) {
 707                hlist_for_each_entry_rcu(conn, head, c_hash_node) {
 708                        struct rds_conn_path *cp;
 709
 710                        /* XXX We only copy the information from the first
 711                         * path for now.  The problem is that if there are
 712                         * more than one underlying paths, we cannot report
 713                         * information of all of them using the existing
 714                         * API.  For example, there is only one next_tx_seq,
 715                         * which path's next_tx_seq should we report?  It is
 716                         * a bug in the design of MPRDS.
 717                         */
 718                        cp = conn->c_path;
 719
 720                        /* XXX no cp_lock usage.. */
 721                        if (!visitor(cp, buffer))
 722                                continue;
 723
 724                        /* We copy as much as we can fit in the buffer,
 725                         * but we count all items so that the caller
 726                         * can resize the buffer.
 727                         */
 728                        if (len >= item_len) {
 729                                rds_info_copy(iter, buffer, item_len);
 730                                len -= item_len;
 731                        }
 732                        lens->nr++;
 733                }
 734        }
 735        rcu_read_unlock();
 736}
 737
 738static int rds_conn_info_visitor(struct rds_conn_path *cp, void *buffer)
 739{
 740        struct rds_info_connection *cinfo = buffer;
 741        struct rds_connection *conn = cp->cp_conn;
 742
 743        if (conn->c_isv6)
 744                return 0;
 745
 746        cinfo->next_tx_seq = cp->cp_next_tx_seq;
 747        cinfo->next_rx_seq = cp->cp_next_rx_seq;
 748        cinfo->laddr = conn->c_laddr.s6_addr32[3];
 749        cinfo->faddr = conn->c_faddr.s6_addr32[3];
 750        cinfo->tos = conn->c_tos;
 751        strncpy(cinfo->transport, conn->c_trans->t_name,
 752                sizeof(cinfo->transport));
 753        cinfo->flags = 0;
 754
 755        rds_conn_info_set(cinfo->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags),
 756                          SENDING);
 757        /* XXX Future: return the state rather than these funky bits */
 758        rds_conn_info_set(cinfo->flags,
 759                          atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING,
 760                          CONNECTING);
 761        rds_conn_info_set(cinfo->flags,
 762                          atomic_read(&cp->cp_state) == RDS_CONN_UP,
 763                          CONNECTED);
 764        return 1;
 765}
 766
 767#if IS_ENABLED(CONFIG_IPV6)
 768static int rds6_conn_info_visitor(struct rds_conn_path *cp, void *buffer)
 769{
 770        struct rds6_info_connection *cinfo6 = buffer;
 771        struct rds_connection *conn = cp->cp_conn;
 772
 773        cinfo6->next_tx_seq = cp->cp_next_tx_seq;
 774        cinfo6->next_rx_seq = cp->cp_next_rx_seq;
 775        cinfo6->laddr = conn->c_laddr;
 776        cinfo6->faddr = conn->c_faddr;
 777        strncpy(cinfo6->transport, conn->c_trans->t_name,
 778                sizeof(cinfo6->transport));
 779        cinfo6->flags = 0;
 780
 781        rds_conn_info_set(cinfo6->flags, test_bit(RDS_IN_XMIT, &cp->cp_flags),
 782                          SENDING);
 783        /* XXX Future: return the state rather than these funky bits */
 784        rds_conn_info_set(cinfo6->flags,
 785                          atomic_read(&cp->cp_state) == RDS_CONN_CONNECTING,
 786                          CONNECTING);
 787        rds_conn_info_set(cinfo6->flags,
 788                          atomic_read(&cp->cp_state) == RDS_CONN_UP,
 789                          CONNECTED);
 790        /* Just return 1 as there is no error case. This is a helper function
 791         * for rds_walk_conn_path_info() and it wants a return value.
 792         */
 793        return 1;
 794}
 795#endif
 796
 797static void rds_conn_info(struct socket *sock, unsigned int len,
 798                          struct rds_info_iterator *iter,
 799                          struct rds_info_lengths *lens)
 800{
 801        u64 buffer[(sizeof(struct rds_info_connection) + 7) / 8];
 802
 803        rds_walk_conn_path_info(sock, len, iter, lens,
 804                                rds_conn_info_visitor,
 805                                buffer,
 806                                sizeof(struct rds_info_connection));
 807}
 808
 809#if IS_ENABLED(CONFIG_IPV6)
 810static void rds6_conn_info(struct socket *sock, unsigned int len,
 811                           struct rds_info_iterator *iter,
 812                           struct rds_info_lengths *lens)
 813{
 814        u64 buffer[(sizeof(struct rds6_info_connection) + 7) / 8];
 815
 816        rds_walk_conn_path_info(sock, len, iter, lens,
 817                                rds6_conn_info_visitor,
 818                                buffer,
 819                                sizeof(struct rds6_info_connection));
 820}
 821#endif
 822
 823int rds_conn_init(void)
 824{
 825        int ret;
 826
 827        ret = rds_loop_net_init(); /* register pernet callback */
 828        if (ret)
 829                return ret;
 830
 831        rds_conn_slab = kmem_cache_create("rds_connection",
 832                                          sizeof(struct rds_connection),
 833                                          0, 0, NULL);
 834        if (!rds_conn_slab) {
 835                rds_loop_net_exit();
 836                return -ENOMEM;
 837        }
 838
 839        rds_info_register_func(RDS_INFO_CONNECTIONS, rds_conn_info);
 840        rds_info_register_func(RDS_INFO_SEND_MESSAGES,
 841                               rds_conn_message_info_send);
 842        rds_info_register_func(RDS_INFO_RETRANS_MESSAGES,
 843                               rds_conn_message_info_retrans);
 844#if IS_ENABLED(CONFIG_IPV6)
 845        rds_info_register_func(RDS6_INFO_CONNECTIONS, rds6_conn_info);
 846        rds_info_register_func(RDS6_INFO_SEND_MESSAGES,
 847                               rds6_conn_message_info_send);
 848        rds_info_register_func(RDS6_INFO_RETRANS_MESSAGES,
 849                               rds6_conn_message_info_retrans);
 850#endif
 851        return 0;
 852}
 853
 854void rds_conn_exit(void)
 855{
 856        rds_loop_net_exit(); /* unregister pernet callback */
 857        rds_loop_exit();
 858
 859        WARN_ON(!hlist_empty(rds_conn_hash));
 860
 861        kmem_cache_destroy(rds_conn_slab);
 862
 863        rds_info_deregister_func(RDS_INFO_CONNECTIONS, rds_conn_info);
 864        rds_info_deregister_func(RDS_INFO_SEND_MESSAGES,
 865                                 rds_conn_message_info_send);
 866        rds_info_deregister_func(RDS_INFO_RETRANS_MESSAGES,
 867                                 rds_conn_message_info_retrans);
 868#if IS_ENABLED(CONFIG_IPV6)
 869        rds_info_deregister_func(RDS6_INFO_CONNECTIONS, rds6_conn_info);
 870        rds_info_deregister_func(RDS6_INFO_SEND_MESSAGES,
 871                                 rds6_conn_message_info_send);
 872        rds_info_deregister_func(RDS6_INFO_RETRANS_MESSAGES,
 873                                 rds6_conn_message_info_retrans);
 874#endif
 875}
 876
 877/*
 878 * Force a disconnect
 879 */
 880void rds_conn_path_drop(struct rds_conn_path *cp, bool destroy)
 881{
 882        atomic_set(&cp->cp_state, RDS_CONN_ERROR);
 883
 884        rcu_read_lock();
 885        if (!destroy && rds_destroy_pending(cp->cp_conn)) {
 886                rcu_read_unlock();
 887                return;
 888        }
 889        queue_work(rds_wq, &cp->cp_down_w);
 890        rcu_read_unlock();
 891}
 892EXPORT_SYMBOL_GPL(rds_conn_path_drop);
 893
 894void rds_conn_drop(struct rds_connection *conn)
 895{
 896        WARN_ON(conn->c_trans->t_mp_capable);
 897        rds_conn_path_drop(&conn->c_path[0], false);
 898}
 899EXPORT_SYMBOL_GPL(rds_conn_drop);
 900
 901/*
 902 * If the connection is down, trigger a connect. We may have scheduled a
 903 * delayed reconnect however - in this case we should not interfere.
 904 */
 905void rds_conn_path_connect_if_down(struct rds_conn_path *cp)
 906{
 907        rcu_read_lock();
 908        if (rds_destroy_pending(cp->cp_conn)) {
 909                rcu_read_unlock();
 910                return;
 911        }
 912        if (rds_conn_path_state(cp) == RDS_CONN_DOWN &&
 913            !test_and_set_bit(RDS_RECONNECT_PENDING, &cp->cp_flags))
 914                queue_delayed_work(rds_wq, &cp->cp_conn_w, 0);
 915        rcu_read_unlock();
 916}
 917EXPORT_SYMBOL_GPL(rds_conn_path_connect_if_down);
 918
 919/* Check connectivity of all paths
 920 */
 921void rds_check_all_paths(struct rds_connection *conn)
 922{
 923        int i = 0;
 924
 925        do {
 926                rds_conn_path_connect_if_down(&conn->c_path[i]);
 927        } while (++i < conn->c_npaths);
 928}
 929
 930void rds_conn_connect_if_down(struct rds_connection *conn)
 931{
 932        WARN_ON(conn->c_trans->t_mp_capable);
 933        rds_conn_path_connect_if_down(&conn->c_path[0]);
 934}
 935EXPORT_SYMBOL_GPL(rds_conn_connect_if_down);
 936
 937void
 938__rds_conn_path_error(struct rds_conn_path *cp, const char *fmt, ...)
 939{
 940        va_list ap;
 941
 942        va_start(ap, fmt);
 943        vprintk(fmt, ap);
 944        va_end(ap);
 945
 946        rds_conn_path_drop(cp, false);
 947}
 948