linux/net/rxrpc/ar-ack.c
<<
>>
Prefs
   1/* Management of Tx window, Tx resend, ACKs and out-of-sequence reception
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/module.h>
  13#include <linux/circ_buf.h>
  14#include <linux/net.h>
  15#include <linux/skbuff.h>
  16#include <linux/udp.h>
  17#include <net/sock.h>
  18#include <net/af_rxrpc.h>
  19#include "ar-internal.h"
  20
  21static unsigned rxrpc_ack_defer = 1;
  22
  23static const char *const rxrpc_acks[] = {
  24        "---", "REQ", "DUP", "OOS", "WIN", "MEM", "PNG", "PNR", "DLY", "IDL",
  25        "-?-"
  26};
  27
  28static const s8 rxrpc_ack_priority[] = {
  29        [0]                             = 0,
  30        [RXRPC_ACK_DELAY]               = 1,
  31        [RXRPC_ACK_REQUESTED]           = 2,
  32        [RXRPC_ACK_IDLE]                = 3,
  33        [RXRPC_ACK_PING_RESPONSE]       = 4,
  34        [RXRPC_ACK_DUPLICATE]           = 5,
  35        [RXRPC_ACK_OUT_OF_SEQUENCE]     = 6,
  36        [RXRPC_ACK_EXCEEDS_WINDOW]      = 7,
  37        [RXRPC_ACK_NOSPACE]             = 8,
  38};
  39
  40/*
  41 * propose an ACK be sent
  42 */
  43void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
  44                         __be32 serial, bool immediate)
  45{
  46        unsigned long expiry;
  47        s8 prior = rxrpc_ack_priority[ack_reason];
  48
  49        ASSERTCMP(prior, >, 0);
  50
  51        _enter("{%d},%s,%%%x,%u",
  52               call->debug_id, rxrpc_acks[ack_reason], ntohl(serial),
  53               immediate);
  54
  55        if (prior < rxrpc_ack_priority[call->ackr_reason]) {
  56                if (immediate)
  57                        goto cancel_timer;
  58                return;
  59        }
  60
  61        /* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
  62         * numbers */
  63        if (prior == rxrpc_ack_priority[call->ackr_reason]) {
  64                if (prior <= 4)
  65                        call->ackr_serial = serial;
  66                if (immediate)
  67                        goto cancel_timer;
  68                return;
  69        }
  70
  71        call->ackr_reason = ack_reason;
  72        call->ackr_serial = serial;
  73
  74        switch (ack_reason) {
  75        case RXRPC_ACK_DELAY:
  76                _debug("run delay timer");
  77                call->ack_timer.expires = jiffies + rxrpc_ack_timeout * HZ;
  78                add_timer(&call->ack_timer);
  79                return;
  80
  81        case RXRPC_ACK_IDLE:
  82                if (!immediate) {
  83                        _debug("run defer timer");
  84                        expiry = 1;
  85                        goto run_timer;
  86                }
  87                goto cancel_timer;
  88
  89        case RXRPC_ACK_REQUESTED:
  90                if (!rxrpc_ack_defer)
  91                        goto cancel_timer;
  92                if (!immediate || serial == cpu_to_be32(1)) {
  93                        _debug("run defer timer");
  94                        expiry = rxrpc_ack_defer;
  95                        goto run_timer;
  96                }
  97
  98        default:
  99                _debug("immediate ACK");
 100                goto cancel_timer;
 101        }
 102
 103run_timer:
 104        expiry += jiffies;
 105        if (!timer_pending(&call->ack_timer) ||
 106            time_after(call->ack_timer.expires, expiry))
 107                mod_timer(&call->ack_timer, expiry);
 108        return;
 109
 110cancel_timer:
 111        _debug("cancel timer %%%u", ntohl(serial));
 112        try_to_del_timer_sync(&call->ack_timer);
 113        read_lock_bh(&call->state_lock);
 114        if (call->state <= RXRPC_CALL_COMPLETE &&
 115            !test_and_set_bit(RXRPC_CALL_ACK, &call->events))
 116                rxrpc_queue_call(call);
 117        read_unlock_bh(&call->state_lock);
 118}
 119
 120/*
 121 * propose an ACK be sent, locking the call structure
 122 */
 123void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
 124                       __be32 serial, bool immediate)
 125{
 126        s8 prior = rxrpc_ack_priority[ack_reason];
 127
 128        if (prior > rxrpc_ack_priority[call->ackr_reason]) {
 129                spin_lock_bh(&call->lock);
 130                __rxrpc_propose_ACK(call, ack_reason, serial, immediate);
 131                spin_unlock_bh(&call->lock);
 132        }
 133}
 134
 135/*
 136 * set the resend timer
 137 */
 138static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
 139                             unsigned long resend_at)
 140{
 141        read_lock_bh(&call->state_lock);
 142        if (call->state >= RXRPC_CALL_COMPLETE)
 143                resend = 0;
 144
 145        if (resend & 1) {
 146                _debug("SET RESEND");
 147                set_bit(RXRPC_CALL_RESEND, &call->events);
 148        }
 149
 150        if (resend & 2) {
 151                _debug("MODIFY RESEND TIMER");
 152                set_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 153                mod_timer(&call->resend_timer, resend_at);
 154        } else {
 155                _debug("KILL RESEND TIMER");
 156                del_timer_sync(&call->resend_timer);
 157                clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 158                clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 159        }
 160        read_unlock_bh(&call->state_lock);
 161}
 162
 163/*
 164 * resend packets
 165 */
 166static void rxrpc_resend(struct rxrpc_call *call)
 167{
 168        struct rxrpc_skb_priv *sp;
 169        struct rxrpc_header *hdr;
 170        struct sk_buff *txb;
 171        unsigned long *p_txb, resend_at;
 172        int loop, stop;
 173        u8 resend;
 174
 175        _enter("{%d,%d,%d,%d},",
 176               call->acks_hard, call->acks_unacked,
 177               atomic_read(&call->sequence),
 178               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 179
 180        stop = 0;
 181        resend = 0;
 182        resend_at = 0;
 183
 184        for (loop = call->acks_tail;
 185             loop != call->acks_head || stop;
 186             loop = (loop + 1) &  (call->acks_winsz - 1)
 187             ) {
 188                p_txb = call->acks_window + loop;
 189                smp_read_barrier_depends();
 190                if (*p_txb & 1)
 191                        continue;
 192
 193                txb = (struct sk_buff *) *p_txb;
 194                sp = rxrpc_skb(txb);
 195
 196                if (sp->need_resend) {
 197                        sp->need_resend = 0;
 198
 199                        /* each Tx packet has a new serial number */
 200                        sp->hdr.serial =
 201                                htonl(atomic_inc_return(&call->conn->serial));
 202
 203                        hdr = (struct rxrpc_header *) txb->head;
 204                        hdr->serial = sp->hdr.serial;
 205
 206                        _proto("Tx DATA %%%u { #%d }",
 207                               ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 208                        if (rxrpc_send_packet(call->conn->trans, txb) < 0) {
 209                                stop = 0;
 210                                sp->resend_at = jiffies + 3;
 211                        } else {
 212                                sp->resend_at =
 213                                        jiffies + rxrpc_resend_timeout * HZ;
 214                        }
 215                }
 216
 217                if (time_after_eq(jiffies + 1, sp->resend_at)) {
 218                        sp->need_resend = 1;
 219                        resend |= 1;
 220                } else if (resend & 2) {
 221                        if (time_before(sp->resend_at, resend_at))
 222                                resend_at = sp->resend_at;
 223                } else {
 224                        resend_at = sp->resend_at;
 225                        resend |= 2;
 226                }
 227        }
 228
 229        rxrpc_set_resend(call, resend, resend_at);
 230        _leave("");
 231}
 232
 233/*
 234 * handle resend timer expiry
 235 */
 236static void rxrpc_resend_timer(struct rxrpc_call *call)
 237{
 238        struct rxrpc_skb_priv *sp;
 239        struct sk_buff *txb;
 240        unsigned long *p_txb, resend_at;
 241        int loop;
 242        u8 resend;
 243
 244        _enter("%d,%d,%d",
 245               call->acks_tail, call->acks_unacked, call->acks_head);
 246
 247        resend = 0;
 248        resend_at = 0;
 249
 250        for (loop = call->acks_unacked;
 251             loop != call->acks_head;
 252             loop = (loop + 1) &  (call->acks_winsz - 1)
 253             ) {
 254                p_txb = call->acks_window + loop;
 255                smp_read_barrier_depends();
 256                txb = (struct sk_buff *) (*p_txb & ~1);
 257                sp = rxrpc_skb(txb);
 258
 259                ASSERT(!(*p_txb & 1));
 260
 261                if (sp->need_resend) {
 262                        ;
 263                } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 264                        sp->need_resend = 1;
 265                        resend |= 1;
 266                } else if (resend & 2) {
 267                        if (time_before(sp->resend_at, resend_at))
 268                                resend_at = sp->resend_at;
 269                } else {
 270                        resend_at = sp->resend_at;
 271                        resend |= 2;
 272                }
 273        }
 274
 275        rxrpc_set_resend(call, resend, resend_at);
 276        _leave("");
 277}
 278
 279/*
 280 * process soft ACKs of our transmitted packets
 281 * - these indicate packets the peer has or has not received, but hasn't yet
 282 *   given to the consumer, and so can still be discarded and re-requested
 283 */
 284static int rxrpc_process_soft_ACKs(struct rxrpc_call *call,
 285                                   struct rxrpc_ackpacket *ack,
 286                                   struct sk_buff *skb)
 287{
 288        struct rxrpc_skb_priv *sp;
 289        struct sk_buff *txb;
 290        unsigned long *p_txb, resend_at;
 291        int loop;
 292        u8 sacks[RXRPC_MAXACKS], resend;
 293
 294        _enter("{%d,%d},{%d},",
 295               call->acks_hard,
 296               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz),
 297               ack->nAcks);
 298
 299        if (skb_copy_bits(skb, 0, sacks, ack->nAcks) < 0)
 300                goto protocol_error;
 301
 302        resend = 0;
 303        resend_at = 0;
 304        for (loop = 0; loop < ack->nAcks; loop++) {
 305                p_txb = call->acks_window;
 306                p_txb += (call->acks_tail + loop) & (call->acks_winsz - 1);
 307                smp_read_barrier_depends();
 308                txb = (struct sk_buff *) (*p_txb & ~1);
 309                sp = rxrpc_skb(txb);
 310
 311                switch (sacks[loop]) {
 312                case RXRPC_ACK_TYPE_ACK:
 313                        sp->need_resend = 0;
 314                        *p_txb |= 1;
 315                        break;
 316                case RXRPC_ACK_TYPE_NACK:
 317                        sp->need_resend = 1;
 318                        *p_txb &= ~1;
 319                        resend = 1;
 320                        break;
 321                default:
 322                        _debug("Unsupported ACK type %d", sacks[loop]);
 323                        goto protocol_error;
 324                }
 325        }
 326
 327        smp_mb();
 328        call->acks_unacked = (call->acks_tail + loop) & (call->acks_winsz - 1);
 329
 330        /* anything not explicitly ACK'd is implicitly NACK'd, but may just not
 331         * have been received or processed yet by the far end */
 332        for (loop = call->acks_unacked;
 333             loop != call->acks_head;
 334             loop = (loop + 1) &  (call->acks_winsz - 1)
 335             ) {
 336                p_txb = call->acks_window + loop;
 337                smp_read_barrier_depends();
 338                txb = (struct sk_buff *) (*p_txb & ~1);
 339                sp = rxrpc_skb(txb);
 340
 341                if (*p_txb & 1) {
 342                        /* packet must have been discarded */
 343                        sp->need_resend = 1;
 344                        *p_txb &= ~1;
 345                        resend |= 1;
 346                } else if (sp->need_resend) {
 347                        ;
 348                } else if (time_after_eq(jiffies + 1, sp->resend_at)) {
 349                        sp->need_resend = 1;
 350                        resend |= 1;
 351                } else if (resend & 2) {
 352                        if (time_before(sp->resend_at, resend_at))
 353                                resend_at = sp->resend_at;
 354                } else {
 355                        resend_at = sp->resend_at;
 356                        resend |= 2;
 357                }
 358        }
 359
 360        rxrpc_set_resend(call, resend, resend_at);
 361        _leave(" = 0");
 362        return 0;
 363
 364protocol_error:
 365        _leave(" = -EPROTO");
 366        return -EPROTO;
 367}
 368
 369/*
 370 * discard hard-ACK'd packets from the Tx window
 371 */
 372static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
 373{
 374        struct rxrpc_skb_priv *sp;
 375        unsigned long _skb;
 376        int tail = call->acks_tail, old_tail;
 377        int win = CIRC_CNT(call->acks_head, tail, call->acks_winsz);
 378
 379        _enter("{%u,%u},%u", call->acks_hard, win, hard);
 380
 381        ASSERTCMP(hard - call->acks_hard, <=, win);
 382
 383        while (call->acks_hard < hard) {
 384                smp_read_barrier_depends();
 385                _skb = call->acks_window[tail] & ~1;
 386                sp = rxrpc_skb((struct sk_buff *) _skb);
 387                rxrpc_free_skb((struct sk_buff *) _skb);
 388                old_tail = tail;
 389                tail = (tail + 1) & (call->acks_winsz - 1);
 390                call->acks_tail = tail;
 391                if (call->acks_unacked == old_tail)
 392                        call->acks_unacked = tail;
 393                call->acks_hard++;
 394        }
 395
 396        wake_up(&call->tx_waitq);
 397}
 398
 399/*
 400 * clear the Tx window in the event of a failure
 401 */
 402static void rxrpc_clear_tx_window(struct rxrpc_call *call)
 403{
 404        rxrpc_rotate_tx_window(call, atomic_read(&call->sequence));
 405}
 406
 407/*
 408 * drain the out of sequence received packet queue into the packet Rx queue
 409 */
 410static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
 411{
 412        struct rxrpc_skb_priv *sp;
 413        struct sk_buff *skb;
 414        bool terminal;
 415        int ret;
 416
 417        _enter("{%d,%d}", call->rx_data_post, call->rx_first_oos);
 418
 419        spin_lock_bh(&call->lock);
 420
 421        ret = -ECONNRESET;
 422        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 423                goto socket_unavailable;
 424
 425        skb = skb_dequeue(&call->rx_oos_queue);
 426        if (skb) {
 427                sp = rxrpc_skb(skb);
 428
 429                _debug("drain OOS packet %d [%d]",
 430                       ntohl(sp->hdr.seq), call->rx_first_oos);
 431
 432                if (ntohl(sp->hdr.seq) != call->rx_first_oos) {
 433                        skb_queue_head(&call->rx_oos_queue, skb);
 434                        call->rx_first_oos = ntohl(rxrpc_skb(skb)->hdr.seq);
 435                        _debug("requeue %p {%u}", skb, call->rx_first_oos);
 436                } else {
 437                        skb->mark = RXRPC_SKB_MARK_DATA;
 438                        terminal = ((sp->hdr.flags & RXRPC_LAST_PACKET) &&
 439                                !(sp->hdr.flags & RXRPC_CLIENT_INITIATED));
 440                        ret = rxrpc_queue_rcv_skb(call, skb, true, terminal);
 441                        BUG_ON(ret < 0);
 442                        _debug("drain #%u", call->rx_data_post);
 443                        call->rx_data_post++;
 444
 445                        /* find out what the next packet is */
 446                        skb = skb_peek(&call->rx_oos_queue);
 447                        if (skb)
 448                                call->rx_first_oos =
 449                                        ntohl(rxrpc_skb(skb)->hdr.seq);
 450                        else
 451                                call->rx_first_oos = 0;
 452                        _debug("peek %p {%u}", skb, call->rx_first_oos);
 453                }
 454        }
 455
 456        ret = 0;
 457socket_unavailable:
 458        spin_unlock_bh(&call->lock);
 459        _leave(" = %d", ret);
 460        return ret;
 461}
 462
 463/*
 464 * insert an out of sequence packet into the buffer
 465 */
 466static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
 467                                    struct sk_buff *skb)
 468{
 469        struct rxrpc_skb_priv *sp, *psp;
 470        struct sk_buff *p;
 471        u32 seq;
 472
 473        sp = rxrpc_skb(skb);
 474        seq = ntohl(sp->hdr.seq);
 475        _enter(",,{%u}", seq);
 476
 477        skb->destructor = rxrpc_packet_destructor;
 478        ASSERTCMP(sp->call, ==, NULL);
 479        sp->call = call;
 480        rxrpc_get_call(call);
 481
 482        /* insert into the buffer in sequence order */
 483        spin_lock_bh(&call->lock);
 484
 485        skb_queue_walk(&call->rx_oos_queue, p) {
 486                psp = rxrpc_skb(p);
 487                if (ntohl(psp->hdr.seq) > seq) {
 488                        _debug("insert oos #%u before #%u",
 489                               seq, ntohl(psp->hdr.seq));
 490                        skb_insert(p, skb, &call->rx_oos_queue);
 491                        goto inserted;
 492                }
 493        }
 494
 495        _debug("append oos #%u", seq);
 496        skb_queue_tail(&call->rx_oos_queue, skb);
 497inserted:
 498
 499        /* we might now have a new front to the queue */
 500        if (call->rx_first_oos == 0 || seq < call->rx_first_oos)
 501                call->rx_first_oos = seq;
 502
 503        read_lock(&call->state_lock);
 504        if (call->state < RXRPC_CALL_COMPLETE &&
 505            call->rx_data_post == call->rx_first_oos) {
 506                _debug("drain rx oos now");
 507                set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
 508        }
 509        read_unlock(&call->state_lock);
 510
 511        spin_unlock_bh(&call->lock);
 512        _leave(" [stored #%u]", call->rx_first_oos);
 513}
 514
 515/*
 516 * clear the Tx window on final ACK reception
 517 */
 518static void rxrpc_zap_tx_window(struct rxrpc_call *call)
 519{
 520        struct rxrpc_skb_priv *sp;
 521        struct sk_buff *skb;
 522        unsigned long _skb, *acks_window;
 523        u8 winsz = call->acks_winsz;
 524        int tail;
 525
 526        acks_window = call->acks_window;
 527        call->acks_window = NULL;
 528
 529        while (CIRC_CNT(call->acks_head, call->acks_tail, winsz) > 0) {
 530                tail = call->acks_tail;
 531                smp_read_barrier_depends();
 532                _skb = acks_window[tail] & ~1;
 533                smp_mb();
 534                call->acks_tail = (call->acks_tail + 1) & (winsz - 1);
 535
 536                skb = (struct sk_buff *) _skb;
 537                sp = rxrpc_skb(skb);
 538                _debug("+++ clear Tx %u", ntohl(sp->hdr.seq));
 539                rxrpc_free_skb(skb);
 540        }
 541
 542        kfree(acks_window);
 543}
 544
 545/*
 546 * process the extra information that may be appended to an ACK packet
 547 */
 548static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
 549                                  unsigned latest, int nAcks)
 550{
 551        struct rxrpc_ackinfo ackinfo;
 552        struct rxrpc_peer *peer;
 553        unsigned mtu;
 554
 555        if (skb_copy_bits(skb, nAcks + 3, &ackinfo, sizeof(ackinfo)) < 0) {
 556                _leave(" [no ackinfo]");
 557                return;
 558        }
 559
 560        _proto("Rx ACK %%%u Info { rx=%u max=%u rwin=%u jm=%u }",
 561               latest,
 562               ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU),
 563               ntohl(ackinfo.rwind), ntohl(ackinfo.jumbo_max));
 564
 565        mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
 566
 567        peer = call->conn->trans->peer;
 568        if (mtu < peer->maxdata) {
 569                spin_lock_bh(&peer->lock);
 570                peer->maxdata = mtu;
 571                peer->mtu = mtu + peer->hdrsize;
 572                spin_unlock_bh(&peer->lock);
 573                _net("Net MTU %u (maxdata %u)", peer->mtu, peer->maxdata);
 574        }
 575}
 576
 577/*
 578 * process packets in the reception queue
 579 */
 580static int rxrpc_process_rx_queue(struct rxrpc_call *call,
 581                                  u32 *_abort_code)
 582{
 583        struct rxrpc_ackpacket ack;
 584        struct rxrpc_skb_priv *sp;
 585        struct sk_buff *skb;
 586        bool post_ACK;
 587        int latest;
 588        u32 hard, tx;
 589
 590        _enter("");
 591
 592process_further:
 593        skb = skb_dequeue(&call->rx_queue);
 594        if (!skb)
 595                return -EAGAIN;
 596
 597        _net("deferred skb %p", skb);
 598
 599        sp = rxrpc_skb(skb);
 600
 601        _debug("process %s [st %d]", rxrpc_pkts[sp->hdr.type], call->state);
 602
 603        post_ACK = false;
 604
 605        switch (sp->hdr.type) {
 606                /* data packets that wind up here have been received out of
 607                 * order, need security processing or are jumbo packets */
 608        case RXRPC_PACKET_TYPE_DATA:
 609                _proto("OOSQ DATA %%%u { #%u }",
 610                       ntohl(sp->hdr.serial), ntohl(sp->hdr.seq));
 611
 612                /* secured packets must be verified and possibly decrypted */
 613                if (rxrpc_verify_packet(call, skb, _abort_code) < 0)
 614                        goto protocol_error;
 615
 616                rxrpc_insert_oos_packet(call, skb);
 617                goto process_further;
 618
 619                /* partial ACK to process */
 620        case RXRPC_PACKET_TYPE_ACK:
 621                if (skb_copy_bits(skb, 0, &ack, sizeof(ack)) < 0) {
 622                        _debug("extraction failure");
 623                        goto protocol_error;
 624                }
 625                if (!skb_pull(skb, sizeof(ack)))
 626                        BUG();
 627
 628                latest = ntohl(sp->hdr.serial);
 629                hard = ntohl(ack.firstPacket);
 630                tx = atomic_read(&call->sequence);
 631
 632                _proto("Rx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
 633                       latest,
 634                       ntohs(ack.maxSkew),
 635                       hard,
 636                       ntohl(ack.previousPacket),
 637                       ntohl(ack.serial),
 638                       rxrpc_acks[ack.reason],
 639                       ack.nAcks);
 640
 641                rxrpc_extract_ackinfo(call, skb, latest, ack.nAcks);
 642
 643                if (ack.reason == RXRPC_ACK_PING) {
 644                        _proto("Rx ACK %%%u PING Request", latest);
 645                        rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
 646                                          sp->hdr.serial, true);
 647                }
 648
 649                /* discard any out-of-order or duplicate ACKs */
 650                if (latest - call->acks_latest <= 0) {
 651                        _debug("discard ACK %d <= %d",
 652                               latest, call->acks_latest);
 653                        goto discard;
 654                }
 655                call->acks_latest = latest;
 656
 657                if (call->state != RXRPC_CALL_CLIENT_SEND_REQUEST &&
 658                    call->state != RXRPC_CALL_CLIENT_AWAIT_REPLY &&
 659                    call->state != RXRPC_CALL_SERVER_SEND_REPLY &&
 660                    call->state != RXRPC_CALL_SERVER_AWAIT_ACK)
 661                        goto discard;
 662
 663                _debug("Tx=%d H=%u S=%d", tx, call->acks_hard, call->state);
 664
 665                if (hard > 0) {
 666                        if (hard - 1 > tx) {
 667                                _debug("hard-ACK'd packet %d not transmitted"
 668                                       " (%d top)",
 669                                       hard - 1, tx);
 670                                goto protocol_error;
 671                        }
 672
 673                        if ((call->state == RXRPC_CALL_CLIENT_AWAIT_REPLY ||
 674                             call->state == RXRPC_CALL_SERVER_AWAIT_ACK) &&
 675                            hard > tx)
 676                                goto all_acked;
 677
 678                        smp_rmb();
 679                        rxrpc_rotate_tx_window(call, hard - 1);
 680                }
 681
 682                if (ack.nAcks > 0) {
 683                        if (hard - 1 + ack.nAcks > tx) {
 684                                _debug("soft-ACK'd packet %d+%d not"
 685                                       " transmitted (%d top)",
 686                                       hard - 1, ack.nAcks, tx);
 687                                goto protocol_error;
 688                        }
 689
 690                        if (rxrpc_process_soft_ACKs(call, &ack, skb) < 0)
 691                                goto protocol_error;
 692                }
 693                goto discard;
 694
 695                /* complete ACK to process */
 696        case RXRPC_PACKET_TYPE_ACKALL:
 697                goto all_acked;
 698
 699                /* abort and busy are handled elsewhere */
 700        case RXRPC_PACKET_TYPE_BUSY:
 701        case RXRPC_PACKET_TYPE_ABORT:
 702                BUG();
 703
 704                /* connection level events - also handled elsewhere */
 705        case RXRPC_PACKET_TYPE_CHALLENGE:
 706        case RXRPC_PACKET_TYPE_RESPONSE:
 707        case RXRPC_PACKET_TYPE_DEBUG:
 708                BUG();
 709        }
 710
 711        /* if we've had a hard ACK that covers all the packets we've sent, then
 712         * that ends that phase of the operation */
 713all_acked:
 714        write_lock_bh(&call->state_lock);
 715        _debug("ack all %d", call->state);
 716
 717        switch (call->state) {
 718        case RXRPC_CALL_CLIENT_AWAIT_REPLY:
 719                call->state = RXRPC_CALL_CLIENT_RECV_REPLY;
 720                break;
 721        case RXRPC_CALL_SERVER_AWAIT_ACK:
 722                _debug("srv complete");
 723                call->state = RXRPC_CALL_COMPLETE;
 724                post_ACK = true;
 725                break;
 726        case RXRPC_CALL_CLIENT_SEND_REQUEST:
 727        case RXRPC_CALL_SERVER_RECV_REQUEST:
 728                goto protocol_error_unlock; /* can't occur yet */
 729        default:
 730                write_unlock_bh(&call->state_lock);
 731                goto discard; /* assume packet left over from earlier phase */
 732        }
 733
 734        write_unlock_bh(&call->state_lock);
 735
 736        /* if all the packets we sent are hard-ACK'd, then we can discard
 737         * whatever we've got left */
 738        _debug("clear Tx %d",
 739               CIRC_CNT(call->acks_head, call->acks_tail, call->acks_winsz));
 740
 741        del_timer_sync(&call->resend_timer);
 742        clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 743        clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events);
 744
 745        if (call->acks_window)
 746                rxrpc_zap_tx_window(call);
 747
 748        if (post_ACK) {
 749                /* post the final ACK message for userspace to pick up */
 750                _debug("post ACK");
 751                skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
 752                sp->call = call;
 753                rxrpc_get_call(call);
 754                spin_lock_bh(&call->lock);
 755                if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
 756                        BUG();
 757                spin_unlock_bh(&call->lock);
 758                goto process_further;
 759        }
 760
 761discard:
 762        rxrpc_free_skb(skb);
 763        goto process_further;
 764
 765protocol_error_unlock:
 766        write_unlock_bh(&call->state_lock);
 767protocol_error:
 768        rxrpc_free_skb(skb);
 769        _leave(" = -EPROTO");
 770        return -EPROTO;
 771}
 772
 773/*
 774 * post a message to the socket Rx queue for recvmsg() to pick up
 775 */
 776static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
 777                              bool fatal)
 778{
 779        struct rxrpc_skb_priv *sp;
 780        struct sk_buff *skb;
 781        int ret;
 782
 783        _enter("{%d,%lx},%u,%u,%d",
 784               call->debug_id, call->flags, mark, error, fatal);
 785
 786        /* remove timers and things for fatal messages */
 787        if (fatal) {
 788                del_timer_sync(&call->resend_timer);
 789                del_timer_sync(&call->ack_timer);
 790                clear_bit(RXRPC_CALL_RUN_RTIMER, &call->flags);
 791        }
 792
 793        if (mark != RXRPC_SKB_MARK_NEW_CALL &&
 794            !test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 795                _leave("[no userid]");
 796                return 0;
 797        }
 798
 799        if (!test_bit(RXRPC_CALL_TERMINAL_MSG, &call->flags)) {
 800                skb = alloc_skb(0, GFP_NOFS);
 801                if (!skb)
 802                        return -ENOMEM;
 803
 804                rxrpc_new_skb(skb);
 805
 806                skb->mark = mark;
 807
 808                sp = rxrpc_skb(skb);
 809                memset(sp, 0, sizeof(*sp));
 810                sp->error = error;
 811                sp->call = call;
 812                rxrpc_get_call(call);
 813
 814                spin_lock_bh(&call->lock);
 815                ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
 816                spin_unlock_bh(&call->lock);
 817                BUG_ON(ret < 0);
 818        }
 819
 820        return 0;
 821}
 822
 823/*
 824 * handle background processing of incoming call packets and ACK / abort
 825 * generation
 826 */
 827void rxrpc_process_call(struct work_struct *work)
 828{
 829        struct rxrpc_call *call =
 830                container_of(work, struct rxrpc_call, processor);
 831        struct rxrpc_ackpacket ack;
 832        struct rxrpc_ackinfo ackinfo;
 833        struct rxrpc_header hdr;
 834        struct msghdr msg;
 835        struct kvec iov[5];
 836        unsigned long bits;
 837        __be32 data, pad;
 838        size_t len;
 839        int genbit, loop, nbit, ioc, ret, mtu;
 840        u32 abort_code = RX_PROTOCOL_ERROR;
 841        u8 *acks = NULL;
 842
 843        //printk("\n--------------------\n");
 844        _enter("{%d,%s,%lx} [%lu]",
 845               call->debug_id, rxrpc_call_states[call->state], call->events,
 846               (jiffies - call->creation_jif) / (HZ / 10));
 847
 848        if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
 849                _debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
 850                return;
 851        }
 852
 853        /* there's a good chance we're going to have to send a message, so set
 854         * one up in advance */
 855        msg.msg_name    = &call->conn->trans->peer->srx.transport.sin;
 856        msg.msg_namelen = sizeof(call->conn->trans->peer->srx.transport.sin);
 857        msg.msg_control = NULL;
 858        msg.msg_controllen = 0;
 859        msg.msg_flags   = 0;
 860
 861        hdr.epoch       = call->conn->epoch;
 862        hdr.cid         = call->cid;
 863        hdr.callNumber  = call->call_id;
 864        hdr.seq         = 0;
 865        hdr.type        = RXRPC_PACKET_TYPE_ACK;
 866        hdr.flags       = call->conn->out_clientflag;
 867        hdr.userStatus  = 0;
 868        hdr.securityIndex = call->conn->security_ix;
 869        hdr._rsvd       = 0;
 870        hdr.serviceId   = call->conn->service_id;
 871
 872        memset(iov, 0, sizeof(iov));
 873        iov[0].iov_base = &hdr;
 874        iov[0].iov_len  = sizeof(hdr);
 875
 876        /* deal with events of a final nature */
 877        if (test_bit(RXRPC_CALL_RELEASE, &call->events)) {
 878                rxrpc_release_call(call);
 879                clear_bit(RXRPC_CALL_RELEASE, &call->events);
 880        }
 881
 882        if (test_bit(RXRPC_CALL_RCVD_ERROR, &call->events)) {
 883                int error;
 884
 885                clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 886                clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 887                clear_bit(RXRPC_CALL_ABORT, &call->events);
 888
 889                error = call->conn->trans->peer->net_error;
 890                _debug("post net error %d", error);
 891
 892                if (rxrpc_post_message(call, RXRPC_SKB_MARK_NET_ERROR,
 893                                       error, true) < 0)
 894                        goto no_mem;
 895                clear_bit(RXRPC_CALL_RCVD_ERROR, &call->events);
 896                goto kill_ACKs;
 897        }
 898
 899        if (test_bit(RXRPC_CALL_CONN_ABORT, &call->events)) {
 900                ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 901
 902                clear_bit(RXRPC_CALL_REJECT_BUSY, &call->events);
 903                clear_bit(RXRPC_CALL_ABORT, &call->events);
 904
 905                _debug("post conn abort");
 906
 907                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 908                                       call->conn->error, true) < 0)
 909                        goto no_mem;
 910                clear_bit(RXRPC_CALL_CONN_ABORT, &call->events);
 911                goto kill_ACKs;
 912        }
 913
 914        if (test_bit(RXRPC_CALL_REJECT_BUSY, &call->events)) {
 915                hdr.type = RXRPC_PACKET_TYPE_BUSY;
 916                genbit = RXRPC_CALL_REJECT_BUSY;
 917                goto send_message;
 918        }
 919
 920        if (test_bit(RXRPC_CALL_ABORT, &call->events)) {
 921                ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
 922
 923                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 924                                       ECONNABORTED, true) < 0)
 925                        goto no_mem;
 926                hdr.type = RXRPC_PACKET_TYPE_ABORT;
 927                data = htonl(call->abort_code);
 928                iov[1].iov_base = &data;
 929                iov[1].iov_len = sizeof(data);
 930                genbit = RXRPC_CALL_ABORT;
 931                goto send_message;
 932        }
 933
 934        if (test_bit(RXRPC_CALL_ACK_FINAL, &call->events)) {
 935                genbit = RXRPC_CALL_ACK_FINAL;
 936
 937                ack.bufferSpace = htons(8);
 938                ack.maxSkew     = 0;
 939                ack.serial      = 0;
 940                ack.reason      = RXRPC_ACK_IDLE;
 941                ack.nAcks       = 0;
 942                call->ackr_reason = 0;
 943
 944                spin_lock_bh(&call->lock);
 945                ack.serial = call->ackr_serial;
 946                ack.previousPacket = call->ackr_prev_seq;
 947                ack.firstPacket = htonl(call->rx_data_eaten + 1);
 948                spin_unlock_bh(&call->lock);
 949
 950                pad = 0;
 951
 952                iov[1].iov_base = &ack;
 953                iov[1].iov_len  = sizeof(ack);
 954                iov[2].iov_base = &pad;
 955                iov[2].iov_len  = 3;
 956                iov[3].iov_base = &ackinfo;
 957                iov[3].iov_len  = sizeof(ackinfo);
 958                goto send_ACK;
 959        }
 960
 961        if (call->events & ((1 << RXRPC_CALL_RCVD_BUSY) |
 962                            (1 << RXRPC_CALL_RCVD_ABORT))
 963            ) {
 964                u32 mark;
 965
 966                if (test_bit(RXRPC_CALL_RCVD_ABORT, &call->events))
 967                        mark = RXRPC_SKB_MARK_REMOTE_ABORT;
 968                else
 969                        mark = RXRPC_SKB_MARK_BUSY;
 970
 971                _debug("post abort/busy");
 972                rxrpc_clear_tx_window(call);
 973                if (rxrpc_post_message(call, mark, ECONNABORTED, true) < 0)
 974                        goto no_mem;
 975
 976                clear_bit(RXRPC_CALL_RCVD_BUSY, &call->events);
 977                clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
 978                goto kill_ACKs;
 979        }
 980
 981        if (test_and_clear_bit(RXRPC_CALL_RCVD_ACKALL, &call->events)) {
 982                _debug("do implicit ackall");
 983                rxrpc_clear_tx_window(call);
 984        }
 985
 986        if (test_bit(RXRPC_CALL_LIFE_TIMER, &call->events)) {
 987                write_lock_bh(&call->state_lock);
 988                if (call->state <= RXRPC_CALL_COMPLETE) {
 989                        call->state = RXRPC_CALL_LOCALLY_ABORTED;
 990                        call->abort_code = RX_CALL_TIMEOUT;
 991                        set_bit(RXRPC_CALL_ABORT, &call->events);
 992                }
 993                write_unlock_bh(&call->state_lock);
 994
 995                _debug("post timeout");
 996                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
 997                                       ETIME, true) < 0)
 998                        goto no_mem;
 999
1000                clear_bit(RXRPC_CALL_LIFE_TIMER, &call->events);
1001                goto kill_ACKs;
1002        }
1003
1004        /* deal with assorted inbound messages */
1005        if (!skb_queue_empty(&call->rx_queue)) {
1006                switch (rxrpc_process_rx_queue(call, &abort_code)) {
1007                case 0:
1008                case -EAGAIN:
1009                        break;
1010                case -ENOMEM:
1011                        goto no_mem;
1012                case -EKEYEXPIRED:
1013                case -EKEYREJECTED:
1014                case -EPROTO:
1015                        rxrpc_abort_call(call, abort_code);
1016                        goto kill_ACKs;
1017                }
1018        }
1019
1020        /* handle resending */
1021        if (test_and_clear_bit(RXRPC_CALL_RESEND_TIMER, &call->events))
1022                rxrpc_resend_timer(call);
1023        if (test_and_clear_bit(RXRPC_CALL_RESEND, &call->events))
1024                rxrpc_resend(call);
1025
1026        /* consider sending an ordinary ACK */
1027        if (test_bit(RXRPC_CALL_ACK, &call->events)) {
1028                _debug("send ACK: window: %d - %d { %lx }",
1029                       call->rx_data_eaten, call->ackr_win_top,
1030                       call->ackr_window[0]);
1031
1032                if (call->state > RXRPC_CALL_SERVER_ACK_REQUEST &&
1033                    call->ackr_reason != RXRPC_ACK_PING_RESPONSE) {
1034                        /* ACK by sending reply DATA packet in this state */
1035                        clear_bit(RXRPC_CALL_ACK, &call->events);
1036                        goto maybe_reschedule;
1037                }
1038
1039                genbit = RXRPC_CALL_ACK;
1040
1041                acks = kzalloc(call->ackr_win_top - call->rx_data_eaten,
1042                               GFP_NOFS);
1043                if (!acks)
1044                        goto no_mem;
1045
1046                //hdr.flags     = RXRPC_SLOW_START_OK;
1047                ack.bufferSpace = htons(8);
1048                ack.maxSkew     = 0;
1049                ack.serial      = 0;
1050                ack.reason      = 0;
1051
1052                spin_lock_bh(&call->lock);
1053                ack.reason = call->ackr_reason;
1054                ack.serial = call->ackr_serial;
1055                ack.previousPacket = call->ackr_prev_seq;
1056                ack.firstPacket = htonl(call->rx_data_eaten + 1);
1057
1058                ack.nAcks = 0;
1059                for (loop = 0; loop < RXRPC_ACKR_WINDOW_ASZ; loop++) {
1060                        nbit = loop * BITS_PER_LONG;
1061                        for (bits = call->ackr_window[loop]; bits; bits >>= 1
1062                             ) {
1063                                _debug("- l=%d n=%d b=%lx", loop, nbit, bits);
1064                                if (bits & 1) {
1065                                        acks[nbit] = RXRPC_ACK_TYPE_ACK;
1066                                        ack.nAcks = nbit + 1;
1067                                }
1068                                nbit++;
1069                        }
1070                }
1071                call->ackr_reason = 0;
1072                spin_unlock_bh(&call->lock);
1073
1074                pad = 0;
1075
1076                iov[1].iov_base = &ack;
1077                iov[1].iov_len  = sizeof(ack);
1078                iov[2].iov_base = acks;
1079                iov[2].iov_len  = ack.nAcks;
1080                iov[3].iov_base = &pad;
1081                iov[3].iov_len  = 3;
1082                iov[4].iov_base = &ackinfo;
1083                iov[4].iov_len  = sizeof(ackinfo);
1084
1085                switch (ack.reason) {
1086                case RXRPC_ACK_REQUESTED:
1087                case RXRPC_ACK_DUPLICATE:
1088                case RXRPC_ACK_OUT_OF_SEQUENCE:
1089                case RXRPC_ACK_EXCEEDS_WINDOW:
1090                case RXRPC_ACK_NOSPACE:
1091                case RXRPC_ACK_PING:
1092                case RXRPC_ACK_PING_RESPONSE:
1093                        goto send_ACK_with_skew;
1094                case RXRPC_ACK_DELAY:
1095                case RXRPC_ACK_IDLE:
1096                        goto send_ACK;
1097                }
1098        }
1099
1100        /* handle completion of security negotiations on an incoming
1101         * connection */
1102        if (test_and_clear_bit(RXRPC_CALL_SECURED, &call->events)) {
1103                _debug("secured");
1104                spin_lock_bh(&call->lock);
1105
1106                if (call->state == RXRPC_CALL_SERVER_SECURING) {
1107                        _debug("securing");
1108                        write_lock(&call->conn->lock);
1109                        if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1110                            !test_bit(RXRPC_CALL_RELEASE, &call->events)) {
1111                                _debug("not released");
1112                                call->state = RXRPC_CALL_SERVER_ACCEPTING;
1113                                list_move_tail(&call->accept_link,
1114                                               &call->socket->acceptq);
1115                        }
1116                        write_unlock(&call->conn->lock);
1117                        read_lock(&call->state_lock);
1118                        if (call->state < RXRPC_CALL_COMPLETE)
1119                                set_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1120                        read_unlock(&call->state_lock);
1121                }
1122
1123                spin_unlock_bh(&call->lock);
1124                if (!test_bit(RXRPC_CALL_POST_ACCEPT, &call->events))
1125                        goto maybe_reschedule;
1126        }
1127
1128        /* post a notification of an acceptable connection to the app */
1129        if (test_bit(RXRPC_CALL_POST_ACCEPT, &call->events)) {
1130                _debug("post accept");
1131                if (rxrpc_post_message(call, RXRPC_SKB_MARK_NEW_CALL,
1132                                       0, false) < 0)
1133                        goto no_mem;
1134                clear_bit(RXRPC_CALL_POST_ACCEPT, &call->events);
1135                goto maybe_reschedule;
1136        }
1137
1138        /* handle incoming call acceptance */
1139        if (test_and_clear_bit(RXRPC_CALL_ACCEPTED, &call->events)) {
1140                _debug("accepted");
1141                ASSERTCMP(call->rx_data_post, ==, 0);
1142                call->rx_data_post = 1;
1143                read_lock_bh(&call->state_lock);
1144                if (call->state < RXRPC_CALL_COMPLETE)
1145                        set_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events);
1146                read_unlock_bh(&call->state_lock);
1147        }
1148
1149        /* drain the out of sequence received packet queue into the packet Rx
1150         * queue */
1151        if (test_and_clear_bit(RXRPC_CALL_DRAIN_RX_OOS, &call->events)) {
1152                while (call->rx_data_post == call->rx_first_oos)
1153                        if (rxrpc_drain_rx_oos_queue(call) < 0)
1154                                break;
1155                goto maybe_reschedule;
1156        }
1157
1158        /* other events may have been raised since we started checking */
1159        goto maybe_reschedule;
1160
1161send_ACK_with_skew:
1162        ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
1163                            ntohl(ack.serial));
1164send_ACK:
1165        mtu = call->conn->trans->peer->if_mtu;
1166        mtu -= call->conn->trans->peer->hdrsize;
1167        ackinfo.maxMTU  = htonl(mtu);
1168        ackinfo.rwind   = htonl(32);
1169
1170        /* permit the peer to send us jumbo packets if it wants to */
1171        ackinfo.rxMTU   = htonl(5692);
1172        ackinfo.jumbo_max = htonl(4);
1173
1174        hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1175        _proto("Tx ACK %%%u { m=%hu f=#%u p=#%u s=%%%u r=%s n=%u }",
1176               ntohl(hdr.serial),
1177               ntohs(ack.maxSkew),
1178               ntohl(ack.firstPacket),
1179               ntohl(ack.previousPacket),
1180               ntohl(ack.serial),
1181               rxrpc_acks[ack.reason],
1182               ack.nAcks);
1183
1184        del_timer_sync(&call->ack_timer);
1185        if (ack.nAcks > 0)
1186                set_bit(RXRPC_CALL_TX_SOFT_ACK, &call->flags);
1187        goto send_message_2;
1188
1189send_message:
1190        _debug("send message");
1191
1192        hdr.serial = htonl(atomic_inc_return(&call->conn->serial));
1193        _proto("Tx %s %%%u", rxrpc_pkts[hdr.type], ntohl(hdr.serial));
1194send_message_2:
1195
1196        len = iov[0].iov_len;
1197        ioc = 1;
1198        if (iov[4].iov_len) {
1199                ioc = 5;
1200                len += iov[4].iov_len;
1201                len += iov[3].iov_len;
1202                len += iov[2].iov_len;
1203                len += iov[1].iov_len;
1204        } else if (iov[3].iov_len) {
1205                ioc = 4;
1206                len += iov[3].iov_len;
1207                len += iov[2].iov_len;
1208                len += iov[1].iov_len;
1209        } else if (iov[2].iov_len) {
1210                ioc = 3;
1211                len += iov[2].iov_len;
1212                len += iov[1].iov_len;
1213        } else if (iov[1].iov_len) {
1214                ioc = 2;
1215                len += iov[1].iov_len;
1216        }
1217
1218        ret = kernel_sendmsg(call->conn->trans->local->socket,
1219                             &msg, iov, ioc, len);
1220        if (ret < 0) {
1221                _debug("sendmsg failed: %d", ret);
1222                read_lock_bh(&call->state_lock);
1223                if (call->state < RXRPC_CALL_DEAD)
1224                        rxrpc_queue_call(call);
1225                read_unlock_bh(&call->state_lock);
1226                goto error;
1227        }
1228
1229        switch (genbit) {
1230        case RXRPC_CALL_ABORT:
1231                clear_bit(genbit, &call->events);
1232                clear_bit(RXRPC_CALL_RCVD_ABORT, &call->events);
1233                goto kill_ACKs;
1234
1235        case RXRPC_CALL_ACK_FINAL:
1236                write_lock_bh(&call->state_lock);
1237                if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
1238                        call->state = RXRPC_CALL_COMPLETE;
1239                write_unlock_bh(&call->state_lock);
1240                goto kill_ACKs;
1241
1242        default:
1243                clear_bit(genbit, &call->events);
1244                switch (call->state) {
1245                case RXRPC_CALL_CLIENT_AWAIT_REPLY:
1246                case RXRPC_CALL_CLIENT_RECV_REPLY:
1247                case RXRPC_CALL_SERVER_RECV_REQUEST:
1248                case RXRPC_CALL_SERVER_ACK_REQUEST:
1249                        _debug("start ACK timer");
1250                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
1251                                          call->ackr_serial, false);
1252                default:
1253                        break;
1254                }
1255                goto maybe_reschedule;
1256        }
1257
1258kill_ACKs:
1259        del_timer_sync(&call->ack_timer);
1260        if (test_and_clear_bit(RXRPC_CALL_ACK_FINAL, &call->events))
1261                rxrpc_put_call(call);
1262        clear_bit(RXRPC_CALL_ACK, &call->events);
1263
1264maybe_reschedule:
1265        if (call->events || !skb_queue_empty(&call->rx_queue)) {
1266                read_lock_bh(&call->state_lock);
1267                if (call->state < RXRPC_CALL_DEAD)
1268                        rxrpc_queue_call(call);
1269                read_unlock_bh(&call->state_lock);
1270        }
1271
1272        /* don't leave aborted connections on the accept queue */
1273        if (call->state >= RXRPC_CALL_COMPLETE &&
1274            !list_empty(&call->accept_link)) {
1275                _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
1276                       call, call->events, call->flags,
1277                       ntohl(call->conn->cid));
1278
1279                read_lock_bh(&call->state_lock);
1280                if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
1281                    !test_and_set_bit(RXRPC_CALL_RELEASE, &call->events))
1282                        rxrpc_queue_call(call);
1283                read_unlock_bh(&call->state_lock);
1284        }
1285
1286error:
1287        clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
1288        kfree(acks);
1289
1290        /* because we don't want two CPUs both processing the work item for one
1291         * call at the same time, we use a flag to note when it's busy; however
1292         * this means there's a race between clearing the flag and setting the
1293         * work pending bit and the work item being processed again */
1294        if (call->events && !work_pending(&call->processor)) {
1295                _debug("jumpstart %x", ntohl(call->conn->cid));
1296                rxrpc_queue_call(call);
1297        }
1298
1299        _leave("");
1300        return;
1301
1302no_mem:
1303        _debug("out of memory");
1304        goto maybe_reschedule;
1305}
1306