linux/net/rxrpc/recvmsg.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/* RxRPC recvmsg() implementation
   3 *
   4 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   5 * Written by David Howells (dhowells@redhat.com)
   6 */
   7
   8#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
   9
  10#include <linux/net.h>
  11#include <linux/skbuff.h>
  12#include <linux/export.h>
  13#include <linux/sched/signal.h>
  14
  15#include <net/sock.h>
  16#include <net/af_rxrpc.h>
  17#include "ar-internal.h"
  18
  19/*
  20 * Post a call for attention by the socket or kernel service.  Further
  21 * notifications are suppressed by putting recvmsg_link on a dummy queue.
  22 */
  23void rxrpc_notify_socket(struct rxrpc_call *call)
  24{
  25        struct rxrpc_sock *rx;
  26        struct sock *sk;
  27
  28        _enter("%d", call->debug_id);
  29
  30        if (!list_empty(&call->recvmsg_link))
  31                return;
  32
  33        rcu_read_lock();
  34
  35        rx = rcu_dereference(call->socket);
  36        sk = &rx->sk;
  37        if (rx && sk->sk_state < RXRPC_CLOSE) {
  38                if (call->notify_rx) {
  39                        spin_lock_bh(&call->notify_lock);
  40                        call->notify_rx(sk, call, call->user_call_ID);
  41                        spin_unlock_bh(&call->notify_lock);
  42                } else {
  43                        write_lock_bh(&rx->recvmsg_lock);
  44                        if (list_empty(&call->recvmsg_link)) {
  45                                rxrpc_get_call(call, rxrpc_call_got);
  46                                list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
  47                        }
  48                        write_unlock_bh(&rx->recvmsg_lock);
  49
  50                        if (!sock_flag(sk, SOCK_DEAD)) {
  51                                _debug("call %ps", sk->sk_data_ready);
  52                                sk->sk_data_ready(sk);
  53                        }
  54                }
  55        }
  56
  57        rcu_read_unlock();
  58        _leave("");
  59}
  60
  61/*
  62 * Pass a call terminating message to userspace.
  63 */
  64static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
  65{
  66        u32 tmp = 0;
  67        int ret;
  68
  69        switch (call->completion) {
  70        case RXRPC_CALL_SUCCEEDED:
  71                ret = 0;
  72                if (rxrpc_is_service_call(call))
  73                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
  74                break;
  75        case RXRPC_CALL_REMOTELY_ABORTED:
  76                tmp = call->abort_code;
  77                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  78                break;
  79        case RXRPC_CALL_LOCALLY_ABORTED:
  80                tmp = call->abort_code;
  81                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  82                break;
  83        case RXRPC_CALL_NETWORK_ERROR:
  84                tmp = -call->error;
  85                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
  86                break;
  87        case RXRPC_CALL_LOCAL_ERROR:
  88                tmp = -call->error;
  89                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
  90                break;
  91        default:
  92                pr_err("Invalid terminal call state %u\n", call->state);
  93                BUG();
  94                break;
  95        }
  96
  97        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
  98                            call->rx_pkt_offset, call->rx_pkt_len, ret);
  99        return ret;
 100}
 101
 102/*
 103 * Pass back notification of a new call.  The call is added to the
 104 * to-be-accepted list.  This means that the next call to be accepted might not
 105 * be the last call seen awaiting acceptance, but unless we leave this on the
 106 * front of the queue and block all other messages until someone gives us a
 107 * user_ID for it, there's not a lot we can do.
 108 */
 109static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
 110                                  struct rxrpc_call *call,
 111                                  struct msghdr *msg, int flags)
 112{
 113        int tmp = 0, ret;
 114
 115        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &tmp);
 116
 117        if (ret == 0 && !(flags & MSG_PEEK)) {
 118                _debug("to be accepted");
 119                write_lock_bh(&rx->recvmsg_lock);
 120                list_del_init(&call->recvmsg_link);
 121                write_unlock_bh(&rx->recvmsg_lock);
 122
 123                rxrpc_get_call(call, rxrpc_call_got);
 124                write_lock(&rx->call_lock);
 125                list_add_tail(&call->accept_link, &rx->to_be_accepted);
 126                write_unlock(&rx->call_lock);
 127        }
 128
 129        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_to_be_accepted, 1, 0, 0, ret);
 130        return ret;
 131}
 132
 133/*
 134 * End the packet reception phase.
 135 */
 136static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
 137{
 138        _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
 139
 140        trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
 141        ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
 142
 143        if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
 144                rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, false, true,
 145                                  rxrpc_propose_ack_terminal_ack);
 146                //rxrpc_send_ack_packet(call, false, NULL);
 147        }
 148
 149        write_lock_bh(&call->state_lock);
 150
 151        switch (call->state) {
 152        case RXRPC_CALL_CLIENT_RECV_REPLY:
 153                __rxrpc_call_completed(call);
 154                write_unlock_bh(&call->state_lock);
 155                break;
 156
 157        case RXRPC_CALL_SERVER_RECV_REQUEST:
 158                call->tx_phase = true;
 159                call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
 160                call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
 161                write_unlock_bh(&call->state_lock);
 162                rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial, false, true,
 163                                  rxrpc_propose_ack_processing_op);
 164                break;
 165        default:
 166                write_unlock_bh(&call->state_lock);
 167                break;
 168        }
 169}
 170
 171/*
 172 * Discard a packet we've used up and advance the Rx window by one.
 173 */
 174static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
 175{
 176        struct rxrpc_skb_priv *sp;
 177        struct sk_buff *skb;
 178        rxrpc_serial_t serial;
 179        rxrpc_seq_t hard_ack, top;
 180        u8 flags;
 181        int ix;
 182
 183        _enter("%d", call->debug_id);
 184
 185        hard_ack = call->rx_hard_ack;
 186        top = smp_load_acquire(&call->rx_top);
 187        ASSERT(before(hard_ack, top));
 188
 189        hard_ack++;
 190        ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
 191        skb = call->rxtx_buffer[ix];
 192        rxrpc_see_skb(skb, rxrpc_skb_rx_rotated);
 193        sp = rxrpc_skb(skb);
 194        flags = sp->hdr.flags;
 195        serial = sp->hdr.serial;
 196        if (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO)
 197                serial += (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) - 1;
 198
 199        call->rxtx_buffer[ix] = NULL;
 200        call->rxtx_annotations[ix] = 0;
 201        /* Barrier against rxrpc_input_data(). */
 202        smp_store_release(&call->rx_hard_ack, hard_ack);
 203
 204        rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 205
 206        _debug("%u,%u,%02x", hard_ack, top, flags);
 207        trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
 208        if (flags & RXRPC_LAST_PACKET) {
 209                rxrpc_end_rx_phase(call, serial);
 210        } else {
 211                /* Check to see if there's an ACK that needs sending. */
 212                if (after_eq(hard_ack, call->ackr_consumed + 2) ||
 213                    after_eq(top, call->ackr_seen + 2) ||
 214                    (hard_ack == top && after(hard_ack, call->ackr_consumed)))
 215                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial,
 216                                          true, true,
 217                                          rxrpc_propose_ack_rotate_rx);
 218                if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
 219                        rxrpc_send_ack_packet(call, false, NULL);
 220        }
 221}
 222
 223/*
 224 * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
 225 * padding, but if this is the case, the packet length will be resident in the
 226 * socket buffer.  Note that we can't modify the master skb info as the skb may
 227 * be the home to multiple subpackets.
 228 */
 229static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
 230                               u8 annotation,
 231                               unsigned int offset, unsigned int len)
 232{
 233        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 234        rxrpc_seq_t seq = sp->hdr.seq;
 235        u16 cksum = sp->hdr.cksum;
 236
 237        _enter("");
 238
 239        /* For all but the head jumbo subpacket, the security checksum is in a
 240         * jumbo header immediately prior to the data.
 241         */
 242        if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) {
 243                __be16 tmp;
 244                if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
 245                        BUG();
 246                cksum = ntohs(tmp);
 247                seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1;
 248        }
 249
 250        return call->conn->security->verify_packet(call, skb, offset, len,
 251                                                   seq, cksum);
 252}
 253
 254/*
 255 * Locate the data within a packet.  This is complicated by:
 256 *
 257 * (1) An skb may contain a jumbo packet - so we have to find the appropriate
 258 *     subpacket.
 259 *
 260 * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
 261 *     contains an extra header which includes the true length of the data,
 262 *     excluding any encrypted padding.
 263 */
 264static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
 265                             u8 *_annotation,
 266                             unsigned int *_offset, unsigned int *_len)
 267{
 268        unsigned int offset = sizeof(struct rxrpc_wire_header);
 269        unsigned int len;
 270        int ret;
 271        u8 annotation = *_annotation;
 272
 273        /* Locate the subpacket */
 274        len = skb->len - offset;
 275        if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) {
 276                offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) *
 277                           RXRPC_JUMBO_SUBPKTLEN);
 278                len = (annotation & RXRPC_RX_ANNO_JLAST) ?
 279                        skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
 280        }
 281
 282        if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
 283                ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
 284                if (ret < 0)
 285                        return ret;
 286                *_annotation |= RXRPC_RX_ANNO_VERIFIED;
 287        }
 288
 289        *_offset = offset;
 290        *_len = len;
 291        call->conn->security->locate_data(call, skb, _offset, _len);
 292        return 0;
 293}
 294
 295/*
 296 * Deliver messages to a call.  This keeps processing packets until the buffer
 297 * is filled and we find either more DATA (returns 0) or the end of the DATA
 298 * (returns 1).  If more packets are required, it returns -EAGAIN.
 299 */
 300static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 301                              struct msghdr *msg, struct iov_iter *iter,
 302                              size_t len, int flags, size_t *_offset)
 303{
 304        struct rxrpc_skb_priv *sp;
 305        struct sk_buff *skb;
 306        rxrpc_seq_t hard_ack, top, seq;
 307        size_t remain;
 308        bool last;
 309        unsigned int rx_pkt_offset, rx_pkt_len;
 310        int ix, copy, ret = -EAGAIN, ret2;
 311
 312        if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) &&
 313            call->ackr_reason)
 314                rxrpc_send_ack_packet(call, false, NULL);
 315
 316        rx_pkt_offset = call->rx_pkt_offset;
 317        rx_pkt_len = call->rx_pkt_len;
 318
 319        if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
 320                seq = call->rx_hard_ack;
 321                ret = 1;
 322                goto done;
 323        }
 324
 325        /* Barriers against rxrpc_input_data(). */
 326        hard_ack = call->rx_hard_ack;
 327        seq = hard_ack + 1;
 328        while (top = smp_load_acquire(&call->rx_top),
 329               before_eq(seq, top)
 330               ) {
 331                ix = seq & RXRPC_RXTX_BUFF_MASK;
 332                skb = call->rxtx_buffer[ix];
 333                if (!skb) {
 334                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
 335                                            rx_pkt_offset, rx_pkt_len, 0);
 336                        break;
 337                }
 338                smp_rmb();
 339                rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
 340                sp = rxrpc_skb(skb);
 341
 342                if (!(flags & MSG_PEEK))
 343                        trace_rxrpc_receive(call, rxrpc_receive_front,
 344                                            sp->hdr.serial, seq);
 345
 346                if (msg)
 347                        sock_recv_timestamp(msg, sock->sk, skb);
 348
 349                if (rx_pkt_offset == 0) {
 350                        ret2 = rxrpc_locate_data(call, skb,
 351                                                 &call->rxtx_annotations[ix],
 352                                                 &rx_pkt_offset, &rx_pkt_len);
 353                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
 354                                            rx_pkt_offset, rx_pkt_len, ret2);
 355                        if (ret2 < 0) {
 356                                ret = ret2;
 357                                goto out;
 358                        }
 359                } else {
 360                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
 361                                            rx_pkt_offset, rx_pkt_len, 0);
 362                }
 363
 364                /* We have to handle short, empty and used-up DATA packets. */
 365                remain = len - *_offset;
 366                copy = rx_pkt_len;
 367                if (copy > remain)
 368                        copy = remain;
 369                if (copy > 0) {
 370                        ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
 371                                                      copy);
 372                        if (ret2 < 0) {
 373                                ret = ret2;
 374                                goto out;
 375                        }
 376
 377                        /* handle piecemeal consumption of data packets */
 378                        rx_pkt_offset += copy;
 379                        rx_pkt_len -= copy;
 380                        *_offset += copy;
 381                }
 382
 383                if (rx_pkt_len > 0) {
 384                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
 385                                            rx_pkt_offset, rx_pkt_len, 0);
 386                        ASSERTCMP(*_offset, ==, len);
 387                        ret = 0;
 388                        break;
 389                }
 390
 391                /* The whole packet has been transferred. */
 392                last = sp->hdr.flags & RXRPC_LAST_PACKET;
 393                if (!(flags & MSG_PEEK))
 394                        rxrpc_rotate_rx_window(call);
 395                rx_pkt_offset = 0;
 396                rx_pkt_len = 0;
 397
 398                if (last) {
 399                        ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
 400                        ret = 1;
 401                        goto out;
 402                }
 403
 404                seq++;
 405        }
 406
 407out:
 408        if (!(flags & MSG_PEEK)) {
 409                call->rx_pkt_offset = rx_pkt_offset;
 410                call->rx_pkt_len = rx_pkt_len;
 411        }
 412done:
 413        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
 414                            rx_pkt_offset, rx_pkt_len, ret);
 415        if (ret == -EAGAIN)
 416                set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags);
 417        return ret;
 418}
 419
 420/*
 421 * Receive a message from an RxRPC socket
 422 * - we need to be careful about two or more threads calling recvmsg
 423 *   simultaneously
 424 */
 425int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 426                  int flags)
 427{
 428        struct rxrpc_call *call;
 429        struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
 430        struct list_head *l;
 431        size_t copied = 0;
 432        long timeo;
 433        int ret;
 434
 435        DEFINE_WAIT(wait);
 436
 437        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);
 438
 439        if (flags & (MSG_OOB | MSG_TRUNC))
 440                return -EOPNOTSUPP;
 441
 442        timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
 443
 444try_again:
 445        lock_sock(&rx->sk);
 446
 447        /* Return immediately if a client socket has no outstanding calls */
 448        if (RB_EMPTY_ROOT(&rx->calls) &&
 449            list_empty(&rx->recvmsg_q) &&
 450            rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
 451                release_sock(&rx->sk);
 452                return -ENODATA;
 453        }
 454
 455        if (list_empty(&rx->recvmsg_q)) {
 456                ret = -EWOULDBLOCK;
 457                if (timeo == 0) {
 458                        call = NULL;
 459                        goto error_no_call;
 460                }
 461
 462                release_sock(&rx->sk);
 463
 464                /* Wait for something to happen */
 465                prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
 466                                          TASK_INTERRUPTIBLE);
 467                ret = sock_error(&rx->sk);
 468                if (ret)
 469                        goto wait_error;
 470
 471                if (list_empty(&rx->recvmsg_q)) {
 472                        if (signal_pending(current))
 473                                goto wait_interrupted;
 474                        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
 475                                            0, 0, 0, 0);
 476                        timeo = schedule_timeout(timeo);
 477                }
 478                finish_wait(sk_sleep(&rx->sk), &wait);
 479                goto try_again;
 480        }
 481
 482        /* Find the next call and dequeue it if we're not just peeking.  If we
 483         * do dequeue it, that comes with a ref that we will need to release.
 484         */
 485        write_lock_bh(&rx->recvmsg_lock);
 486        l = rx->recvmsg_q.next;
 487        call = list_entry(l, struct rxrpc_call, recvmsg_link);
 488        if (!(flags & MSG_PEEK))
 489                list_del_init(&call->recvmsg_link);
 490        else
 491                rxrpc_get_call(call, rxrpc_call_got);
 492        write_unlock_bh(&rx->recvmsg_lock);
 493
 494        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
 495
 496        /* We're going to drop the socket lock, so we need to lock the call
 497         * against interference by sendmsg.
 498         */
 499        if (!mutex_trylock(&call->user_mutex)) {
 500                ret = -EWOULDBLOCK;
 501                if (flags & MSG_DONTWAIT)
 502                        goto error_requeue_call;
 503                ret = -ERESTARTSYS;
 504                if (mutex_lock_interruptible(&call->user_mutex) < 0)
 505                        goto error_requeue_call;
 506        }
 507
 508        release_sock(&rx->sk);
 509
 510        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 511                BUG();
 512
 513        if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 514                if (flags & MSG_CMSG_COMPAT) {
 515                        unsigned int id32 = call->user_call_ID;
 516
 517                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 518                                       sizeof(unsigned int), &id32);
 519                } else {
 520                        unsigned long idl = call->user_call_ID;
 521
 522                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 523                                       sizeof(unsigned long), &idl);
 524                }
 525                if (ret < 0)
 526                        goto error_unlock_call;
 527        }
 528
 529        if (msg->msg_name) {
 530                struct sockaddr_rxrpc *srx = msg->msg_name;
 531                size_t len = sizeof(call->peer->srx);
 532
 533                memcpy(msg->msg_name, &call->peer->srx, len);
 534                srx->srx_service = call->service_id;
 535                msg->msg_namelen = len;
 536        }
 537
 538        switch (READ_ONCE(call->state)) {
 539        case RXRPC_CALL_SERVER_ACCEPTING:
 540                ret = rxrpc_recvmsg_new_call(rx, call, msg, flags);
 541                break;
 542        case RXRPC_CALL_CLIENT_RECV_REPLY:
 543        case RXRPC_CALL_SERVER_RECV_REQUEST:
 544        case RXRPC_CALL_SERVER_ACK_REQUEST:
 545                ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
 546                                         flags, &copied);
 547                if (ret == -EAGAIN)
 548                        ret = 0;
 549
 550                if (after(call->rx_top, call->rx_hard_ack) &&
 551                    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
 552                        rxrpc_notify_socket(call);
 553                break;
 554        default:
 555                ret = 0;
 556                break;
 557        }
 558
 559        if (ret < 0)
 560                goto error_unlock_call;
 561
 562        if (call->state == RXRPC_CALL_COMPLETE) {
 563                ret = rxrpc_recvmsg_term(call, msg);
 564                if (ret < 0)
 565                        goto error_unlock_call;
 566                if (!(flags & MSG_PEEK))
 567                        rxrpc_release_call(rx, call);
 568                msg->msg_flags |= MSG_EOR;
 569                ret = 1;
 570        }
 571
 572        if (ret == 0)
 573                msg->msg_flags |= MSG_MORE;
 574        else
 575                msg->msg_flags &= ~MSG_MORE;
 576        ret = copied;
 577
 578error_unlock_call:
 579        mutex_unlock(&call->user_mutex);
 580        rxrpc_put_call(call, rxrpc_call_put);
 581        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
 582        return ret;
 583
 584error_requeue_call:
 585        if (!(flags & MSG_PEEK)) {
 586                write_lock_bh(&rx->recvmsg_lock);
 587                list_add(&call->recvmsg_link, &rx->recvmsg_q);
 588                write_unlock_bh(&rx->recvmsg_lock);
 589                trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
 590        } else {
 591                rxrpc_put_call(call, rxrpc_call_put);
 592        }
 593error_no_call:
 594        release_sock(&rx->sk);
 595error_trace:
 596        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
 597        return ret;
 598
 599wait_interrupted:
 600        ret = sock_intr_errno(timeo);
 601wait_error:
 602        finish_wait(sk_sleep(&rx->sk), &wait);
 603        call = NULL;
 604        goto error_trace;
 605}
 606
 607/**
 608 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 609 * @sock: The socket that the call exists on
 610 * @call: The call to send data through
 611 * @iter: The buffer to receive into
 612 * @want_more: True if more data is expected to be read
 613 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 614 * @_service: Where to store the actual service ID (may be upgraded)
 615 *
 616 * Allow a kernel service to receive data and pick up information about the
 617 * state of a call.  Returns 0 if got what was asked for and there's more
 618 * available, 1 if we got what was asked for and we're at the end of the data
 619 * and -EAGAIN if we need more data.
 620 *
 621 * Note that we may return -EAGAIN to drain empty packets at the end of the
 622 * data, even if we've already copied over the requested data.
 623 *
 624 * *_abort should also be initialised to 0.
 625 */
 626int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 627                           struct iov_iter *iter,
 628                           bool want_more, u32 *_abort, u16 *_service)
 629{
 630        size_t offset = 0;
 631        int ret;
 632
 633        _enter("{%d,%s},%zu,%d",
 634               call->debug_id, rxrpc_call_states[call->state],
 635               iov_iter_count(iter), want_more);
 636
 637        ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
 638
 639        mutex_lock(&call->user_mutex);
 640
 641        switch (READ_ONCE(call->state)) {
 642        case RXRPC_CALL_CLIENT_RECV_REPLY:
 643        case RXRPC_CALL_SERVER_RECV_REQUEST:
 644        case RXRPC_CALL_SERVER_ACK_REQUEST:
 645                ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
 646                                         iov_iter_count(iter), 0,
 647                                         &offset);
 648                if (ret < 0)
 649                        goto out;
 650
 651                /* We can only reach here with a partially full buffer if we
 652                 * have reached the end of the data.  We must otherwise have a
 653                 * full buffer or have been given -EAGAIN.
 654                 */
 655                if (ret == 1) {
 656                        if (iov_iter_count(iter) > 0)
 657                                goto short_data;
 658                        if (!want_more)
 659                                goto read_phase_complete;
 660                        ret = 0;
 661                        goto out;
 662                }
 663
 664                if (!want_more)
 665                        goto excess_data;
 666                goto out;
 667
 668        case RXRPC_CALL_COMPLETE:
 669                goto call_complete;
 670
 671        default:
 672                ret = -EINPROGRESS;
 673                goto out;
 674        }
 675
 676read_phase_complete:
 677        ret = 1;
 678out:
 679        switch (call->ackr_reason) {
 680        case RXRPC_ACK_IDLE:
 681                break;
 682        case RXRPC_ACK_DELAY:
 683                if (ret != -EAGAIN)
 684                        break;
 685                /* Fall through */
 686        default:
 687                rxrpc_send_ack_packet(call, false, NULL);
 688        }
 689
 690        if (_service)
 691                *_service = call->service_id;
 692        mutex_unlock(&call->user_mutex);
 693        _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
 694        return ret;
 695
 696short_data:
 697        trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
 698        ret = -EBADMSG;
 699        goto out;
 700excess_data:
 701        trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
 702        ret = -EMSGSIZE;
 703        goto out;
 704call_complete:
 705        *_abort = call->abort_code;
 706        ret = call->error;
 707        if (call->completion == RXRPC_CALL_SUCCEEDED) {
 708                ret = 1;
 709                if (iov_iter_count(iter) > 0)
 710                        ret = -ECONNRESET;
 711        }
 712        goto out;
 713}
 714EXPORT_SYMBOL(rxrpc_kernel_recv_data);
 715
 716/**
 717 * rxrpc_kernel_get_reply_time - Get timestamp on first reply packet
 718 * @sock: The socket that the call exists on
 719 * @call: The call to query
 720 * @_ts: Where to put the timestamp
 721 *
 722 * Retrieve the timestamp from the first DATA packet of the reply if it is
 723 * in the ring.  Returns true if successful, false if not.
 724 */
 725bool rxrpc_kernel_get_reply_time(struct socket *sock, struct rxrpc_call *call,
 726                                 ktime_t *_ts)
 727{
 728        struct sk_buff *skb;
 729        rxrpc_seq_t hard_ack, top, seq;
 730        bool success = false;
 731
 732        mutex_lock(&call->user_mutex);
 733
 734        if (READ_ONCE(call->state) != RXRPC_CALL_CLIENT_RECV_REPLY)
 735                goto out;
 736
 737        hard_ack = call->rx_hard_ack;
 738        if (hard_ack != 0)
 739                goto out;
 740
 741        seq = hard_ack + 1;
 742        top = smp_load_acquire(&call->rx_top);
 743        if (after(seq, top))
 744                goto out;
 745
 746        skb = call->rxtx_buffer[seq & RXRPC_RXTX_BUFF_MASK];
 747        if (!skb)
 748                goto out;
 749
 750        *_ts = skb_get_ktime(skb);
 751        success = true;
 752
 753out:
 754        mutex_unlock(&call->user_mutex);
 755        return success;
 756}
 757EXPORT_SYMBOL(rxrpc_kernel_get_reply_time);
 758