linux/net/rxrpc/conn_client.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/* Client connection-specific management code.
   3 *
   4 * Copyright (C) 2016, 2020 Red Hat, Inc. All Rights Reserved.
   5 * Written by David Howells (dhowells@redhat.com)
   6 *
   7 * Client connections need to be cached for a little while after they've made a
   8 * call so as to handle retransmitted DATA packets in case the server didn't
   9 * receive the final ACK or terminating ABORT we sent it.
  10 *
  11 * There are flags of relevance to the cache:
  12 *
  13 *  (2) DONT_REUSE - The connection should be discarded as soon as possible and
  14 *      should not be reused.  This is set when an exclusive connection is used
  15 *      or a call ID counter overflows.
  16 *
  17 * The caching state may only be changed if the cache lock is held.
  18 *
  19 * There are two idle client connection expiry durations.  If the total number
  20 * of connections is below the reap threshold, we use the normal duration; if
  21 * it's above, we use the fast duration.
  22 */
  23
  24#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
  25
  26#include <linux/slab.h>
  27#include <linux/idr.h>
  28#include <linux/timer.h>
  29#include <linux/sched/signal.h>
  30
  31#include "ar-internal.h"
  32
  33__read_mostly unsigned int rxrpc_reap_client_connections = 900;
  34__read_mostly unsigned long rxrpc_conn_idle_client_expiry = 2 * 60 * HZ;
  35__read_mostly unsigned long rxrpc_conn_idle_client_fast_expiry = 2 * HZ;
  36
  37/*
  38 * We use machine-unique IDs for our client connections.
  39 */
  40DEFINE_IDR(rxrpc_client_conn_ids);
  41static DEFINE_SPINLOCK(rxrpc_conn_id_lock);
  42
  43/*
  44 * Get a connection ID and epoch for a client connection from the global pool.
  45 * The connection struct pointer is then recorded in the idr radix tree.  The
  46 * epoch doesn't change until the client is rebooted (or, at least, unless the
  47 * module is unloaded).
  48 */
  49static int rxrpc_get_client_connection_id(struct rxrpc_connection *conn,
  50                                          gfp_t gfp)
  51{
  52        struct rxrpc_net *rxnet = conn->params.local->rxnet;
  53        int id;
  54
  55        _enter("");
  56
  57        idr_preload(gfp);
  58        spin_lock(&rxrpc_conn_id_lock);
  59
  60        id = idr_alloc_cyclic(&rxrpc_client_conn_ids, conn,
  61                              1, 0x40000000, GFP_NOWAIT);
  62        if (id < 0)
  63                goto error;
  64
  65        spin_unlock(&rxrpc_conn_id_lock);
  66        idr_preload_end();
  67
  68        conn->proto.epoch = rxnet->epoch;
  69        conn->proto.cid = id << RXRPC_CIDSHIFT;
  70        set_bit(RXRPC_CONN_HAS_IDR, &conn->flags);
  71        _leave(" [CID %x]", conn->proto.cid);
  72        return 0;
  73
  74error:
  75        spin_unlock(&rxrpc_conn_id_lock);
  76        idr_preload_end();
  77        _leave(" = %d", id);
  78        return id;
  79}
  80
  81/*
  82 * Release a connection ID for a client connection from the global pool.
  83 */
  84static void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
  85{
  86        if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
  87                spin_lock(&rxrpc_conn_id_lock);
  88                idr_remove(&rxrpc_client_conn_ids,
  89                           conn->proto.cid >> RXRPC_CIDSHIFT);
  90                spin_unlock(&rxrpc_conn_id_lock);
  91        }
  92}
  93
  94/*
  95 * Destroy the client connection ID tree.
  96 */
  97void rxrpc_destroy_client_conn_ids(void)
  98{
  99        struct rxrpc_connection *conn;
 100        int id;
 101
 102        if (!idr_is_empty(&rxrpc_client_conn_ids)) {
 103                idr_for_each_entry(&rxrpc_client_conn_ids, conn, id) {
 104                        pr_err("AF_RXRPC: Leaked client conn %p {%d}\n",
 105                               conn, refcount_read(&conn->ref));
 106                }
 107                BUG();
 108        }
 109
 110        idr_destroy(&rxrpc_client_conn_ids);
 111}
 112
 113/*
 114 * Allocate a connection bundle.
 115 */
 116static struct rxrpc_bundle *rxrpc_alloc_bundle(struct rxrpc_conn_parameters *cp,
 117                                               gfp_t gfp)
 118{
 119        struct rxrpc_bundle *bundle;
 120
 121        bundle = kzalloc(sizeof(*bundle), gfp);
 122        if (bundle) {
 123                bundle->params = *cp;
 124                rxrpc_get_peer(bundle->params.peer);
 125                refcount_set(&bundle->ref, 1);
 126                spin_lock_init(&bundle->channel_lock);
 127                INIT_LIST_HEAD(&bundle->waiting_calls);
 128        }
 129        return bundle;
 130}
 131
 132struct rxrpc_bundle *rxrpc_get_bundle(struct rxrpc_bundle *bundle)
 133{
 134        refcount_inc(&bundle->ref);
 135        return bundle;
 136}
 137
 138static void rxrpc_free_bundle(struct rxrpc_bundle *bundle)
 139{
 140        rxrpc_put_peer(bundle->params.peer);
 141        kfree(bundle);
 142}
 143
 144void rxrpc_put_bundle(struct rxrpc_bundle *bundle)
 145{
 146        unsigned int d = bundle->debug_id;
 147        bool dead;
 148        int r;
 149
 150        dead = __refcount_dec_and_test(&bundle->ref, &r);
 151
 152        _debug("PUT B=%x %d", d, r);
 153        if (dead)
 154                rxrpc_free_bundle(bundle);
 155}
 156
 157/*
 158 * Allocate a client connection.
 159 */
 160static struct rxrpc_connection *
 161rxrpc_alloc_client_connection(struct rxrpc_bundle *bundle, gfp_t gfp)
 162{
 163        struct rxrpc_connection *conn;
 164        struct rxrpc_net *rxnet = bundle->params.local->rxnet;
 165        int ret;
 166
 167        _enter("");
 168
 169        conn = rxrpc_alloc_connection(gfp);
 170        if (!conn) {
 171                _leave(" = -ENOMEM");
 172                return ERR_PTR(-ENOMEM);
 173        }
 174
 175        refcount_set(&conn->ref, 1);
 176        conn->bundle            = bundle;
 177        conn->params            = bundle->params;
 178        conn->out_clientflag    = RXRPC_CLIENT_INITIATED;
 179        conn->state             = RXRPC_CONN_CLIENT;
 180        conn->service_id        = conn->params.service_id;
 181
 182        ret = rxrpc_get_client_connection_id(conn, gfp);
 183        if (ret < 0)
 184                goto error_0;
 185
 186        ret = rxrpc_init_client_conn_security(conn);
 187        if (ret < 0)
 188                goto error_1;
 189
 190        atomic_inc(&rxnet->nr_conns);
 191        write_lock(&rxnet->conn_lock);
 192        list_add_tail(&conn->proc_link, &rxnet->conn_proc_list);
 193        write_unlock(&rxnet->conn_lock);
 194
 195        rxrpc_get_bundle(bundle);
 196        rxrpc_get_peer(conn->params.peer);
 197        rxrpc_get_local(conn->params.local);
 198        key_get(conn->params.key);
 199
 200        trace_rxrpc_conn(conn->debug_id, rxrpc_conn_new_client,
 201                         refcount_read(&conn->ref),
 202                         __builtin_return_address(0));
 203
 204        atomic_inc(&rxnet->nr_client_conns);
 205        trace_rxrpc_client(conn, -1, rxrpc_client_alloc);
 206        _leave(" = %p", conn);
 207        return conn;
 208
 209error_1:
 210        rxrpc_put_client_connection_id(conn);
 211error_0:
 212        kfree(conn);
 213        _leave(" = %d", ret);
 214        return ERR_PTR(ret);
 215}
 216
 217/*
 218 * Determine if a connection may be reused.
 219 */
 220static bool rxrpc_may_reuse_conn(struct rxrpc_connection *conn)
 221{
 222        struct rxrpc_net *rxnet;
 223        int id_cursor, id, distance, limit;
 224
 225        if (!conn)
 226                goto dont_reuse;
 227
 228        rxnet = conn->params.local->rxnet;
 229        if (test_bit(RXRPC_CONN_DONT_REUSE, &conn->flags))
 230                goto dont_reuse;
 231
 232        if (conn->state != RXRPC_CONN_CLIENT ||
 233            conn->proto.epoch != rxnet->epoch)
 234                goto mark_dont_reuse;
 235
 236        /* The IDR tree gets very expensive on memory if the connection IDs are
 237         * widely scattered throughout the number space, so we shall want to
 238         * kill off connections that, say, have an ID more than about four
 239         * times the maximum number of client conns away from the current
 240         * allocation point to try and keep the IDs concentrated.
 241         */
 242        id_cursor = idr_get_cursor(&rxrpc_client_conn_ids);
 243        id = conn->proto.cid >> RXRPC_CIDSHIFT;
 244        distance = id - id_cursor;
 245        if (distance < 0)
 246                distance = -distance;
 247        limit = max_t(unsigned long, atomic_read(&rxnet->nr_conns) * 4, 1024);
 248        if (distance > limit)
 249                goto mark_dont_reuse;
 250
 251        return true;
 252
 253mark_dont_reuse:
 254        set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
 255dont_reuse:
 256        return false;
 257}
 258
 259/*
 260 * Look up the conn bundle that matches the connection parameters, adding it if
 261 * it doesn't yet exist.
 262 */
 263static struct rxrpc_bundle *rxrpc_look_up_bundle(struct rxrpc_conn_parameters *cp,
 264                                                 gfp_t gfp)
 265{
 266        static atomic_t rxrpc_bundle_id;
 267        struct rxrpc_bundle *bundle, *candidate;
 268        struct rxrpc_local *local = cp->local;
 269        struct rb_node *p, **pp, *parent;
 270        long diff;
 271
 272        _enter("{%px,%x,%u,%u}",
 273               cp->peer, key_serial(cp->key), cp->security_level, cp->upgrade);
 274
 275        if (cp->exclusive)
 276                return rxrpc_alloc_bundle(cp, gfp);
 277
 278        /* First, see if the bundle is already there. */
 279        _debug("search 1");
 280        spin_lock(&local->client_bundles_lock);
 281        p = local->client_bundles.rb_node;
 282        while (p) {
 283                bundle = rb_entry(p, struct rxrpc_bundle, local_node);
 284
 285#define cmp(X) ((long)bundle->params.X - (long)cp->X)
 286                diff = (cmp(peer) ?:
 287                        cmp(key) ?:
 288                        cmp(security_level) ?:
 289                        cmp(upgrade));
 290#undef cmp
 291                if (diff < 0)
 292                        p = p->rb_left;
 293                else if (diff > 0)
 294                        p = p->rb_right;
 295                else
 296                        goto found_bundle;
 297        }
 298        spin_unlock(&local->client_bundles_lock);
 299        _debug("not found");
 300
 301        /* It wasn't.  We need to add one. */
 302        candidate = rxrpc_alloc_bundle(cp, gfp);
 303        if (!candidate)
 304                return NULL;
 305
 306        _debug("search 2");
 307        spin_lock(&local->client_bundles_lock);
 308        pp = &local->client_bundles.rb_node;
 309        parent = NULL;
 310        while (*pp) {
 311                parent = *pp;
 312                bundle = rb_entry(parent, struct rxrpc_bundle, local_node);
 313
 314#define cmp(X) ((long)bundle->params.X - (long)cp->X)
 315                diff = (cmp(peer) ?:
 316                        cmp(key) ?:
 317                        cmp(security_level) ?:
 318                        cmp(upgrade));
 319#undef cmp
 320                if (diff < 0)
 321                        pp = &(*pp)->rb_left;
 322                else if (diff > 0)
 323                        pp = &(*pp)->rb_right;
 324                else
 325                        goto found_bundle_free;
 326        }
 327
 328        _debug("new bundle");
 329        candidate->debug_id = atomic_inc_return(&rxrpc_bundle_id);
 330        rb_link_node(&candidate->local_node, parent, pp);
 331        rb_insert_color(&candidate->local_node, &local->client_bundles);
 332        rxrpc_get_bundle(candidate);
 333        spin_unlock(&local->client_bundles_lock);
 334        _leave(" = %u [new]", candidate->debug_id);
 335        return candidate;
 336
 337found_bundle_free:
 338        rxrpc_free_bundle(candidate);
 339found_bundle:
 340        rxrpc_get_bundle(bundle);
 341        spin_unlock(&local->client_bundles_lock);
 342        _leave(" = %u [found]", bundle->debug_id);
 343        return bundle;
 344}
 345
 346/*
 347 * Create or find a client bundle to use for a call.
 348 *
 349 * If we return with a connection, the call will be on its waiting list.  It's
 350 * left to the caller to assign a channel and wake up the call.
 351 */
 352static struct rxrpc_bundle *rxrpc_prep_call(struct rxrpc_sock *rx,
 353                                            struct rxrpc_call *call,
 354                                            struct rxrpc_conn_parameters *cp,
 355                                            struct sockaddr_rxrpc *srx,
 356                                            gfp_t gfp)
 357{
 358        struct rxrpc_bundle *bundle;
 359
 360        _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
 361
 362        cp->peer = rxrpc_lookup_peer(rx, cp->local, srx, gfp);
 363        if (!cp->peer)
 364                goto error;
 365
 366        call->cong_cwnd = cp->peer->cong_cwnd;
 367        if (call->cong_cwnd >= call->cong_ssthresh)
 368                call->cong_mode = RXRPC_CALL_CONGEST_AVOIDANCE;
 369        else
 370                call->cong_mode = RXRPC_CALL_SLOW_START;
 371        if (cp->upgrade)
 372                __set_bit(RXRPC_CALL_UPGRADE, &call->flags);
 373
 374        /* Find the client connection bundle. */
 375        bundle = rxrpc_look_up_bundle(cp, gfp);
 376        if (!bundle)
 377                goto error;
 378
 379        /* Get this call queued.  Someone else may activate it whilst we're
 380         * lining up a new connection, but that's fine.
 381         */
 382        spin_lock(&bundle->channel_lock);
 383        list_add_tail(&call->chan_wait_link, &bundle->waiting_calls);
 384        spin_unlock(&bundle->channel_lock);
 385
 386        _leave(" = [B=%x]", bundle->debug_id);
 387        return bundle;
 388
 389error:
 390        _leave(" = -ENOMEM");
 391        return ERR_PTR(-ENOMEM);
 392}
 393
 394/*
 395 * Allocate a new connection and add it into a bundle.
 396 */
 397static void rxrpc_add_conn_to_bundle(struct rxrpc_bundle *bundle, gfp_t gfp)
 398        __releases(bundle->channel_lock)
 399{
 400        struct rxrpc_connection *candidate = NULL, *old = NULL;
 401        bool conflict;
 402        int i;
 403
 404        _enter("");
 405
 406        conflict = bundle->alloc_conn;
 407        if (!conflict)
 408                bundle->alloc_conn = true;
 409        spin_unlock(&bundle->channel_lock);
 410        if (conflict) {
 411                _leave(" [conf]");
 412                return;
 413        }
 414
 415        candidate = rxrpc_alloc_client_connection(bundle, gfp);
 416
 417        spin_lock(&bundle->channel_lock);
 418        bundle->alloc_conn = false;
 419
 420        if (IS_ERR(candidate)) {
 421                bundle->alloc_error = PTR_ERR(candidate);
 422                spin_unlock(&bundle->channel_lock);
 423                _leave(" [err %ld]", PTR_ERR(candidate));
 424                return;
 425        }
 426
 427        bundle->alloc_error = 0;
 428
 429        for (i = 0; i < ARRAY_SIZE(bundle->conns); i++) {
 430                unsigned int shift = i * RXRPC_MAXCALLS;
 431                int j;
 432
 433                old = bundle->conns[i];
 434                if (!rxrpc_may_reuse_conn(old)) {
 435                        if (old)
 436                                trace_rxrpc_client(old, -1, rxrpc_client_replace);
 437                        candidate->bundle_shift = shift;
 438                        bundle->conns[i] = candidate;
 439                        for (j = 0; j < RXRPC_MAXCALLS; j++)
 440                                set_bit(shift + j, &bundle->avail_chans);
 441                        candidate = NULL;
 442                        break;
 443                }
 444
 445                old = NULL;
 446        }
 447
 448        spin_unlock(&bundle->channel_lock);
 449
 450        if (candidate) {
 451                _debug("discard C=%x", candidate->debug_id);
 452                trace_rxrpc_client(candidate, -1, rxrpc_client_duplicate);
 453                rxrpc_put_connection(candidate);
 454        }
 455
 456        rxrpc_put_connection(old);
 457        _leave("");
 458}
 459
 460/*
 461 * Add a connection to a bundle if there are no usable connections or we have
 462 * connections waiting for extra capacity.
 463 */
 464static void rxrpc_maybe_add_conn(struct rxrpc_bundle *bundle, gfp_t gfp)
 465{
 466        struct rxrpc_call *call;
 467        int i, usable;
 468
 469        _enter("");
 470
 471        spin_lock(&bundle->channel_lock);
 472
 473        /* See if there are any usable connections. */
 474        usable = 0;
 475        for (i = 0; i < ARRAY_SIZE(bundle->conns); i++)
 476                if (rxrpc_may_reuse_conn(bundle->conns[i]))
 477                        usable++;
 478
 479        if (!usable && !list_empty(&bundle->waiting_calls)) {
 480                call = list_first_entry(&bundle->waiting_calls,
 481                                        struct rxrpc_call, chan_wait_link);
 482                if (test_bit(RXRPC_CALL_UPGRADE, &call->flags))
 483                        bundle->try_upgrade = true;
 484        }
 485
 486        if (!usable)
 487                goto alloc_conn;
 488
 489        if (!bundle->avail_chans &&
 490            !bundle->try_upgrade &&
 491            !list_empty(&bundle->waiting_calls) &&
 492            usable < ARRAY_SIZE(bundle->conns))
 493                goto alloc_conn;
 494
 495        spin_unlock(&bundle->channel_lock);
 496        _leave("");
 497        return;
 498
 499alloc_conn:
 500        return rxrpc_add_conn_to_bundle(bundle, gfp);
 501}
 502
 503/*
 504 * Assign a channel to the call at the front of the queue and wake the call up.
 505 * We don't increment the callNumber counter until this number has been exposed
 506 * to the world.
 507 */
 508static void rxrpc_activate_one_channel(struct rxrpc_connection *conn,
 509                                       unsigned int channel)
 510{
 511        struct rxrpc_channel *chan = &conn->channels[channel];
 512        struct rxrpc_bundle *bundle = conn->bundle;
 513        struct rxrpc_call *call = list_entry(bundle->waiting_calls.next,
 514                                             struct rxrpc_call, chan_wait_link);
 515        u32 call_id = chan->call_counter + 1;
 516
 517        _enter("C=%x,%u", conn->debug_id, channel);
 518
 519        trace_rxrpc_client(conn, channel, rxrpc_client_chan_activate);
 520
 521        /* Cancel the final ACK on the previous call if it hasn't been sent yet
 522         * as the DATA packet will implicitly ACK it.
 523         */
 524        clear_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
 525        clear_bit(conn->bundle_shift + channel, &bundle->avail_chans);
 526
 527        rxrpc_see_call(call);
 528        list_del_init(&call->chan_wait_link);
 529        call->peer      = rxrpc_get_peer(conn->params.peer);
 530        call->conn      = rxrpc_get_connection(conn);
 531        call->cid       = conn->proto.cid | channel;
 532        call->call_id   = call_id;
 533        call->security  = conn->security;
 534        call->security_ix = conn->security_ix;
 535        call->service_id = conn->service_id;
 536
 537        trace_rxrpc_connect_call(call);
 538        _net("CONNECT call %08x:%08x as call %d on conn %d",
 539             call->cid, call->call_id, call->debug_id, conn->debug_id);
 540
 541        write_lock_bh(&call->state_lock);
 542        call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
 543        write_unlock_bh(&call->state_lock);
 544
 545        /* Paired with the read barrier in rxrpc_connect_call().  This orders
 546         * cid and epoch in the connection wrt to call_id without the need to
 547         * take the channel_lock.
 548         *
 549         * We provisionally assign a callNumber at this point, but we don't
 550         * confirm it until the call is about to be exposed.
 551         *
 552         * TODO: Pair with a barrier in the data_ready handler when that looks
 553         * at the call ID through a connection channel.
 554         */
 555        smp_wmb();
 556
 557        chan->call_id           = call_id;
 558        chan->call_debug_id     = call->debug_id;
 559        rcu_assign_pointer(chan->call, call);
 560        wake_up(&call->waitq);
 561}
 562
 563/*
 564 * Remove a connection from the idle list if it's on it.
 565 */
 566static void rxrpc_unidle_conn(struct rxrpc_bundle *bundle, struct rxrpc_connection *conn)
 567{
 568        struct rxrpc_net *rxnet = bundle->params.local->rxnet;
 569        bool drop_ref;
 570
 571        if (!list_empty(&conn->cache_link)) {
 572                drop_ref = false;
 573                spin_lock(&rxnet->client_conn_cache_lock);
 574                if (!list_empty(&conn->cache_link)) {
 575                        list_del_init(&conn->cache_link);
 576                        drop_ref = true;
 577                }
 578                spin_unlock(&rxnet->client_conn_cache_lock);
 579                if (drop_ref)
 580                        rxrpc_put_connection(conn);
 581        }
 582}
 583
 584/*
 585 * Assign channels and callNumbers to waiting calls with channel_lock
 586 * held by caller.
 587 */
 588static void rxrpc_activate_channels_locked(struct rxrpc_bundle *bundle)
 589{
 590        struct rxrpc_connection *conn;
 591        unsigned long avail, mask;
 592        unsigned int channel, slot;
 593
 594        if (bundle->try_upgrade)
 595                mask = 1;
 596        else
 597                mask = ULONG_MAX;
 598
 599        while (!list_empty(&bundle->waiting_calls)) {
 600                avail = bundle->avail_chans & mask;
 601                if (!avail)
 602                        break;
 603                channel = __ffs(avail);
 604                clear_bit(channel, &bundle->avail_chans);
 605
 606                slot = channel / RXRPC_MAXCALLS;
 607                conn = bundle->conns[slot];
 608                if (!conn)
 609                        break;
 610
 611                if (bundle->try_upgrade)
 612                        set_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags);
 613                rxrpc_unidle_conn(bundle, conn);
 614
 615                channel &= (RXRPC_MAXCALLS - 1);
 616                conn->act_chans |= 1 << channel;
 617                rxrpc_activate_one_channel(conn, channel);
 618        }
 619}
 620
 621/*
 622 * Assign channels and callNumbers to waiting calls.
 623 */
 624static void rxrpc_activate_channels(struct rxrpc_bundle *bundle)
 625{
 626        _enter("B=%x", bundle->debug_id);
 627
 628        trace_rxrpc_client(NULL, -1, rxrpc_client_activate_chans);
 629
 630        if (!bundle->avail_chans)
 631                return;
 632
 633        spin_lock(&bundle->channel_lock);
 634        rxrpc_activate_channels_locked(bundle);
 635        spin_unlock(&bundle->channel_lock);
 636        _leave("");
 637}
 638
 639/*
 640 * Wait for a callNumber and a channel to be granted to a call.
 641 */
 642static int rxrpc_wait_for_channel(struct rxrpc_bundle *bundle,
 643                                  struct rxrpc_call *call, gfp_t gfp)
 644{
 645        DECLARE_WAITQUEUE(myself, current);
 646        int ret = 0;
 647
 648        _enter("%d", call->debug_id);
 649
 650        if (!gfpflags_allow_blocking(gfp)) {
 651                rxrpc_maybe_add_conn(bundle, gfp);
 652                rxrpc_activate_channels(bundle);
 653                ret = bundle->alloc_error ?: -EAGAIN;
 654                goto out;
 655        }
 656
 657        add_wait_queue_exclusive(&call->waitq, &myself);
 658        for (;;) {
 659                rxrpc_maybe_add_conn(bundle, gfp);
 660                rxrpc_activate_channels(bundle);
 661                ret = bundle->alloc_error;
 662                if (ret < 0)
 663                        break;
 664
 665                switch (call->interruptibility) {
 666                case RXRPC_INTERRUPTIBLE:
 667                case RXRPC_PREINTERRUPTIBLE:
 668                        set_current_state(TASK_INTERRUPTIBLE);
 669                        break;
 670                case RXRPC_UNINTERRUPTIBLE:
 671                default:
 672                        set_current_state(TASK_UNINTERRUPTIBLE);
 673                        break;
 674                }
 675                if (READ_ONCE(call->state) != RXRPC_CALL_CLIENT_AWAIT_CONN)
 676                        break;
 677                if ((call->interruptibility == RXRPC_INTERRUPTIBLE ||
 678                     call->interruptibility == RXRPC_PREINTERRUPTIBLE) &&
 679                    signal_pending(current)) {
 680                        ret = -ERESTARTSYS;
 681                        break;
 682                }
 683                schedule();
 684        }
 685        remove_wait_queue(&call->waitq, &myself);
 686        __set_current_state(TASK_RUNNING);
 687
 688out:
 689        _leave(" = %d", ret);
 690        return ret;
 691}
 692
 693/*
 694 * find a connection for a call
 695 * - called in process context with IRQs enabled
 696 */
 697int rxrpc_connect_call(struct rxrpc_sock *rx,
 698                       struct rxrpc_call *call,
 699                       struct rxrpc_conn_parameters *cp,
 700                       struct sockaddr_rxrpc *srx,
 701                       gfp_t gfp)
 702{
 703        struct rxrpc_bundle *bundle;
 704        struct rxrpc_net *rxnet = cp->local->rxnet;
 705        int ret = 0;
 706
 707        _enter("{%d,%lx},", call->debug_id, call->user_call_ID);
 708
 709        rxrpc_discard_expired_client_conns(&rxnet->client_conn_reaper);
 710
 711        bundle = rxrpc_prep_call(rx, call, cp, srx, gfp);
 712        if (IS_ERR(bundle)) {
 713                ret = PTR_ERR(bundle);
 714                goto out;
 715        }
 716
 717        if (call->state == RXRPC_CALL_CLIENT_AWAIT_CONN) {
 718                ret = rxrpc_wait_for_channel(bundle, call, gfp);
 719                if (ret < 0)
 720                        goto wait_failed;
 721        }
 722
 723granted_channel:
 724        /* Paired with the write barrier in rxrpc_activate_one_channel(). */
 725        smp_rmb();
 726
 727out_put_bundle:
 728        rxrpc_put_bundle(bundle);
 729out:
 730        _leave(" = %d", ret);
 731        return ret;
 732
 733wait_failed:
 734        spin_lock(&bundle->channel_lock);
 735        list_del_init(&call->chan_wait_link);
 736        spin_unlock(&bundle->channel_lock);
 737
 738        if (call->state != RXRPC_CALL_CLIENT_AWAIT_CONN) {
 739                ret = 0;
 740                goto granted_channel;
 741        }
 742
 743        trace_rxrpc_client(call->conn, ret, rxrpc_client_chan_wait_failed);
 744        rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret);
 745        rxrpc_disconnect_client_call(bundle, call);
 746        goto out_put_bundle;
 747}
 748
 749/*
 750 * Note that a call, and thus a connection, is about to be exposed to the
 751 * world.
 752 */
 753void rxrpc_expose_client_call(struct rxrpc_call *call)
 754{
 755        unsigned int channel = call->cid & RXRPC_CHANNELMASK;
 756        struct rxrpc_connection *conn = call->conn;
 757        struct rxrpc_channel *chan = &conn->channels[channel];
 758
 759        if (!test_and_set_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
 760                /* Mark the call ID as being used.  If the callNumber counter
 761                 * exceeds ~2 billion, we kill the connection after its
 762                 * outstanding calls have finished so that the counter doesn't
 763                 * wrap.
 764                 */
 765                chan->call_counter++;
 766                if (chan->call_counter >= INT_MAX)
 767                        set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
 768                trace_rxrpc_client(conn, channel, rxrpc_client_exposed);
 769        }
 770}
 771
 772/*
 773 * Set the reap timer.
 774 */
 775static void rxrpc_set_client_reap_timer(struct rxrpc_net *rxnet)
 776{
 777        if (!rxnet->kill_all_client_conns) {
 778                unsigned long now = jiffies;
 779                unsigned long reap_at = now + rxrpc_conn_idle_client_expiry;
 780
 781                if (rxnet->live)
 782                        timer_reduce(&rxnet->client_conn_reap_timer, reap_at);
 783        }
 784}
 785
 786/*
 787 * Disconnect a client call.
 788 */
 789void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call *call)
 790{
 791        struct rxrpc_connection *conn;
 792        struct rxrpc_channel *chan = NULL;
 793        struct rxrpc_net *rxnet = bundle->params.local->rxnet;
 794        unsigned int channel;
 795        bool may_reuse;
 796        u32 cid;
 797
 798        _enter("c=%x", call->debug_id);
 799
 800        spin_lock(&bundle->channel_lock);
 801        set_bit(RXRPC_CALL_DISCONNECTED, &call->flags);
 802
 803        /* Calls that have never actually been assigned a channel can simply be
 804         * discarded.
 805         */
 806        conn = call->conn;
 807        if (!conn) {
 808                _debug("call is waiting");
 809                ASSERTCMP(call->call_id, ==, 0);
 810                ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags));
 811                list_del_init(&call->chan_wait_link);
 812                goto out;
 813        }
 814
 815        cid = call->cid;
 816        channel = cid & RXRPC_CHANNELMASK;
 817        chan = &conn->channels[channel];
 818        trace_rxrpc_client(conn, channel, rxrpc_client_chan_disconnect);
 819
 820        if (rcu_access_pointer(chan->call) != call) {
 821                spin_unlock(&bundle->channel_lock);
 822                BUG();
 823        }
 824
 825        may_reuse = rxrpc_may_reuse_conn(conn);
 826
 827        /* If a client call was exposed to the world, we save the result for
 828         * retransmission.
 829         *
 830         * We use a barrier here so that the call number and abort code can be
 831         * read without needing to take a lock.
 832         *
 833         * TODO: Make the incoming packet handler check this and handle
 834         * terminal retransmission without requiring access to the call.
 835         */
 836        if (test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
 837                _debug("exposed %u,%u", call->call_id, call->abort_code);
 838                __rxrpc_disconnect_call(conn, call);
 839
 840                if (test_and_clear_bit(RXRPC_CONN_PROBING_FOR_UPGRADE, &conn->flags)) {
 841                        trace_rxrpc_client(conn, channel, rxrpc_client_to_active);
 842                        bundle->try_upgrade = false;
 843                        if (may_reuse)
 844                                rxrpc_activate_channels_locked(bundle);
 845                }
 846
 847        }
 848
 849        /* See if we can pass the channel directly to another call. */
 850        if (may_reuse && !list_empty(&bundle->waiting_calls)) {
 851                trace_rxrpc_client(conn, channel, rxrpc_client_chan_pass);
 852                rxrpc_activate_one_channel(conn, channel);
 853                goto out;
 854        }
 855
 856        /* Schedule the final ACK to be transmitted in a short while so that it
 857         * can be skipped if we find a follow-on call.  The first DATA packet
 858         * of the follow on call will implicitly ACK this call.
 859         */
 860        if (call->completion == RXRPC_CALL_SUCCEEDED &&
 861            test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
 862                unsigned long final_ack_at = jiffies + 2;
 863
 864                WRITE_ONCE(chan->final_ack_at, final_ack_at);
 865                smp_wmb(); /* vs rxrpc_process_delayed_final_acks() */
 866                set_bit(RXRPC_CONN_FINAL_ACK_0 + channel, &conn->flags);
 867                rxrpc_reduce_conn_timer(conn, final_ack_at);
 868        }
 869
 870        /* Deactivate the channel. */
 871        rcu_assign_pointer(chan->call, NULL);
 872        set_bit(conn->bundle_shift + channel, &conn->bundle->avail_chans);
 873        conn->act_chans &= ~(1 << channel);
 874
 875        /* If no channels remain active, then put the connection on the idle
 876         * list for a short while.  Give it a ref to stop it going away if it
 877         * becomes unbundled.
 878         */
 879        if (!conn->act_chans) {
 880                trace_rxrpc_client(conn, channel, rxrpc_client_to_idle);
 881                conn->idle_timestamp = jiffies;
 882
 883                rxrpc_get_connection(conn);
 884                spin_lock(&rxnet->client_conn_cache_lock);
 885                list_move_tail(&conn->cache_link, &rxnet->idle_client_conns);
 886                spin_unlock(&rxnet->client_conn_cache_lock);
 887
 888                rxrpc_set_client_reap_timer(rxnet);
 889        }
 890
 891out:
 892        spin_unlock(&bundle->channel_lock);
 893        _leave("");
 894        return;
 895}
 896
 897/*
 898 * Remove a connection from a bundle.
 899 */
 900static void rxrpc_unbundle_conn(struct rxrpc_connection *conn)
 901{
 902        struct rxrpc_bundle *bundle = conn->bundle;
 903        struct rxrpc_local *local = bundle->params.local;
 904        unsigned int bindex;
 905        bool need_drop = false, need_put = false;
 906        int i;
 907
 908        _enter("C=%x", conn->debug_id);
 909
 910        if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
 911                rxrpc_process_delayed_final_acks(conn, true);
 912
 913        spin_lock(&bundle->channel_lock);
 914        bindex = conn->bundle_shift / RXRPC_MAXCALLS;
 915        if (bundle->conns[bindex] == conn) {
 916                _debug("clear slot %u", bindex);
 917                bundle->conns[bindex] = NULL;
 918                for (i = 0; i < RXRPC_MAXCALLS; i++)
 919                        clear_bit(conn->bundle_shift + i, &bundle->avail_chans);
 920                need_drop = true;
 921        }
 922        spin_unlock(&bundle->channel_lock);
 923
 924        /* If there are no more connections, remove the bundle */
 925        if (!bundle->avail_chans) {
 926                _debug("maybe unbundle");
 927                spin_lock(&local->client_bundles_lock);
 928
 929                for (i = 0; i < ARRAY_SIZE(bundle->conns); i++)
 930                        if (bundle->conns[i])
 931                                break;
 932                if (i == ARRAY_SIZE(bundle->conns) && !bundle->params.exclusive) {
 933                        _debug("erase bundle");
 934                        rb_erase(&bundle->local_node, &local->client_bundles);
 935                        need_put = true;
 936                }
 937
 938                spin_unlock(&local->client_bundles_lock);
 939                if (need_put)
 940                        rxrpc_put_bundle(bundle);
 941        }
 942
 943        if (need_drop)
 944                rxrpc_put_connection(conn);
 945        _leave("");
 946}
 947
 948/*
 949 * Clean up a dead client connection.
 950 */
 951static void rxrpc_kill_client_conn(struct rxrpc_connection *conn)
 952{
 953        struct rxrpc_local *local = conn->params.local;
 954        struct rxrpc_net *rxnet = local->rxnet;
 955
 956        _enter("C=%x", conn->debug_id);
 957
 958        trace_rxrpc_client(conn, -1, rxrpc_client_cleanup);
 959        atomic_dec(&rxnet->nr_client_conns);
 960
 961        rxrpc_put_client_connection_id(conn);
 962        rxrpc_kill_connection(conn);
 963}
 964
 965/*
 966 * Clean up a dead client connections.
 967 */
 968void rxrpc_put_client_conn(struct rxrpc_connection *conn)
 969{
 970        const void *here = __builtin_return_address(0);
 971        unsigned int debug_id = conn->debug_id;
 972        bool dead;
 973        int r;
 974
 975        dead = __refcount_dec_and_test(&conn->ref, &r);
 976        trace_rxrpc_conn(debug_id, rxrpc_conn_put_client, r - 1, here);
 977        if (dead)
 978                rxrpc_kill_client_conn(conn);
 979}
 980
 981/*
 982 * Discard expired client connections from the idle list.  Each conn in the
 983 * idle list has been exposed and holds an extra ref because of that.
 984 *
 985 * This may be called from conn setup or from a work item so cannot be
 986 * considered non-reentrant.
 987 */
 988void rxrpc_discard_expired_client_conns(struct work_struct *work)
 989{
 990        struct rxrpc_connection *conn;
 991        struct rxrpc_net *rxnet =
 992                container_of(work, struct rxrpc_net, client_conn_reaper);
 993        unsigned long expiry, conn_expires_at, now;
 994        unsigned int nr_conns;
 995
 996        _enter("");
 997
 998        if (list_empty(&rxnet->idle_client_conns)) {
 999                _leave(" [empty]");
1000                return;
1001        }
1002
1003        /* Don't double up on the discarding */
1004        if (!spin_trylock(&rxnet->client_conn_discard_lock)) {
1005                _leave(" [already]");
1006                return;
1007        }
1008
1009        /* We keep an estimate of what the number of conns ought to be after
1010         * we've discarded some so that we don't overdo the discarding.
1011         */
1012        nr_conns = atomic_read(&rxnet->nr_client_conns);
1013
1014next:
1015        spin_lock(&rxnet->client_conn_cache_lock);
1016
1017        if (list_empty(&rxnet->idle_client_conns))
1018                goto out;
1019
1020        conn = list_entry(rxnet->idle_client_conns.next,
1021                          struct rxrpc_connection, cache_link);
1022
1023        if (!rxnet->kill_all_client_conns) {
1024                /* If the number of connections is over the reap limit, we
1025                 * expedite discard by reducing the expiry timeout.  We must,
1026                 * however, have at least a short grace period to be able to do
1027                 * final-ACK or ABORT retransmission.
1028                 */
1029                expiry = rxrpc_conn_idle_client_expiry;
1030                if (nr_conns > rxrpc_reap_client_connections)
1031                        expiry = rxrpc_conn_idle_client_fast_expiry;
1032                if (conn->params.local->service_closed)
1033                        expiry = rxrpc_closed_conn_expiry * HZ;
1034
1035                conn_expires_at = conn->idle_timestamp + expiry;
1036
1037                now = READ_ONCE(jiffies);
1038                if (time_after(conn_expires_at, now))
1039                        goto not_yet_expired;
1040        }
1041
1042        trace_rxrpc_client(conn, -1, rxrpc_client_discard);
1043        list_del_init(&conn->cache_link);
1044
1045        spin_unlock(&rxnet->client_conn_cache_lock);
1046
1047        rxrpc_unbundle_conn(conn);
1048        rxrpc_put_connection(conn); /* Drop the ->cache_link ref */
1049
1050        nr_conns--;
1051        goto next;
1052
1053not_yet_expired:
1054        /* The connection at the front of the queue hasn't yet expired, so
1055         * schedule the work item for that point if we discarded something.
1056         *
1057         * We don't worry if the work item is already scheduled - it can look
1058         * after rescheduling itself at a later time.  We could cancel it, but
1059         * then things get messier.
1060         */
1061        _debug("not yet");
1062        if (!rxnet->kill_all_client_conns)
1063                timer_reduce(&rxnet->client_conn_reap_timer, conn_expires_at);
1064
1065out:
1066        spin_unlock(&rxnet->client_conn_cache_lock);
1067        spin_unlock(&rxnet->client_conn_discard_lock);
1068        _leave("");
1069}
1070
1071/*
1072 * Preemptively destroy all the client connection records rather than waiting
1073 * for them to time out
1074 */
1075void rxrpc_destroy_all_client_connections(struct rxrpc_net *rxnet)
1076{
1077        _enter("");
1078
1079        spin_lock(&rxnet->client_conn_cache_lock);
1080        rxnet->kill_all_client_conns = true;
1081        spin_unlock(&rxnet->client_conn_cache_lock);
1082
1083        del_timer_sync(&rxnet->client_conn_reap_timer);
1084
1085        if (!rxrpc_queue_work(&rxnet->client_conn_reaper))
1086                _debug("destroy: queue failed");
1087
1088        _leave("");
1089}
1090
1091/*
1092 * Clean up the client connections on a local endpoint.
1093 */
1094void rxrpc_clean_up_local_conns(struct rxrpc_local *local)
1095{
1096        struct rxrpc_connection *conn, *tmp;
1097        struct rxrpc_net *rxnet = local->rxnet;
1098        LIST_HEAD(graveyard);
1099
1100        _enter("");
1101
1102        spin_lock(&rxnet->client_conn_cache_lock);
1103
1104        list_for_each_entry_safe(conn, tmp, &rxnet->idle_client_conns,
1105                                 cache_link) {
1106                if (conn->params.local == local) {
1107                        trace_rxrpc_client(conn, -1, rxrpc_client_discard);
1108                        list_move(&conn->cache_link, &graveyard);
1109                }
1110        }
1111
1112        spin_unlock(&rxnet->client_conn_cache_lock);
1113
1114        while (!list_empty(&graveyard)) {
1115                conn = list_entry(graveyard.next,
1116                                  struct rxrpc_connection, cache_link);
1117                list_del_init(&conn->cache_link);
1118                rxrpc_unbundle_conn(conn);
1119                rxrpc_put_connection(conn);
1120        }
1121
1122        _leave(" [culled]");
1123}
1124