linux/net/rxrpc/ar-call.c
<<
>>
Prefs
   1/* RxRPC individual remote procedure call handling
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/module.h>
  13#include <linux/circ_buf.h>
  14#include <net/sock.h>
  15#include <net/af_rxrpc.h>
  16#include "ar-internal.h"
  17
  18const char *const rxrpc_call_states[] = {
  19        [RXRPC_CALL_CLIENT_SEND_REQUEST]        = "ClSndReq",
  20        [RXRPC_CALL_CLIENT_AWAIT_REPLY]         = "ClAwtRpl",
  21        [RXRPC_CALL_CLIENT_RECV_REPLY]          = "ClRcvRpl",
  22        [RXRPC_CALL_CLIENT_FINAL_ACK]           = "ClFnlACK",
  23        [RXRPC_CALL_SERVER_SECURING]            = "SvSecure",
  24        [RXRPC_CALL_SERVER_ACCEPTING]           = "SvAccept",
  25        [RXRPC_CALL_SERVER_RECV_REQUEST]        = "SvRcvReq",
  26        [RXRPC_CALL_SERVER_ACK_REQUEST]         = "SvAckReq",
  27        [RXRPC_CALL_SERVER_SEND_REPLY]          = "SvSndRpl",
  28        [RXRPC_CALL_SERVER_AWAIT_ACK]           = "SvAwtACK",
  29        [RXRPC_CALL_COMPLETE]                   = "Complete",
  30        [RXRPC_CALL_SERVER_BUSY]                = "SvBusy  ",
  31        [RXRPC_CALL_REMOTELY_ABORTED]           = "RmtAbort",
  32        [RXRPC_CALL_LOCALLY_ABORTED]            = "LocAbort",
  33        [RXRPC_CALL_NETWORK_ERROR]              = "NetError",
  34        [RXRPC_CALL_DEAD]                       = "Dead    ",
  35};
  36
  37struct kmem_cache *rxrpc_call_jar;
  38LIST_HEAD(rxrpc_calls);
  39DEFINE_RWLOCK(rxrpc_call_lock);
  40static unsigned rxrpc_call_max_lifetime = 60;
  41static unsigned rxrpc_dead_call_timeout = 2;
  42
  43static void rxrpc_destroy_call(struct work_struct *work);
  44static void rxrpc_call_life_expired(unsigned long _call);
  45static void rxrpc_dead_call_expired(unsigned long _call);
  46static void rxrpc_ack_time_expired(unsigned long _call);
  47static void rxrpc_resend_time_expired(unsigned long _call);
  48
  49/*
  50 * allocate a new call
  51 */
  52static struct rxrpc_call *rxrpc_alloc_call(gfp_t gfp)
  53{
  54        struct rxrpc_call *call;
  55
  56        call = kmem_cache_zalloc(rxrpc_call_jar, gfp);
  57        if (!call)
  58                return NULL;
  59
  60        call->acks_winsz = 16;
  61        call->acks_window = kmalloc(call->acks_winsz * sizeof(unsigned long),
  62                                    gfp);
  63        if (!call->acks_window) {
  64                kmem_cache_free(rxrpc_call_jar, call);
  65                return NULL;
  66        }
  67
  68        setup_timer(&call->lifetimer, &rxrpc_call_life_expired,
  69                    (unsigned long) call);
  70        setup_timer(&call->deadspan, &rxrpc_dead_call_expired,
  71                    (unsigned long) call);
  72        setup_timer(&call->ack_timer, &rxrpc_ack_time_expired,
  73                    (unsigned long) call);
  74        setup_timer(&call->resend_timer, &rxrpc_resend_time_expired,
  75                    (unsigned long) call);
  76        INIT_WORK(&call->destroyer, &rxrpc_destroy_call);
  77        INIT_WORK(&call->processor, &rxrpc_process_call);
  78        INIT_LIST_HEAD(&call->accept_link);
  79        skb_queue_head_init(&call->rx_queue);
  80        skb_queue_head_init(&call->rx_oos_queue);
  81        init_waitqueue_head(&call->tx_waitq);
  82        spin_lock_init(&call->lock);
  83        rwlock_init(&call->state_lock);
  84        atomic_set(&call->usage, 1);
  85        call->debug_id = atomic_inc_return(&rxrpc_debug_id);
  86        call->state = RXRPC_CALL_CLIENT_SEND_REQUEST;
  87
  88        memset(&call->sock_node, 0xed, sizeof(call->sock_node));
  89
  90        call->rx_data_expect = 1;
  91        call->rx_data_eaten = 0;
  92        call->rx_first_oos = 0;
  93        call->ackr_win_top = call->rx_data_eaten + 1 + RXRPC_MAXACKS;
  94        call->creation_jif = jiffies;
  95        return call;
  96}
  97
  98/*
  99 * allocate a new client call and attempt to get a connection slot for it
 100 */
 101static struct rxrpc_call *rxrpc_alloc_client_call(
 102        struct rxrpc_sock *rx,
 103        struct rxrpc_transport *trans,
 104        struct rxrpc_conn_bundle *bundle,
 105        gfp_t gfp)
 106{
 107        struct rxrpc_call *call;
 108        int ret;
 109
 110        _enter("");
 111
 112        ASSERT(rx != NULL);
 113        ASSERT(trans != NULL);
 114        ASSERT(bundle != NULL);
 115
 116        call = rxrpc_alloc_call(gfp);
 117        if (!call)
 118                return ERR_PTR(-ENOMEM);
 119
 120        sock_hold(&rx->sk);
 121        call->socket = rx;
 122        call->rx_data_post = 1;
 123
 124        ret = rxrpc_connect_call(rx, trans, bundle, call, gfp);
 125        if (ret < 0) {
 126                kmem_cache_free(rxrpc_call_jar, call);
 127                return ERR_PTR(ret);
 128        }
 129
 130        spin_lock(&call->conn->trans->peer->lock);
 131        list_add(&call->error_link, &call->conn->trans->peer->error_targets);
 132        spin_unlock(&call->conn->trans->peer->lock);
 133
 134        call->lifetimer.expires = jiffies + rxrpc_call_max_lifetime * HZ;
 135        add_timer(&call->lifetimer);
 136
 137        _leave(" = %p", call);
 138        return call;
 139}
 140
 141/*
 142 * set up a call for the given data
 143 * - called in process context with IRQs enabled
 144 */
 145struct rxrpc_call *rxrpc_get_client_call(struct rxrpc_sock *rx,
 146                                         struct rxrpc_transport *trans,
 147                                         struct rxrpc_conn_bundle *bundle,
 148                                         unsigned long user_call_ID,
 149                                         int create,
 150                                         gfp_t gfp)
 151{
 152        struct rxrpc_call *call, *candidate;
 153        struct rb_node *p, *parent, **pp;
 154
 155        _enter("%p,%d,%d,%lx,%d",
 156               rx, trans ? trans->debug_id : -1, bundle ? bundle->debug_id : -1,
 157               user_call_ID, create);
 158
 159        /* search the extant calls first for one that matches the specified
 160         * user ID */
 161        read_lock(&rx->call_lock);
 162
 163        p = rx->calls.rb_node;
 164        while (p) {
 165                call = rb_entry(p, struct rxrpc_call, sock_node);
 166
 167                if (user_call_ID < call->user_call_ID)
 168                        p = p->rb_left;
 169                else if (user_call_ID > call->user_call_ID)
 170                        p = p->rb_right;
 171                else
 172                        goto found_extant_call;
 173        }
 174
 175        read_unlock(&rx->call_lock);
 176
 177        if (!create || !trans)
 178                return ERR_PTR(-EBADSLT);
 179
 180        /* not yet present - create a candidate for a new record and then
 181         * redo the search */
 182        candidate = rxrpc_alloc_client_call(rx, trans, bundle, gfp);
 183        if (IS_ERR(candidate)) {
 184                _leave(" = %ld", PTR_ERR(candidate));
 185                return candidate;
 186        }
 187
 188        candidate->user_call_ID = user_call_ID;
 189        __set_bit(RXRPC_CALL_HAS_USERID, &candidate->flags);
 190
 191        write_lock(&rx->call_lock);
 192
 193        pp = &rx->calls.rb_node;
 194        parent = NULL;
 195        while (*pp) {
 196                parent = *pp;
 197                call = rb_entry(parent, struct rxrpc_call, sock_node);
 198
 199                if (user_call_ID < call->user_call_ID)
 200                        pp = &(*pp)->rb_left;
 201                else if (user_call_ID > call->user_call_ID)
 202                        pp = &(*pp)->rb_right;
 203                else
 204                        goto found_extant_second;
 205        }
 206
 207        /* second search also failed; add the new call */
 208        call = candidate;
 209        candidate = NULL;
 210        rxrpc_get_call(call);
 211
 212        rb_link_node(&call->sock_node, parent, pp);
 213        rb_insert_color(&call->sock_node, &rx->calls);
 214        write_unlock(&rx->call_lock);
 215
 216        write_lock_bh(&rxrpc_call_lock);
 217        list_add_tail(&call->link, &rxrpc_calls);
 218        write_unlock_bh(&rxrpc_call_lock);
 219
 220        _net("CALL new %d on CONN %d", call->debug_id, call->conn->debug_id);
 221
 222        _leave(" = %p [new]", call);
 223        return call;
 224
 225        /* we found the call in the list immediately */
 226found_extant_call:
 227        rxrpc_get_call(call);
 228        read_unlock(&rx->call_lock);
 229        _leave(" = %p [extant %d]", call, atomic_read(&call->usage));
 230        return call;
 231
 232        /* we found the call on the second time through the list */
 233found_extant_second:
 234        rxrpc_get_call(call);
 235        write_unlock(&rx->call_lock);
 236        rxrpc_put_call(candidate);
 237        _leave(" = %p [second %d]", call, atomic_read(&call->usage));
 238        return call;
 239}
 240
 241/*
 242 * set up an incoming call
 243 * - called in process context with IRQs enabled
 244 */
 245struct rxrpc_call *rxrpc_incoming_call(struct rxrpc_sock *rx,
 246                                       struct rxrpc_connection *conn,
 247                                       struct rxrpc_header *hdr,
 248                                       gfp_t gfp)
 249{
 250        struct rxrpc_call *call, *candidate;
 251        struct rb_node **p, *parent;
 252        __be32 call_id;
 253
 254        _enter(",%d,,%x", conn->debug_id, gfp);
 255
 256        ASSERT(rx != NULL);
 257
 258        candidate = rxrpc_alloc_call(gfp);
 259        if (!candidate)
 260                return ERR_PTR(-EBUSY);
 261
 262        candidate->socket = rx;
 263        candidate->conn = conn;
 264        candidate->cid = hdr->cid;
 265        candidate->call_id = hdr->callNumber;
 266        candidate->channel = ntohl(hdr->cid) & RXRPC_CHANNELMASK;
 267        candidate->rx_data_post = 0;
 268        candidate->state = RXRPC_CALL_SERVER_ACCEPTING;
 269        if (conn->security_ix > 0)
 270                candidate->state = RXRPC_CALL_SERVER_SECURING;
 271
 272        write_lock_bh(&conn->lock);
 273
 274        /* set the channel for this call */
 275        call = conn->channels[candidate->channel];
 276        _debug("channel[%u] is %p", candidate->channel, call);
 277        if (call && call->call_id == hdr->callNumber) {
 278                /* already set; must've been a duplicate packet */
 279                _debug("extant call [%d]", call->state);
 280                ASSERTCMP(call->conn, ==, conn);
 281
 282                read_lock(&call->state_lock);
 283                switch (call->state) {
 284                case RXRPC_CALL_LOCALLY_ABORTED:
 285                        if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
 286                                rxrpc_queue_call(call);
 287                case RXRPC_CALL_REMOTELY_ABORTED:
 288                        read_unlock(&call->state_lock);
 289                        goto aborted_call;
 290                default:
 291                        rxrpc_get_call(call);
 292                        read_unlock(&call->state_lock);
 293                        goto extant_call;
 294                }
 295        }
 296
 297        if (call) {
 298                /* it seems the channel is still in use from the previous call
 299                 * - ditch the old binding if its call is now complete */
 300                _debug("CALL: %u { %s }",
 301                       call->debug_id, rxrpc_call_states[call->state]);
 302
 303                if (call->state >= RXRPC_CALL_COMPLETE) {
 304                        conn->channels[call->channel] = NULL;
 305                } else {
 306                        write_unlock_bh(&conn->lock);
 307                        kmem_cache_free(rxrpc_call_jar, candidate);
 308                        _leave(" = -EBUSY");
 309                        return ERR_PTR(-EBUSY);
 310                }
 311        }
 312
 313        /* check the call number isn't duplicate */
 314        _debug("check dup");
 315        call_id = hdr->callNumber;
 316        p = &conn->calls.rb_node;
 317        parent = NULL;
 318        while (*p) {
 319                parent = *p;
 320                call = rb_entry(parent, struct rxrpc_call, conn_node);
 321
 322                if (call_id < call->call_id)
 323                        p = &(*p)->rb_left;
 324                else if (call_id > call->call_id)
 325                        p = &(*p)->rb_right;
 326                else
 327                        goto old_call;
 328        }
 329
 330        /* make the call available */
 331        _debug("new call");
 332        call = candidate;
 333        candidate = NULL;
 334        rb_link_node(&call->conn_node, parent, p);
 335        rb_insert_color(&call->conn_node, &conn->calls);
 336        conn->channels[call->channel] = call;
 337        sock_hold(&rx->sk);
 338        atomic_inc(&conn->usage);
 339        write_unlock_bh(&conn->lock);
 340
 341        spin_lock(&conn->trans->peer->lock);
 342        list_add(&call->error_link, &conn->trans->peer->error_targets);
 343        spin_unlock(&conn->trans->peer->lock);
 344
 345        write_lock_bh(&rxrpc_call_lock);
 346        list_add_tail(&call->link, &rxrpc_calls);
 347        write_unlock_bh(&rxrpc_call_lock);
 348
 349        _net("CALL incoming %d on CONN %d", call->debug_id, call->conn->debug_id);
 350
 351        call->lifetimer.expires = jiffies + rxrpc_call_max_lifetime * HZ;
 352        add_timer(&call->lifetimer);
 353        _leave(" = %p {%d} [new]", call, call->debug_id);
 354        return call;
 355
 356extant_call:
 357        write_unlock_bh(&conn->lock);
 358        kmem_cache_free(rxrpc_call_jar, candidate);
 359        _leave(" = %p {%d} [extant]", call, call ? call->debug_id : -1);
 360        return call;
 361
 362aborted_call:
 363        write_unlock_bh(&conn->lock);
 364        kmem_cache_free(rxrpc_call_jar, candidate);
 365        _leave(" = -ECONNABORTED");
 366        return ERR_PTR(-ECONNABORTED);
 367
 368old_call:
 369        write_unlock_bh(&conn->lock);
 370        kmem_cache_free(rxrpc_call_jar, candidate);
 371        _leave(" = -ECONNRESET [old]");
 372        return ERR_PTR(-ECONNRESET);
 373}
 374
 375/*
 376 * find an extant server call
 377 * - called in process context with IRQs enabled
 378 */
 379struct rxrpc_call *rxrpc_find_server_call(struct rxrpc_sock *rx,
 380                                          unsigned long user_call_ID)
 381{
 382        struct rxrpc_call *call;
 383        struct rb_node *p;
 384
 385        _enter("%p,%lx", rx, user_call_ID);
 386
 387        /* search the extant calls for one that matches the specified user
 388         * ID */
 389        read_lock(&rx->call_lock);
 390
 391        p = rx->calls.rb_node;
 392        while (p) {
 393                call = rb_entry(p, struct rxrpc_call, sock_node);
 394
 395                if (user_call_ID < call->user_call_ID)
 396                        p = p->rb_left;
 397                else if (user_call_ID > call->user_call_ID)
 398                        p = p->rb_right;
 399                else
 400                        goto found_extant_call;
 401        }
 402
 403        read_unlock(&rx->call_lock);
 404        _leave(" = NULL");
 405        return NULL;
 406
 407        /* we found the call in the list immediately */
 408found_extant_call:
 409        rxrpc_get_call(call);
 410        read_unlock(&rx->call_lock);
 411        _leave(" = %p [%d]", call, atomic_read(&call->usage));
 412        return call;
 413}
 414
 415/*
 416 * detach a call from a socket and set up for release
 417 */
 418void rxrpc_release_call(struct rxrpc_call *call)
 419{
 420        struct rxrpc_connection *conn = call->conn;
 421        struct rxrpc_sock *rx = call->socket;
 422
 423        _enter("{%d,%d,%d,%d}",
 424               call->debug_id, atomic_read(&call->usage),
 425               atomic_read(&call->ackr_not_idle),
 426               call->rx_first_oos);
 427
 428        spin_lock_bh(&call->lock);
 429        if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags))
 430                BUG();
 431        spin_unlock_bh(&call->lock);
 432
 433        /* dissociate from the socket
 434         * - the socket's ref on the call is passed to the death timer
 435         */
 436        _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn);
 437
 438        write_lock_bh(&rx->call_lock);
 439        if (!list_empty(&call->accept_link)) {
 440                _debug("unlinking once-pending call %p { e=%lx f=%lx }",
 441                       call, call->events, call->flags);
 442                ASSERT(!test_bit(RXRPC_CALL_HAS_USERID, &call->flags));
 443                list_del_init(&call->accept_link);
 444                sk_acceptq_removed(&rx->sk);
 445        } else if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 446                rb_erase(&call->sock_node, &rx->calls);
 447                memset(&call->sock_node, 0xdd, sizeof(call->sock_node));
 448                clear_bit(RXRPC_CALL_HAS_USERID, &call->flags);
 449        }
 450        write_unlock_bh(&rx->call_lock);
 451
 452        /* free up the channel for reuse */
 453        spin_lock(&conn->trans->client_lock);
 454        write_lock_bh(&conn->lock);
 455        write_lock(&call->state_lock);
 456
 457        if (conn->channels[call->channel] == call)
 458                conn->channels[call->channel] = NULL;
 459
 460        if (conn->out_clientflag && conn->bundle) {
 461                conn->avail_calls++;
 462                switch (conn->avail_calls) {
 463                case 1:
 464                        list_move_tail(&conn->bundle_link,
 465                                       &conn->bundle->avail_conns);
 466                case 2 ... RXRPC_MAXCALLS - 1:
 467                        ASSERT(conn->channels[0] == NULL ||
 468                               conn->channels[1] == NULL ||
 469                               conn->channels[2] == NULL ||
 470                               conn->channels[3] == NULL);
 471                        break;
 472                case RXRPC_MAXCALLS:
 473                        list_move_tail(&conn->bundle_link,
 474                                       &conn->bundle->unused_conns);
 475                        ASSERT(conn->channels[0] == NULL &&
 476                               conn->channels[1] == NULL &&
 477                               conn->channels[2] == NULL &&
 478                               conn->channels[3] == NULL);
 479                        break;
 480                default:
 481                        printk(KERN_ERR "RxRPC: conn->avail_calls=%d\n",
 482                               conn->avail_calls);
 483                        BUG();
 484                }
 485        }
 486
 487        spin_unlock(&conn->trans->client_lock);
 488
 489        if (call->state < RXRPC_CALL_COMPLETE &&
 490            call->state != RXRPC_CALL_CLIENT_FINAL_ACK) {
 491                _debug("+++ ABORTING STATE %d +++\n", call->state);
 492                call->state = RXRPC_CALL_LOCALLY_ABORTED;
 493                call->abort_code = RX_CALL_DEAD;
 494                set_bit(RXRPC_CALL_ABORT, &call->events);
 495                rxrpc_queue_call(call);
 496        }
 497        write_unlock(&call->state_lock);
 498        write_unlock_bh(&conn->lock);
 499
 500        /* clean up the Rx queue */
 501        if (!skb_queue_empty(&call->rx_queue) ||
 502            !skb_queue_empty(&call->rx_oos_queue)) {
 503                struct rxrpc_skb_priv *sp;
 504                struct sk_buff *skb;
 505
 506                _debug("purge Rx queues");
 507
 508                spin_lock_bh(&call->lock);
 509                while ((skb = skb_dequeue(&call->rx_queue)) ||
 510                       (skb = skb_dequeue(&call->rx_oos_queue))) {
 511                        sp = rxrpc_skb(skb);
 512                        if (sp->call) {
 513                                ASSERTCMP(sp->call, ==, call);
 514                                rxrpc_put_call(call);
 515                                sp->call = NULL;
 516                        }
 517                        skb->destructor = NULL;
 518                        spin_unlock_bh(&call->lock);
 519
 520                        _debug("- zap %s %%%u #%u",
 521                               rxrpc_pkts[sp->hdr.type],
 522                               ntohl(sp->hdr.serial),
 523                               ntohl(sp->hdr.seq));
 524                        rxrpc_free_skb(skb);
 525                        spin_lock_bh(&call->lock);
 526                }
 527                spin_unlock_bh(&call->lock);
 528
 529                ASSERTCMP(call->state, !=, RXRPC_CALL_COMPLETE);
 530        }
 531
 532        del_timer_sync(&call->resend_timer);
 533        del_timer_sync(&call->ack_timer);
 534        del_timer_sync(&call->lifetimer);
 535        call->deadspan.expires = jiffies + rxrpc_dead_call_timeout * HZ;
 536        add_timer(&call->deadspan);
 537
 538        _leave("");
 539}
 540
 541/*
 542 * handle a dead call being ready for reaping
 543 */
 544static void rxrpc_dead_call_expired(unsigned long _call)
 545{
 546        struct rxrpc_call *call = (struct rxrpc_call *) _call;
 547
 548        _enter("{%d}", call->debug_id);
 549
 550        write_lock_bh(&call->state_lock);
 551        call->state = RXRPC_CALL_DEAD;
 552        write_unlock_bh(&call->state_lock);
 553        rxrpc_put_call(call);
 554}
 555
 556/*
 557 * mark a call as to be released, aborting it if it's still in progress
 558 * - called with softirqs disabled
 559 */
 560static void rxrpc_mark_call_released(struct rxrpc_call *call)
 561{
 562        bool sched;
 563
 564        write_lock(&call->state_lock);
 565        if (call->state < RXRPC_CALL_DEAD) {
 566                sched = false;
 567                if (call->state < RXRPC_CALL_COMPLETE) {
 568                        _debug("abort call %p", call);
 569                        call->state = RXRPC_CALL_LOCALLY_ABORTED;
 570                        call->abort_code = RX_CALL_DEAD;
 571                        if (!test_and_set_bit(RXRPC_CALL_ABORT, &call->events))
 572                                sched = true;
 573                }
 574                if (!test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
 575                        sched = true;
 576                if (sched)
 577                        rxrpc_queue_call(call);
 578        }
 579        write_unlock(&call->state_lock);
 580}
 581
 582/*
 583 * release all the calls associated with a socket
 584 */
 585void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
 586{
 587        struct rxrpc_call *call;
 588        struct rb_node *p;
 589
 590        _enter("%p", rx);
 591
 592        read_lock_bh(&rx->call_lock);
 593
 594        /* mark all the calls as no longer wanting incoming packets */
 595        for (p = rb_first(&rx->calls); p; p = rb_next(p)) {
 596                call = rb_entry(p, struct rxrpc_call, sock_node);
 597                rxrpc_mark_call_released(call);
 598        }
 599
 600        /* kill the not-yet-accepted incoming calls */
 601        list_for_each_entry(call, &rx->secureq, accept_link) {
 602                rxrpc_mark_call_released(call);
 603        }
 604
 605        list_for_each_entry(call, &rx->acceptq, accept_link) {
 606                rxrpc_mark_call_released(call);
 607        }
 608
 609        read_unlock_bh(&rx->call_lock);
 610        _leave("");
 611}
 612
 613/*
 614 * release a call
 615 */
 616void __rxrpc_put_call(struct rxrpc_call *call)
 617{
 618        ASSERT(call != NULL);
 619
 620        _enter("%p{u=%d}", call, atomic_read(&call->usage));
 621
 622        ASSERTCMP(atomic_read(&call->usage), >, 0);
 623
 624        if (atomic_dec_and_test(&call->usage)) {
 625                _debug("call %d dead", call->debug_id);
 626                ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
 627                rxrpc_queue_work(&call->destroyer);
 628        }
 629        _leave("");
 630}
 631
 632/*
 633 * clean up a call
 634 */
 635static void rxrpc_cleanup_call(struct rxrpc_call *call)
 636{
 637        _net("DESTROY CALL %d", call->debug_id);
 638
 639        ASSERT(call->socket);
 640
 641        memset(&call->sock_node, 0xcd, sizeof(call->sock_node));
 642
 643        del_timer_sync(&call->lifetimer);
 644        del_timer_sync(&call->deadspan);
 645        del_timer_sync(&call->ack_timer);
 646        del_timer_sync(&call->resend_timer);
 647
 648        ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags));
 649        ASSERTCMP(call->events, ==, 0);
 650        if (work_pending(&call->processor)) {
 651                _debug("defer destroy");
 652                rxrpc_queue_work(&call->destroyer);
 653                return;
 654        }
 655
 656        if (call->conn) {
 657                spin_lock(&call->conn->trans->peer->lock);
 658                list_del(&call->error_link);
 659                spin_unlock(&call->conn->trans->peer->lock);
 660
 661                write_lock_bh(&call->conn->lock);
 662                rb_erase(&call->conn_node, &call->conn->calls);
 663                write_unlock_bh(&call->conn->lock);
 664                rxrpc_put_connection(call->conn);
 665        }
 666
 667        if (call->acks_window) {
 668                _debug("kill Tx window %d",
 669                       CIRC_CNT(call->acks_head, call->acks_tail,
 670                                call->acks_winsz));
 671                smp_mb();
 672                while (CIRC_CNT(call->acks_head, call->acks_tail,
 673                                call->acks_winsz) > 0) {
 674                        struct rxrpc_skb_priv *sp;
 675                        unsigned long _skb;
 676
 677                        _skb = call->acks_window[call->acks_tail] & ~1;
 678                        sp = rxrpc_skb((struct sk_buff *) _skb);
 679                        _debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
 680                        rxrpc_free_skb((struct sk_buff *) _skb);
 681                        call->acks_tail =
 682                                (call->acks_tail + 1) & (call->acks_winsz - 1);
 683                }
 684
 685                kfree(call->acks_window);
 686        }
 687
 688        rxrpc_free_skb(call->tx_pending);
 689
 690        rxrpc_purge_queue(&call->rx_queue);
 691        ASSERT(skb_queue_empty(&call->rx_oos_queue));
 692        sock_put(&call->socket->sk);
 693        kmem_cache_free(rxrpc_call_jar, call);
 694}
 695
 696/*
 697 * destroy a call
 698 */
 699static void rxrpc_destroy_call(struct work_struct *work)
 700{
 701        struct rxrpc_call *call =
 702                container_of(work, struct rxrpc_call, destroyer);
 703
 704        _enter("%p{%d,%d,%p}",
 705               call, atomic_read(&call->usage), call->channel, call->conn);
 706
 707        ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
 708
 709        write_lock_bh(&rxrpc_call_lock);
 710        list_del_init(&call->link);
 711        write_unlock_bh(&rxrpc_call_lock);
 712
 713        rxrpc_cleanup_call(call);
 714        _leave("");
 715}
 716
 717/*
 718 * preemptively destroy all the call records from a transport endpoint rather
 719 * than waiting for them to time out
 720 */
 721void __exit rxrpc_destroy_all_calls(void)
 722{
 723        struct rxrpc_call *call;
 724
 725        _enter("");
 726        write_lock_bh(&rxrpc_call_lock);
 727
 728        while (!list_empty(&rxrpc_calls)) {
 729                call = list_entry(rxrpc_calls.next, struct rxrpc_call, link);
 730                _debug("Zapping call %p", call);
 731
 732                list_del_init(&call->link);
 733
 734                switch (atomic_read(&call->usage)) {
 735                case 0:
 736                        ASSERTCMP(call->state, ==, RXRPC_CALL_DEAD);
 737                        break;
 738                case 1:
 739                        if (del_timer_sync(&call->deadspan) != 0 &&
 740                            call->state != RXRPC_CALL_DEAD)
 741                                rxrpc_dead_call_expired((unsigned long) call);
 742                        if (call->state != RXRPC_CALL_DEAD)
 743                                break;
 744                default:
 745                        printk(KERN_ERR "RXRPC:"
 746                               " Call %p still in use (%d,%d,%s,%lx,%lx)!\n",
 747                               call, atomic_read(&call->usage),
 748                               atomic_read(&call->ackr_not_idle),
 749                               rxrpc_call_states[call->state],
 750                               call->flags, call->events);
 751                        if (!skb_queue_empty(&call->rx_queue))
 752                                printk(KERN_ERR"RXRPC: Rx queue occupied\n");
 753                        if (!skb_queue_empty(&call->rx_oos_queue))
 754                                printk(KERN_ERR"RXRPC: OOS queue occupied\n");
 755                        break;
 756                }
 757
 758                write_unlock_bh(&rxrpc_call_lock);
 759                cond_resched();
 760                write_lock_bh(&rxrpc_call_lock);
 761        }
 762
 763        write_unlock_bh(&rxrpc_call_lock);
 764        _leave("");
 765}
 766
 767/*
 768 * handle call lifetime being exceeded
 769 */
 770static void rxrpc_call_life_expired(unsigned long _call)
 771{
 772        struct rxrpc_call *call = (struct rxrpc_call *) _call;
 773
 774        if (call->state >= RXRPC_CALL_COMPLETE)
 775                return;
 776
 777        _enter("{%d}", call->debug_id);
 778        read_lock_bh(&call->state_lock);
 779        if (call->state < RXRPC_CALL_COMPLETE) {
 780                set_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
 781                rxrpc_queue_call(call);
 782        }
 783        read_unlock_bh(&call->state_lock);
 784}
 785
 786/*
 787 * handle resend timer expiry
 788 */
 789static void rxrpc_resend_time_expired(unsigned long _call)
 790{
 791        struct rxrpc_call *call = (struct rxrpc_call *) _call;
 792
 793        _enter("{%d}", call->debug_id);
 794
 795        if (call->state >= RXRPC_CALL_COMPLETE)
 796                return;
 797
 798        read_lock_bh(&call->state_lock);
 799        clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 800        if (call->state < RXRPC_CALL_COMPLETE &&
 801            !test_and_set_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
 802                rxrpc_queue_call(call);
 803        read_unlock_bh(&call->state_lock);
 804}
 805
 806/*
 807 * handle ACK timer expiry
 808 */
 809static void rxrpc_ack_time_expired(unsigned long _call)
 810{
 811        struct rxrpc_call *call = (struct rxrpc_call *) _call;
 812
 813        _enter("{%d}", call->debug_id);
 814
 815        if (call->state >= RXRPC_CALL_COMPLETE)
 816                return;
 817
 818        read_lock_bh(&call->state_lock);
 819        if (call->state < RXRPC_CALL_COMPLETE &&
 820            !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
 821                rxrpc_queue_call(call);
 822        read_unlock_bh(&call->state_lock);
 823}
 824