linux/net/rxrpc/recvmsg.c
<<
>>
Prefs
   1/* RxRPC recvmsg() implementation
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
  13
  14#include <linux/net.h>
  15#include <linux/skbuff.h>
  16#include <linux/export.h>
  17#include <linux/sched/signal.h>
  18
  19#include <net/sock.h>
  20#include <net/af_rxrpc.h>
  21#include "ar-internal.h"
  22
  23/*
  24 * Post a call for attention by the socket or kernel service.  Further
  25 * notifications are suppressed by putting recvmsg_link on a dummy queue.
  26 */
  27void rxrpc_notify_socket(struct rxrpc_call *call)
  28{
  29        struct rxrpc_sock *rx;
  30        struct sock *sk;
  31
  32        _enter("%d", call->debug_id);
  33
  34        if (!list_empty(&call->recvmsg_link))
  35                return;
  36
  37        rcu_read_lock();
  38
  39        rx = rcu_dereference(call->socket);
  40        sk = &rx->sk;
  41        if (rx && sk->sk_state < RXRPC_CLOSE) {
  42                if (call->notify_rx) {
  43                        spin_lock_bh(&call->notify_lock);
  44                        call->notify_rx(sk, call, call->user_call_ID);
  45                        spin_unlock_bh(&call->notify_lock);
  46                } else {
  47                        write_lock_bh(&rx->recvmsg_lock);
  48                        if (list_empty(&call->recvmsg_link)) {
  49                                rxrpc_get_call(call, rxrpc_call_got);
  50                                list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
  51                        }
  52                        write_unlock_bh(&rx->recvmsg_lock);
  53
  54                        if (!sock_flag(sk, SOCK_DEAD)) {
  55                                _debug("call %ps", sk->sk_data_ready);
  56                                sk->sk_data_ready(sk);
  57                        }
  58                }
  59        }
  60
  61        rcu_read_unlock();
  62        _leave("");
  63}
  64
  65/*
  66 * Pass a call terminating message to userspace.
  67 */
  68static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
  69{
  70        u32 tmp = 0;
  71        int ret;
  72
  73        switch (call->completion) {
  74        case RXRPC_CALL_SUCCEEDED:
  75                ret = 0;
  76                if (rxrpc_is_service_call(call))
  77                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
  78                break;
  79        case RXRPC_CALL_REMOTELY_ABORTED:
  80                tmp = call->abort_code;
  81                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  82                break;
  83        case RXRPC_CALL_LOCALLY_ABORTED:
  84                tmp = call->abort_code;
  85                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  86                break;
  87        case RXRPC_CALL_NETWORK_ERROR:
  88                tmp = -call->error;
  89                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
  90                break;
  91        case RXRPC_CALL_LOCAL_ERROR:
  92                tmp = -call->error;
  93                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
  94                break;
  95        default:
  96                pr_err("Invalid terminal call state %u\n", call->state);
  97                BUG();
  98                break;
  99        }
 100
 101        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
 102                            call->rx_pkt_offset, call->rx_pkt_len, ret);
 103        return ret;
 104}
 105
 106/*
 107 * Pass back notification of a new call.  The call is added to the
 108 * to-be-accepted list.  This means that the next call to be accepted might not
 109 * be the last call seen awaiting acceptance, but unless we leave this on the
 110 * front of the queue and block all other messages until someone gives us a
 111 * user_ID for it, there's not a lot we can do.
 112 */
 113static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
 114                                  struct rxrpc_call *call,
 115                                  struct msghdr *msg, int flags)
 116{
 117        int tmp = 0, ret;
 118
 119        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &tmp);
 120
 121        if (ret == 0 && !(flags & MSG_PEEK)) {
 122                _debug("to be accepted");
 123                write_lock_bh(&rx->recvmsg_lock);
 124                list_del_init(&call->recvmsg_link);
 125                write_unlock_bh(&rx->recvmsg_lock);
 126
 127                rxrpc_get_call(call, rxrpc_call_got);
 128                write_lock(&rx->call_lock);
 129                list_add_tail(&call->accept_link, &rx->to_be_accepted);
 130                write_unlock(&rx->call_lock);
 131        }
 132
 133        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_to_be_accepted, 1, 0, 0, ret);
 134        return ret;
 135}
 136
 137/*
 138 * End the packet reception phase.
 139 */
 140static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
 141{
 142        _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
 143
 144        trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
 145        ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
 146
 147        if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
 148                rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, false, true,
 149                                  rxrpc_propose_ack_terminal_ack);
 150                //rxrpc_send_ack_packet(call, false, NULL);
 151        }
 152
 153        write_lock_bh(&call->state_lock);
 154
 155        switch (call->state) {
 156        case RXRPC_CALL_CLIENT_RECV_REPLY:
 157                __rxrpc_call_completed(call);
 158                write_unlock_bh(&call->state_lock);
 159                break;
 160
 161        case RXRPC_CALL_SERVER_RECV_REQUEST:
 162                call->tx_phase = true;
 163                call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
 164                call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
 165                write_unlock_bh(&call->state_lock);
 166                rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial, false, true,
 167                                  rxrpc_propose_ack_processing_op);
 168                break;
 169        default:
 170                write_unlock_bh(&call->state_lock);
 171                break;
 172        }
 173}
 174
 175/*
 176 * Discard a packet we've used up and advance the Rx window by one.
 177 */
 178static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
 179{
 180        struct rxrpc_skb_priv *sp;
 181        struct sk_buff *skb;
 182        rxrpc_serial_t serial;
 183        rxrpc_seq_t hard_ack, top;
 184        u8 flags;
 185        int ix;
 186
 187        _enter("%d", call->debug_id);
 188
 189        hard_ack = call->rx_hard_ack;
 190        top = smp_load_acquire(&call->rx_top);
 191        ASSERT(before(hard_ack, top));
 192
 193        hard_ack++;
 194        ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
 195        skb = call->rxtx_buffer[ix];
 196        rxrpc_see_skb(skb, rxrpc_skb_rx_rotated);
 197        sp = rxrpc_skb(skb);
 198        flags = sp->hdr.flags;
 199        serial = sp->hdr.serial;
 200        if (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO)
 201                serial += (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) - 1;
 202
 203        call->rxtx_buffer[ix] = NULL;
 204        call->rxtx_annotations[ix] = 0;
 205        /* Barrier against rxrpc_input_data(). */
 206        smp_store_release(&call->rx_hard_ack, hard_ack);
 207
 208        rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 209
 210        _debug("%u,%u,%02x", hard_ack, top, flags);
 211        trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
 212        if (flags & RXRPC_LAST_PACKET) {
 213                rxrpc_end_rx_phase(call, serial);
 214        } else {
 215                /* Check to see if there's an ACK that needs sending. */
 216                if (after_eq(hard_ack, call->ackr_consumed + 2) ||
 217                    after_eq(top, call->ackr_seen + 2) ||
 218                    (hard_ack == top && after(hard_ack, call->ackr_consumed)))
 219                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial,
 220                                          true, true,
 221                                          rxrpc_propose_ack_rotate_rx);
 222                if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
 223                        rxrpc_send_ack_packet(call, false, NULL);
 224        }
 225}
 226
 227/*
 228 * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
 229 * padding, but if this is the case, the packet length will be resident in the
 230 * socket buffer.  Note that we can't modify the master skb info as the skb may
 231 * be the home to multiple subpackets.
 232 */
 233static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
 234                               u8 annotation,
 235                               unsigned int offset, unsigned int len)
 236{
 237        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 238        rxrpc_seq_t seq = sp->hdr.seq;
 239        u16 cksum = sp->hdr.cksum;
 240
 241        _enter("");
 242
 243        /* For all but the head jumbo subpacket, the security checksum is in a
 244         * jumbo header immediately prior to the data.
 245         */
 246        if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) {
 247                __be16 tmp;
 248                if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
 249                        BUG();
 250                cksum = ntohs(tmp);
 251                seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1;
 252        }
 253
 254        return call->conn->security->verify_packet(call, skb, offset, len,
 255                                                   seq, cksum);
 256}
 257
 258/*
 259 * Locate the data within a packet.  This is complicated by:
 260 *
 261 * (1) An skb may contain a jumbo packet - so we have to find the appropriate
 262 *     subpacket.
 263 *
 264 * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
 265 *     contains an extra header which includes the true length of the data,
 266 *     excluding any encrypted padding.
 267 */
 268static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
 269                             u8 *_annotation,
 270                             unsigned int *_offset, unsigned int *_len)
 271{
 272        unsigned int offset = sizeof(struct rxrpc_wire_header);
 273        unsigned int len;
 274        int ret;
 275        u8 annotation = *_annotation;
 276
 277        /* Locate the subpacket */
 278        len = skb->len - offset;
 279        if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) {
 280                offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) *
 281                           RXRPC_JUMBO_SUBPKTLEN);
 282                len = (annotation & RXRPC_RX_ANNO_JLAST) ?
 283                        skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
 284        }
 285
 286        if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
 287                ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
 288                if (ret < 0)
 289                        return ret;
 290                *_annotation |= RXRPC_RX_ANNO_VERIFIED;
 291        }
 292
 293        *_offset = offset;
 294        *_len = len;
 295        call->conn->security->locate_data(call, skb, _offset, _len);
 296        return 0;
 297}
 298
 299/*
 300 * Deliver messages to a call.  This keeps processing packets until the buffer
 301 * is filled and we find either more DATA (returns 0) or the end of the DATA
 302 * (returns 1).  If more packets are required, it returns -EAGAIN.
 303 */
 304static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 305                              struct msghdr *msg, struct iov_iter *iter,
 306                              size_t len, int flags, size_t *_offset)
 307{
 308        struct rxrpc_skb_priv *sp;
 309        struct sk_buff *skb;
 310        rxrpc_seq_t hard_ack, top, seq;
 311        size_t remain;
 312        bool last;
 313        unsigned int rx_pkt_offset, rx_pkt_len;
 314        int ix, copy, ret = -EAGAIN, ret2;
 315
 316        if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) &&
 317            call->ackr_reason)
 318                rxrpc_send_ack_packet(call, false, NULL);
 319
 320        rx_pkt_offset = call->rx_pkt_offset;
 321        rx_pkt_len = call->rx_pkt_len;
 322
 323        if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
 324                seq = call->rx_hard_ack;
 325                ret = 1;
 326                goto done;
 327        }
 328
 329        /* Barriers against rxrpc_input_data(). */
 330        hard_ack = call->rx_hard_ack;
 331        seq = hard_ack + 1;
 332        while (top = smp_load_acquire(&call->rx_top),
 333               before_eq(seq, top)
 334               ) {
 335                ix = seq & RXRPC_RXTX_BUFF_MASK;
 336                skb = call->rxtx_buffer[ix];
 337                if (!skb) {
 338                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
 339                                            rx_pkt_offset, rx_pkt_len, 0);
 340                        break;
 341                }
 342                smp_rmb();
 343                rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
 344                sp = rxrpc_skb(skb);
 345
 346                if (!(flags & MSG_PEEK))
 347                        trace_rxrpc_receive(call, rxrpc_receive_front,
 348                                            sp->hdr.serial, seq);
 349
 350                if (msg)
 351                        sock_recv_timestamp(msg, sock->sk, skb);
 352
 353                if (rx_pkt_offset == 0) {
 354                        ret2 = rxrpc_locate_data(call, skb,
 355                                                 &call->rxtx_annotations[ix],
 356                                                 &rx_pkt_offset, &rx_pkt_len);
 357                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
 358                                            rx_pkt_offset, rx_pkt_len, ret2);
 359                        if (ret2 < 0) {
 360                                ret = ret2;
 361                                goto out;
 362                        }
 363                } else {
 364                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
 365                                            rx_pkt_offset, rx_pkt_len, 0);
 366                }
 367
 368                /* We have to handle short, empty and used-up DATA packets. */
 369                remain = len - *_offset;
 370                copy = rx_pkt_len;
 371                if (copy > remain)
 372                        copy = remain;
 373                if (copy > 0) {
 374                        ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
 375                                                      copy);
 376                        if (ret2 < 0) {
 377                                ret = ret2;
 378                                goto out;
 379                        }
 380
 381                        /* handle piecemeal consumption of data packets */
 382                        rx_pkt_offset += copy;
 383                        rx_pkt_len -= copy;
 384                        *_offset += copy;
 385                }
 386
 387                if (rx_pkt_len > 0) {
 388                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
 389                                            rx_pkt_offset, rx_pkt_len, 0);
 390                        ASSERTCMP(*_offset, ==, len);
 391                        ret = 0;
 392                        break;
 393                }
 394
 395                /* The whole packet has been transferred. */
 396                last = sp->hdr.flags & RXRPC_LAST_PACKET;
 397                if (!(flags & MSG_PEEK))
 398                        rxrpc_rotate_rx_window(call);
 399                rx_pkt_offset = 0;
 400                rx_pkt_len = 0;
 401
 402                if (last) {
 403                        ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
 404                        ret = 1;
 405                        goto out;
 406                }
 407
 408                seq++;
 409        }
 410
 411out:
 412        if (!(flags & MSG_PEEK)) {
 413                call->rx_pkt_offset = rx_pkt_offset;
 414                call->rx_pkt_len = rx_pkt_len;
 415        }
 416done:
 417        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
 418                            rx_pkt_offset, rx_pkt_len, ret);
 419        if (ret == -EAGAIN)
 420                set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags);
 421        return ret;
 422}
 423
 424/*
 425 * Receive a message from an RxRPC socket
 426 * - we need to be careful about two or more threads calling recvmsg
 427 *   simultaneously
 428 */
 429int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 430                  int flags)
 431{
 432        struct rxrpc_call *call;
 433        struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
 434        struct list_head *l;
 435        size_t copied = 0;
 436        long timeo;
 437        int ret;
 438
 439        DEFINE_WAIT(wait);
 440
 441        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);
 442
 443        if (flags & (MSG_OOB | MSG_TRUNC))
 444                return -EOPNOTSUPP;
 445
 446        timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
 447
 448try_again:
 449        lock_sock(&rx->sk);
 450
 451        /* Return immediately if a client socket has no outstanding calls */
 452        if (RB_EMPTY_ROOT(&rx->calls) &&
 453            list_empty(&rx->recvmsg_q) &&
 454            rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
 455                release_sock(&rx->sk);
 456                return -ENODATA;
 457        }
 458
 459        if (list_empty(&rx->recvmsg_q)) {
 460                ret = -EWOULDBLOCK;
 461                if (timeo == 0) {
 462                        call = NULL;
 463                        goto error_no_call;
 464                }
 465
 466                release_sock(&rx->sk);
 467
 468                /* Wait for something to happen */
 469                prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
 470                                          TASK_INTERRUPTIBLE);
 471                ret = sock_error(&rx->sk);
 472                if (ret)
 473                        goto wait_error;
 474
 475                if (list_empty(&rx->recvmsg_q)) {
 476                        if (signal_pending(current))
 477                                goto wait_interrupted;
 478                        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
 479                                            0, 0, 0, 0);
 480                        timeo = schedule_timeout(timeo);
 481                }
 482                finish_wait(sk_sleep(&rx->sk), &wait);
 483                goto try_again;
 484        }
 485
 486        /* Find the next call and dequeue it if we're not just peeking.  If we
 487         * do dequeue it, that comes with a ref that we will need to release.
 488         */
 489        write_lock_bh(&rx->recvmsg_lock);
 490        l = rx->recvmsg_q.next;
 491        call = list_entry(l, struct rxrpc_call, recvmsg_link);
 492        if (!(flags & MSG_PEEK))
 493                list_del_init(&call->recvmsg_link);
 494        else
 495                rxrpc_get_call(call, rxrpc_call_got);
 496        write_unlock_bh(&rx->recvmsg_lock);
 497
 498        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
 499
 500        /* We're going to drop the socket lock, so we need to lock the call
 501         * against interference by sendmsg.
 502         */
 503        if (!mutex_trylock(&call->user_mutex)) {
 504                ret = -EWOULDBLOCK;
 505                if (flags & MSG_DONTWAIT)
 506                        goto error_requeue_call;
 507                ret = -ERESTARTSYS;
 508                if (mutex_lock_interruptible(&call->user_mutex) < 0)
 509                        goto error_requeue_call;
 510        }
 511
 512        release_sock(&rx->sk);
 513
 514        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 515                BUG();
 516
 517        if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 518                if (flags & MSG_CMSG_COMPAT) {
 519                        unsigned int id32 = call->user_call_ID;
 520
 521                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 522                                       sizeof(unsigned int), &id32);
 523                } else {
 524                        unsigned long idl = call->user_call_ID;
 525
 526                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 527                                       sizeof(unsigned long), &idl);
 528                }
 529                if (ret < 0)
 530                        goto error_unlock_call;
 531        }
 532
 533        if (msg->msg_name) {
 534                struct sockaddr_rxrpc *srx = msg->msg_name;
 535                size_t len = sizeof(call->peer->srx);
 536
 537                memcpy(msg->msg_name, &call->peer->srx, len);
 538                srx->srx_service = call->service_id;
 539                msg->msg_namelen = len;
 540        }
 541
 542        switch (READ_ONCE(call->state)) {
 543        case RXRPC_CALL_SERVER_ACCEPTING:
 544                ret = rxrpc_recvmsg_new_call(rx, call, msg, flags);
 545                break;
 546        case RXRPC_CALL_CLIENT_RECV_REPLY:
 547        case RXRPC_CALL_SERVER_RECV_REQUEST:
 548        case RXRPC_CALL_SERVER_ACK_REQUEST:
 549                ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
 550                                         flags, &copied);
 551                if (ret == -EAGAIN)
 552                        ret = 0;
 553
 554                if (after(call->rx_top, call->rx_hard_ack) &&
 555                    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
 556                        rxrpc_notify_socket(call);
 557                break;
 558        default:
 559                ret = 0;
 560                break;
 561        }
 562
 563        if (ret < 0)
 564                goto error_unlock_call;
 565
 566        if (call->state == RXRPC_CALL_COMPLETE) {
 567                ret = rxrpc_recvmsg_term(call, msg);
 568                if (ret < 0)
 569                        goto error_unlock_call;
 570                if (!(flags & MSG_PEEK))
 571                        rxrpc_release_call(rx, call);
 572                msg->msg_flags |= MSG_EOR;
 573                ret = 1;
 574        }
 575
 576        if (ret == 0)
 577                msg->msg_flags |= MSG_MORE;
 578        else
 579                msg->msg_flags &= ~MSG_MORE;
 580        ret = copied;
 581
 582error_unlock_call:
 583        mutex_unlock(&call->user_mutex);
 584        rxrpc_put_call(call, rxrpc_call_put);
 585        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
 586        return ret;
 587
 588error_requeue_call:
 589        if (!(flags & MSG_PEEK)) {
 590                write_lock_bh(&rx->recvmsg_lock);
 591                list_add(&call->recvmsg_link, &rx->recvmsg_q);
 592                write_unlock_bh(&rx->recvmsg_lock);
 593                trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
 594        } else {
 595                rxrpc_put_call(call, rxrpc_call_put);
 596        }
 597error_no_call:
 598        release_sock(&rx->sk);
 599        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
 600        return ret;
 601
 602wait_interrupted:
 603        ret = sock_intr_errno(timeo);
 604wait_error:
 605        finish_wait(sk_sleep(&rx->sk), &wait);
 606        call = NULL;
 607        goto error_no_call;
 608}
 609
 610/**
 611 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 612 * @sock: The socket that the call exists on
 613 * @call: The call to send data through
 614 * @iter: The buffer to receive into
 615 * @want_more: True if more data is expected to be read
 616 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 617 * @_service: Where to store the actual service ID (may be upgraded)
 618 *
 619 * Allow a kernel service to receive data and pick up information about the
 620 * state of a call.  Returns 0 if got what was asked for and there's more
 621 * available, 1 if we got what was asked for and we're at the end of the data
 622 * and -EAGAIN if we need more data.
 623 *
 624 * Note that we may return -EAGAIN to drain empty packets at the end of the
 625 * data, even if we've already copied over the requested data.
 626 *
 627 * *_abort should also be initialised to 0.
 628 */
 629int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 630                           struct iov_iter *iter,
 631                           bool want_more, u32 *_abort, u16 *_service)
 632{
 633        size_t offset = 0;
 634        int ret;
 635
 636        _enter("{%d,%s},%zu,%d",
 637               call->debug_id, rxrpc_call_states[call->state],
 638               iov_iter_count(iter), want_more);
 639
 640        ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
 641
 642        mutex_lock(&call->user_mutex);
 643
 644        switch (READ_ONCE(call->state)) {
 645        case RXRPC_CALL_CLIENT_RECV_REPLY:
 646        case RXRPC_CALL_SERVER_RECV_REQUEST:
 647        case RXRPC_CALL_SERVER_ACK_REQUEST:
 648                ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
 649                                         iov_iter_count(iter), 0,
 650                                         &offset);
 651                if (ret < 0)
 652                        goto out;
 653
 654                /* We can only reach here with a partially full buffer if we
 655                 * have reached the end of the data.  We must otherwise have a
 656                 * full buffer or have been given -EAGAIN.
 657                 */
 658                if (ret == 1) {
 659                        if (iov_iter_count(iter) > 0)
 660                                goto short_data;
 661                        if (!want_more)
 662                                goto read_phase_complete;
 663                        ret = 0;
 664                        goto out;
 665                }
 666
 667                if (!want_more)
 668                        goto excess_data;
 669                goto out;
 670
 671        case RXRPC_CALL_COMPLETE:
 672                goto call_complete;
 673
 674        default:
 675                ret = -EINPROGRESS;
 676                goto out;
 677        }
 678
 679read_phase_complete:
 680        ret = 1;
 681out:
 682        switch (call->ackr_reason) {
 683        case RXRPC_ACK_IDLE:
 684                break;
 685        case RXRPC_ACK_DELAY:
 686                if (ret != -EAGAIN)
 687                        break;
 688                /* Fall through */
 689        default:
 690                rxrpc_send_ack_packet(call, false, NULL);
 691        }
 692
 693        if (_service)
 694                *_service = call->service_id;
 695        mutex_unlock(&call->user_mutex);
 696        _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
 697        return ret;
 698
 699short_data:
 700        trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
 701        ret = -EBADMSG;
 702        goto out;
 703excess_data:
 704        trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
 705        ret = -EMSGSIZE;
 706        goto out;
 707call_complete:
 708        *_abort = call->abort_code;
 709        ret = call->error;
 710        if (call->completion == RXRPC_CALL_SUCCEEDED) {
 711                ret = 1;
 712                if (iov_iter_count(iter) > 0)
 713                        ret = -ECONNRESET;
 714        }
 715        goto out;
 716}
 717EXPORT_SYMBOL(rxrpc_kernel_recv_data);
 718