linux/net/rxrpc/recvmsg.c
<<
>>
Prefs
   1/* RxRPC recvmsg() implementation
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
  13
  14#include <linux/net.h>
  15#include <linux/skbuff.h>
  16#include <linux/export.h>
  17#include <linux/sched/signal.h>
  18
  19#include <net/sock.h>
  20#include <net/af_rxrpc.h>
  21#include "ar-internal.h"
  22
  23/*
  24 * Post a call for attention by the socket or kernel service.  Further
  25 * notifications are suppressed by putting recvmsg_link on a dummy queue.
  26 */
  27void rxrpc_notify_socket(struct rxrpc_call *call)
  28{
  29        struct rxrpc_sock *rx;
  30        struct sock *sk;
  31
  32        _enter("%d", call->debug_id);
  33
  34        if (!list_empty(&call->recvmsg_link))
  35                return;
  36
  37        rcu_read_lock();
  38
  39        rx = rcu_dereference(call->socket);
  40        sk = &rx->sk;
  41        if (rx && sk->sk_state < RXRPC_CLOSE) {
  42                if (call->notify_rx) {
  43                        spin_lock_bh(&call->notify_lock);
  44                        call->notify_rx(sk, call, call->user_call_ID);
  45                        spin_unlock_bh(&call->notify_lock);
  46                } else {
  47                        write_lock_bh(&rx->recvmsg_lock);
  48                        if (list_empty(&call->recvmsg_link)) {
  49                                rxrpc_get_call(call, rxrpc_call_got);
  50                                list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
  51                        }
  52                        write_unlock_bh(&rx->recvmsg_lock);
  53
  54                        if (!sock_flag(sk, SOCK_DEAD)) {
  55                                _debug("call %ps", sk->sk_data_ready);
  56                                sk->sk_data_ready(sk);
  57                        }
  58                }
  59        }
  60
  61        rcu_read_unlock();
  62        _leave("");
  63}
  64
  65/*
  66 * Pass a call terminating message to userspace.
  67 */
  68static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
  69{
  70        u32 tmp = 0;
  71        int ret;
  72
  73        switch (call->completion) {
  74        case RXRPC_CALL_SUCCEEDED:
  75                ret = 0;
  76                if (rxrpc_is_service_call(call))
  77                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
  78                break;
  79        case RXRPC_CALL_REMOTELY_ABORTED:
  80                tmp = call->abort_code;
  81                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  82                break;
  83        case RXRPC_CALL_LOCALLY_ABORTED:
  84                tmp = call->abort_code;
  85                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  86                break;
  87        case RXRPC_CALL_NETWORK_ERROR:
  88                tmp = -call->error;
  89                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
  90                break;
  91        case RXRPC_CALL_LOCAL_ERROR:
  92                tmp = -call->error;
  93                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
  94                break;
  95        default:
  96                pr_err("Invalid terminal call state %u\n", call->state);
  97                BUG();
  98                break;
  99        }
 100
 101        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
 102                            call->rx_pkt_offset, call->rx_pkt_len, ret);
 103        return ret;
 104}
 105
 106/*
 107 * Pass back notification of a new call.  The call is added to the
 108 * to-be-accepted list.  This means that the next call to be accepted might not
 109 * be the last call seen awaiting acceptance, but unless we leave this on the
 110 * front of the queue and block all other messages until someone gives us a
 111 * user_ID for it, there's not a lot we can do.
 112 */
 113static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
 114                                  struct rxrpc_call *call,
 115                                  struct msghdr *msg, int flags)
 116{
 117        int tmp = 0, ret;
 118
 119        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &tmp);
 120
 121        if (ret == 0 && !(flags & MSG_PEEK)) {
 122                _debug("to be accepted");
 123                write_lock_bh(&rx->recvmsg_lock);
 124                list_del_init(&call->recvmsg_link);
 125                write_unlock_bh(&rx->recvmsg_lock);
 126
 127                rxrpc_get_call(call, rxrpc_call_got);
 128                write_lock(&rx->call_lock);
 129                list_add_tail(&call->accept_link, &rx->to_be_accepted);
 130                write_unlock(&rx->call_lock);
 131        }
 132
 133        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_to_be_accepted, 1, 0, 0, ret);
 134        return ret;
 135}
 136
 137/*
 138 * End the packet reception phase.
 139 */
 140static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
 141{
 142        _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
 143
 144        trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
 145        ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
 146
 147        if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
 148                rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, false, true,
 149                                  rxrpc_propose_ack_terminal_ack);
 150                //rxrpc_send_ack_packet(call, false, NULL);
 151        }
 152
 153        write_lock_bh(&call->state_lock);
 154
 155        switch (call->state) {
 156        case RXRPC_CALL_CLIENT_RECV_REPLY:
 157                __rxrpc_call_completed(call);
 158                write_unlock_bh(&call->state_lock);
 159                break;
 160
 161        case RXRPC_CALL_SERVER_RECV_REQUEST:
 162                call->tx_phase = true;
 163                call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
 164                call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
 165                write_unlock_bh(&call->state_lock);
 166                rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial, false, true,
 167                                  rxrpc_propose_ack_processing_op);
 168                break;
 169        default:
 170                write_unlock_bh(&call->state_lock);
 171                break;
 172        }
 173}
 174
 175/*
 176 * Discard a packet we've used up and advance the Rx window by one.
 177 */
 178static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
 179{
 180        struct rxrpc_skb_priv *sp;
 181        struct sk_buff *skb;
 182        rxrpc_serial_t serial;
 183        rxrpc_seq_t hard_ack, top;
 184        u8 flags;
 185        int ix;
 186
 187        _enter("%d", call->debug_id);
 188
 189        hard_ack = call->rx_hard_ack;
 190        top = smp_load_acquire(&call->rx_top);
 191        ASSERT(before(hard_ack, top));
 192
 193        hard_ack++;
 194        ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
 195        skb = call->rxtx_buffer[ix];
 196        rxrpc_see_skb(skb, rxrpc_skb_rx_rotated);
 197        sp = rxrpc_skb(skb);
 198        flags = sp->hdr.flags;
 199        serial = sp->hdr.serial;
 200        if (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO)
 201                serial += (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) - 1;
 202
 203        call->rxtx_buffer[ix] = NULL;
 204        call->rxtx_annotations[ix] = 0;
 205        /* Barrier against rxrpc_input_data(). */
 206        smp_store_release(&call->rx_hard_ack, hard_ack);
 207
 208        rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 209
 210        _debug("%u,%u,%02x", hard_ack, top, flags);
 211        trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
 212        if (flags & RXRPC_LAST_PACKET) {
 213                rxrpc_end_rx_phase(call, serial);
 214        } else {
 215                /* Check to see if there's an ACK that needs sending. */
 216                if (after_eq(hard_ack, call->ackr_consumed + 2) ||
 217                    after_eq(top, call->ackr_seen + 2) ||
 218                    (hard_ack == top && after(hard_ack, call->ackr_consumed)))
 219                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial,
 220                                          true, true,
 221                                          rxrpc_propose_ack_rotate_rx);
 222                if (call->ackr_reason && call->ackr_reason != RXRPC_ACK_DELAY)
 223                        rxrpc_send_ack_packet(call, false, NULL);
 224        }
 225}
 226
 227/*
 228 * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
 229 * padding, but if this is the case, the packet length will be resident in the
 230 * socket buffer.  Note that we can't modify the master skb info as the skb may
 231 * be the home to multiple subpackets.
 232 */
 233static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
 234                               u8 annotation,
 235                               unsigned int offset, unsigned int len)
 236{
 237        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 238        rxrpc_seq_t seq = sp->hdr.seq;
 239        u16 cksum = sp->hdr.cksum;
 240
 241        _enter("");
 242
 243        /* For all but the head jumbo subpacket, the security checksum is in a
 244         * jumbo header immediately prior to the data.
 245         */
 246        if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) {
 247                __be16 tmp;
 248                if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
 249                        BUG();
 250                cksum = ntohs(tmp);
 251                seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1;
 252        }
 253
 254        return call->conn->security->verify_packet(call, skb, offset, len,
 255                                                   seq, cksum);
 256}
 257
 258/*
 259 * Locate the data within a packet.  This is complicated by:
 260 *
 261 * (1) An skb may contain a jumbo packet - so we have to find the appropriate
 262 *     subpacket.
 263 *
 264 * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
 265 *     contains an extra header which includes the true length of the data,
 266 *     excluding any encrypted padding.
 267 */
 268static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
 269                             u8 *_annotation,
 270                             unsigned int *_offset, unsigned int *_len)
 271{
 272        unsigned int offset = sizeof(struct rxrpc_wire_header);
 273        unsigned int len;
 274        int ret;
 275        u8 annotation = *_annotation;
 276
 277        /* Locate the subpacket */
 278        len = skb->len - offset;
 279        if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) {
 280                offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) *
 281                           RXRPC_JUMBO_SUBPKTLEN);
 282                len = (annotation & RXRPC_RX_ANNO_JLAST) ?
 283                        skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
 284        }
 285
 286        if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
 287                ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
 288                if (ret < 0)
 289                        return ret;
 290                *_annotation |= RXRPC_RX_ANNO_VERIFIED;
 291        }
 292
 293        *_offset = offset;
 294        *_len = len;
 295        call->conn->security->locate_data(call, skb, _offset, _len);
 296        return 0;
 297}
 298
 299/*
 300 * Deliver messages to a call.  This keeps processing packets until the buffer
 301 * is filled and we find either more DATA (returns 0) or the end of the DATA
 302 * (returns 1).  If more packets are required, it returns -EAGAIN.
 303 */
 304static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 305                              struct msghdr *msg, struct iov_iter *iter,
 306                              size_t len, int flags, size_t *_offset)
 307{
 308        struct rxrpc_skb_priv *sp;
 309        struct sk_buff *skb;
 310        rxrpc_seq_t hard_ack, top, seq;
 311        size_t remain;
 312        bool last;
 313        unsigned int rx_pkt_offset, rx_pkt_len;
 314        int ix, copy, ret = -EAGAIN, ret2;
 315
 316        if (test_and_clear_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags) &&
 317            call->ackr_reason)
 318                rxrpc_send_ack_packet(call, false, NULL);
 319
 320        rx_pkt_offset = call->rx_pkt_offset;
 321        rx_pkt_len = call->rx_pkt_len;
 322
 323        if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
 324                seq = call->rx_hard_ack;
 325                ret = 1;
 326                goto done;
 327        }
 328
 329        /* Barriers against rxrpc_input_data(). */
 330        hard_ack = call->rx_hard_ack;
 331        seq = hard_ack + 1;
 332        while (top = smp_load_acquire(&call->rx_top),
 333               before_eq(seq, top)
 334               ) {
 335                ix = seq & RXRPC_RXTX_BUFF_MASK;
 336                skb = call->rxtx_buffer[ix];
 337                if (!skb) {
 338                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
 339                                            rx_pkt_offset, rx_pkt_len, 0);
 340                        break;
 341                }
 342                smp_rmb();
 343                rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
 344                sp = rxrpc_skb(skb);
 345
 346                if (!(flags & MSG_PEEK))
 347                        trace_rxrpc_receive(call, rxrpc_receive_front,
 348                                            sp->hdr.serial, seq);
 349
 350                if (msg)
 351                        sock_recv_timestamp(msg, sock->sk, skb);
 352
 353                if (rx_pkt_offset == 0) {
 354                        ret2 = rxrpc_locate_data(call, skb,
 355                                                 &call->rxtx_annotations[ix],
 356                                                 &rx_pkt_offset, &rx_pkt_len);
 357                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
 358                                            rx_pkt_offset, rx_pkt_len, ret2);
 359                        if (ret2 < 0) {
 360                                ret = ret2;
 361                                goto out;
 362                        }
 363                } else {
 364                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
 365                                            rx_pkt_offset, rx_pkt_len, 0);
 366                }
 367
 368                /* We have to handle short, empty and used-up DATA packets. */
 369                remain = len - *_offset;
 370                copy = rx_pkt_len;
 371                if (copy > remain)
 372                        copy = remain;
 373                if (copy > 0) {
 374                        ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
 375                                                      copy);
 376                        if (ret2 < 0) {
 377                                ret = ret2;
 378                                goto out;
 379                        }
 380
 381                        /* handle piecemeal consumption of data packets */
 382                        rx_pkt_offset += copy;
 383                        rx_pkt_len -= copy;
 384                        *_offset += copy;
 385                }
 386
 387                if (rx_pkt_len > 0) {
 388                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
 389                                            rx_pkt_offset, rx_pkt_len, 0);
 390                        ASSERTCMP(*_offset, ==, len);
 391                        ret = 0;
 392                        break;
 393                }
 394
 395                /* The whole packet has been transferred. */
 396                last = sp->hdr.flags & RXRPC_LAST_PACKET;
 397                if (!(flags & MSG_PEEK))
 398                        rxrpc_rotate_rx_window(call);
 399                rx_pkt_offset = 0;
 400                rx_pkt_len = 0;
 401
 402                if (last) {
 403                        ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
 404                        ret = 1;
 405                        goto out;
 406                }
 407
 408                seq++;
 409        }
 410
 411out:
 412        if (!(flags & MSG_PEEK)) {
 413                call->rx_pkt_offset = rx_pkt_offset;
 414                call->rx_pkt_len = rx_pkt_len;
 415        }
 416done:
 417        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
 418                            rx_pkt_offset, rx_pkt_len, ret);
 419        if (ret == -EAGAIN)
 420                set_bit(RXRPC_CALL_RX_UNDERRUN, &call->flags);
 421        return ret;
 422}
 423
 424/*
 425 * Receive a message from an RxRPC socket
 426 * - we need to be careful about two or more threads calling recvmsg
 427 *   simultaneously
 428 */
 429int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 430                  int flags)
 431{
 432        struct rxrpc_call *call;
 433        struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
 434        struct list_head *l;
 435        size_t copied = 0;
 436        long timeo;
 437        int ret;
 438
 439        DEFINE_WAIT(wait);
 440
 441        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);
 442
 443        if (flags & (MSG_OOB | MSG_TRUNC))
 444                return -EOPNOTSUPP;
 445
 446        timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
 447
 448try_again:
 449        lock_sock(&rx->sk);
 450
 451        /* Return immediately if a client socket has no outstanding calls */
 452        if (RB_EMPTY_ROOT(&rx->calls) &&
 453            list_empty(&rx->recvmsg_q) &&
 454            rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
 455                release_sock(&rx->sk);
 456                return -ENODATA;
 457        }
 458
 459        if (list_empty(&rx->recvmsg_q)) {
 460                ret = -EWOULDBLOCK;
 461                if (timeo == 0) {
 462                        call = NULL;
 463                        goto error_no_call;
 464                }
 465
 466                release_sock(&rx->sk);
 467
 468                /* Wait for something to happen */
 469                prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
 470                                          TASK_INTERRUPTIBLE);
 471                ret = sock_error(&rx->sk);
 472                if (ret)
 473                        goto wait_error;
 474
 475                if (list_empty(&rx->recvmsg_q)) {
 476                        if (signal_pending(current))
 477                                goto wait_interrupted;
 478                        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
 479                                            0, 0, 0, 0);
 480                        timeo = schedule_timeout(timeo);
 481                }
 482                finish_wait(sk_sleep(&rx->sk), &wait);
 483                goto try_again;
 484        }
 485
 486        /* Find the next call and dequeue it if we're not just peeking.  If we
 487         * do dequeue it, that comes with a ref that we will need to release.
 488         */
 489        write_lock_bh(&rx->recvmsg_lock);
 490        l = rx->recvmsg_q.next;
 491        call = list_entry(l, struct rxrpc_call, recvmsg_link);
 492        if (!(flags & MSG_PEEK))
 493                list_del_init(&call->recvmsg_link);
 494        else
 495                rxrpc_get_call(call, rxrpc_call_got);
 496        write_unlock_bh(&rx->recvmsg_lock);
 497
 498        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
 499
 500        /* We're going to drop the socket lock, so we need to lock the call
 501         * against interference by sendmsg.
 502         */
 503        if (!mutex_trylock(&call->user_mutex)) {
 504                ret = -EWOULDBLOCK;
 505                if (flags & MSG_DONTWAIT)
 506                        goto error_requeue_call;
 507                ret = -ERESTARTSYS;
 508                if (mutex_lock_interruptible(&call->user_mutex) < 0)
 509                        goto error_requeue_call;
 510        }
 511
 512        release_sock(&rx->sk);
 513
 514        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 515                BUG();
 516
 517        if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 518                if (flags & MSG_CMSG_COMPAT) {
 519                        unsigned int id32 = call->user_call_ID;
 520
 521                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 522                                       sizeof(unsigned int), &id32);
 523                } else {
 524                        unsigned long idl = call->user_call_ID;
 525
 526                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 527                                       sizeof(unsigned long), &idl);
 528                }
 529                if (ret < 0)
 530                        goto error_unlock_call;
 531        }
 532
 533        if (msg->msg_name) {
 534                struct sockaddr_rxrpc *srx = msg->msg_name;
 535                size_t len = sizeof(call->peer->srx);
 536
 537                memcpy(msg->msg_name, &call->peer->srx, len);
 538                srx->srx_service = call->service_id;
 539                msg->msg_namelen = len;
 540        }
 541
 542        switch (READ_ONCE(call->state)) {
 543        case RXRPC_CALL_SERVER_ACCEPTING:
 544                ret = rxrpc_recvmsg_new_call(rx, call, msg, flags);
 545                break;
 546        case RXRPC_CALL_CLIENT_RECV_REPLY:
 547        case RXRPC_CALL_SERVER_RECV_REQUEST:
 548        case RXRPC_CALL_SERVER_ACK_REQUEST:
 549                ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
 550                                         flags, &copied);
 551                if (ret == -EAGAIN)
 552                        ret = 0;
 553
 554                if (after(call->rx_top, call->rx_hard_ack) &&
 555                    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
 556                        rxrpc_notify_socket(call);
 557                break;
 558        default:
 559                ret = 0;
 560                break;
 561        }
 562
 563        if (ret < 0)
 564                goto error_unlock_call;
 565
 566        if (call->state == RXRPC_CALL_COMPLETE) {
 567                ret = rxrpc_recvmsg_term(call, msg);
 568                if (ret < 0)
 569                        goto error_unlock_call;
 570                if (!(flags & MSG_PEEK))
 571                        rxrpc_release_call(rx, call);
 572                msg->msg_flags |= MSG_EOR;
 573                ret = 1;
 574        }
 575
 576        if (ret == 0)
 577                msg->msg_flags |= MSG_MORE;
 578        else
 579                msg->msg_flags &= ~MSG_MORE;
 580        ret = copied;
 581
 582error_unlock_call:
 583        mutex_unlock(&call->user_mutex);
 584        rxrpc_put_call(call, rxrpc_call_put);
 585        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
 586        return ret;
 587
 588error_requeue_call:
 589        if (!(flags & MSG_PEEK)) {
 590                write_lock_bh(&rx->recvmsg_lock);
 591                list_add(&call->recvmsg_link, &rx->recvmsg_q);
 592                write_unlock_bh(&rx->recvmsg_lock);
 593                trace_rxrpc_recvmsg(call, rxrpc_recvmsg_requeue, 0, 0, 0, 0);
 594        } else {
 595                rxrpc_put_call(call, rxrpc_call_put);
 596        }
 597error_no_call:
 598        release_sock(&rx->sk);
 599error_trace:
 600        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
 601        return ret;
 602
 603wait_interrupted:
 604        ret = sock_intr_errno(timeo);
 605wait_error:
 606        finish_wait(sk_sleep(&rx->sk), &wait);
 607        call = NULL;
 608        goto error_trace;
 609}
 610
 611/**
 612 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 613 * @sock: The socket that the call exists on
 614 * @call: The call to send data through
 615 * @iter: The buffer to receive into
 616 * @want_more: True if more data is expected to be read
 617 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 618 * @_service: Where to store the actual service ID (may be upgraded)
 619 *
 620 * Allow a kernel service to receive data and pick up information about the
 621 * state of a call.  Returns 0 if got what was asked for and there's more
 622 * available, 1 if we got what was asked for and we're at the end of the data
 623 * and -EAGAIN if we need more data.
 624 *
 625 * Note that we may return -EAGAIN to drain empty packets at the end of the
 626 * data, even if we've already copied over the requested data.
 627 *
 628 * *_abort should also be initialised to 0.
 629 */
 630int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 631                           struct iov_iter *iter,
 632                           bool want_more, u32 *_abort, u16 *_service)
 633{
 634        size_t offset = 0;
 635        int ret;
 636
 637        _enter("{%d,%s},%zu,%d",
 638               call->debug_id, rxrpc_call_states[call->state],
 639               iov_iter_count(iter), want_more);
 640
 641        ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
 642
 643        mutex_lock(&call->user_mutex);
 644
 645        switch (READ_ONCE(call->state)) {
 646        case RXRPC_CALL_CLIENT_RECV_REPLY:
 647        case RXRPC_CALL_SERVER_RECV_REQUEST:
 648        case RXRPC_CALL_SERVER_ACK_REQUEST:
 649                ret = rxrpc_recvmsg_data(sock, call, NULL, iter,
 650                                         iov_iter_count(iter), 0,
 651                                         &offset);
 652                if (ret < 0)
 653                        goto out;
 654
 655                /* We can only reach here with a partially full buffer if we
 656                 * have reached the end of the data.  We must otherwise have a
 657                 * full buffer or have been given -EAGAIN.
 658                 */
 659                if (ret == 1) {
 660                        if (iov_iter_count(iter) > 0)
 661                                goto short_data;
 662                        if (!want_more)
 663                                goto read_phase_complete;
 664                        ret = 0;
 665                        goto out;
 666                }
 667
 668                if (!want_more)
 669                        goto excess_data;
 670                goto out;
 671
 672        case RXRPC_CALL_COMPLETE:
 673                goto call_complete;
 674
 675        default:
 676                ret = -EINPROGRESS;
 677                goto out;
 678        }
 679
 680read_phase_complete:
 681        ret = 1;
 682out:
 683        switch (call->ackr_reason) {
 684        case RXRPC_ACK_IDLE:
 685                break;
 686        case RXRPC_ACK_DELAY:
 687                if (ret != -EAGAIN)
 688                        break;
 689                /* Fall through */
 690        default:
 691                rxrpc_send_ack_packet(call, false, NULL);
 692        }
 693
 694        if (_service)
 695                *_service = call->service_id;
 696        mutex_unlock(&call->user_mutex);
 697        _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
 698        return ret;
 699
 700short_data:
 701        trace_rxrpc_rx_eproto(call, 0, tracepoint_string("short_data"));
 702        ret = -EBADMSG;
 703        goto out;
 704excess_data:
 705        trace_rxrpc_rx_eproto(call, 0, tracepoint_string("excess_data"));
 706        ret = -EMSGSIZE;
 707        goto out;
 708call_complete:
 709        *_abort = call->abort_code;
 710        ret = call->error;
 711        if (call->completion == RXRPC_CALL_SUCCEEDED) {
 712                ret = 1;
 713                if (iov_iter_count(iter) > 0)
 714                        ret = -ECONNRESET;
 715        }
 716        goto out;
 717}
 718EXPORT_SYMBOL(rxrpc_kernel_recv_data);
 719
 720/**
 721 * rxrpc_kernel_get_reply_time - Get timestamp on first reply packet
 722 * @sock: The socket that the call exists on
 723 * @call: The call to query
 724 * @_ts: Where to put the timestamp
 725 *
 726 * Retrieve the timestamp from the first DATA packet of the reply if it is
 727 * in the ring.  Returns true if successful, false if not.
 728 */
 729bool rxrpc_kernel_get_reply_time(struct socket *sock, struct rxrpc_call *call,
 730                                 ktime_t *_ts)
 731{
 732        struct sk_buff *skb;
 733        rxrpc_seq_t hard_ack, top, seq;
 734        bool success = false;
 735
 736        mutex_lock(&call->user_mutex);
 737
 738        if (READ_ONCE(call->state) != RXRPC_CALL_CLIENT_RECV_REPLY)
 739                goto out;
 740
 741        hard_ack = call->rx_hard_ack;
 742        if (hard_ack != 0)
 743                goto out;
 744
 745        seq = hard_ack + 1;
 746        top = smp_load_acquire(&call->rx_top);
 747        if (after(seq, top))
 748                goto out;
 749
 750        skb = call->rxtx_buffer[seq & RXRPC_RXTX_BUFF_MASK];
 751        if (!skb)
 752                goto out;
 753
 754        *_ts = skb_get_ktime(skb);
 755        success = true;
 756
 757out:
 758        mutex_unlock(&call->user_mutex);
 759        return success;
 760}
 761EXPORT_SYMBOL(rxrpc_kernel_get_reply_time);
 762