linux/net/rxrpc/recvmsg.c
<<
>>
Prefs
   1/* RxRPC recvmsg() implementation
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
  13
  14#include <linux/net.h>
  15#include <linux/skbuff.h>
  16#include <linux/export.h>
  17#include <net/sock.h>
  18#include <net/af_rxrpc.h>
  19#include "ar-internal.h"
  20
  21/*
  22 * Post a call for attention by the socket or kernel service.  Further
  23 * notifications are suppressed by putting recvmsg_link on a dummy queue.
  24 */
  25void rxrpc_notify_socket(struct rxrpc_call *call)
  26{
  27        struct rxrpc_sock *rx;
  28        struct sock *sk;
  29
  30        _enter("%d", call->debug_id);
  31
  32        if (!list_empty(&call->recvmsg_link))
  33                return;
  34
  35        rcu_read_lock();
  36
  37        rx = rcu_dereference(call->socket);
  38        sk = &rx->sk;
  39        if (rx && sk->sk_state < RXRPC_CLOSE) {
  40                if (call->notify_rx) {
  41                        call->notify_rx(sk, call, call->user_call_ID);
  42                } else {
  43                        write_lock_bh(&rx->recvmsg_lock);
  44                        if (list_empty(&call->recvmsg_link)) {
  45                                rxrpc_get_call(call, rxrpc_call_got);
  46                                list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
  47                        }
  48                        write_unlock_bh(&rx->recvmsg_lock);
  49
  50                        if (!sock_flag(sk, SOCK_DEAD)) {
  51                                _debug("call %ps", sk->sk_data_ready);
  52                                sk->sk_data_ready(sk);
  53                        }
  54                }
  55        }
  56
  57        rcu_read_unlock();
  58        _leave("");
  59}
  60
  61/*
  62 * Pass a call terminating message to userspace.
  63 */
  64static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
  65{
  66        u32 tmp = 0;
  67        int ret;
  68
  69        switch (call->completion) {
  70        case RXRPC_CALL_SUCCEEDED:
  71                ret = 0;
  72                if (rxrpc_is_service_call(call))
  73                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
  74                break;
  75        case RXRPC_CALL_REMOTELY_ABORTED:
  76                tmp = call->abort_code;
  77                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  78                break;
  79        case RXRPC_CALL_LOCALLY_ABORTED:
  80                tmp = call->abort_code;
  81                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  82                break;
  83        case RXRPC_CALL_NETWORK_ERROR:
  84                tmp = call->error;
  85                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
  86                break;
  87        case RXRPC_CALL_LOCAL_ERROR:
  88                tmp = call->error;
  89                ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
  90                break;
  91        default:
  92                pr_err("Invalid terminal call state %u\n", call->state);
  93                BUG();
  94                break;
  95        }
  96
  97        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_terminal, call->rx_hard_ack,
  98                            call->rx_pkt_offset, call->rx_pkt_len, ret);
  99        return ret;
 100}
 101
 102/*
 103 * Pass back notification of a new call.  The call is added to the
 104 * to-be-accepted list.  This means that the next call to be accepted might not
 105 * be the last call seen awaiting acceptance, but unless we leave this on the
 106 * front of the queue and block all other messages until someone gives us a
 107 * user_ID for it, there's not a lot we can do.
 108 */
 109static int rxrpc_recvmsg_new_call(struct rxrpc_sock *rx,
 110                                  struct rxrpc_call *call,
 111                                  struct msghdr *msg, int flags)
 112{
 113        int tmp = 0, ret;
 114
 115        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NEW_CALL, 0, &tmp);
 116
 117        if (ret == 0 && !(flags & MSG_PEEK)) {
 118                _debug("to be accepted");
 119                write_lock_bh(&rx->recvmsg_lock);
 120                list_del_init(&call->recvmsg_link);
 121                write_unlock_bh(&rx->recvmsg_lock);
 122
 123                rxrpc_get_call(call, rxrpc_call_got);
 124                write_lock(&rx->call_lock);
 125                list_add_tail(&call->accept_link, &rx->to_be_accepted);
 126                write_unlock(&rx->call_lock);
 127        }
 128
 129        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_to_be_accepted, 1, 0, 0, ret);
 130        return ret;
 131}
 132
 133/*
 134 * End the packet reception phase.
 135 */
 136static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
 137{
 138        _enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
 139
 140        trace_rxrpc_receive(call, rxrpc_receive_end, 0, call->rx_top);
 141        ASSERTCMP(call->rx_hard_ack, ==, call->rx_top);
 142
 143        if (call->state == RXRPC_CALL_CLIENT_RECV_REPLY) {
 144                rxrpc_propose_ACK(call, RXRPC_ACK_IDLE, 0, serial, true, false,
 145                                  rxrpc_propose_ack_terminal_ack);
 146                rxrpc_send_ack_packet(call, false);
 147        }
 148
 149        write_lock_bh(&call->state_lock);
 150
 151        switch (call->state) {
 152        case RXRPC_CALL_CLIENT_RECV_REPLY:
 153                __rxrpc_call_completed(call);
 154                write_unlock_bh(&call->state_lock);
 155                break;
 156
 157        case RXRPC_CALL_SERVER_RECV_REQUEST:
 158                call->tx_phase = true;
 159                call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
 160                call->ack_at = call->expire_at;
 161                write_unlock_bh(&call->state_lock);
 162                rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial, false, true,
 163                                  rxrpc_propose_ack_processing_op);
 164                break;
 165        default:
 166                write_unlock_bh(&call->state_lock);
 167                break;
 168        }
 169}
 170
 171/*
 172 * Discard a packet we've used up and advance the Rx window by one.
 173 */
 174static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
 175{
 176        struct rxrpc_skb_priv *sp;
 177        struct sk_buff *skb;
 178        rxrpc_serial_t serial;
 179        rxrpc_seq_t hard_ack, top;
 180        u8 flags;
 181        int ix;
 182
 183        _enter("%d", call->debug_id);
 184
 185        hard_ack = call->rx_hard_ack;
 186        top = smp_load_acquire(&call->rx_top);
 187        ASSERT(before(hard_ack, top));
 188
 189        hard_ack++;
 190        ix = hard_ack & RXRPC_RXTX_BUFF_MASK;
 191        skb = call->rxtx_buffer[ix];
 192        rxrpc_see_skb(skb, rxrpc_skb_rx_rotated);
 193        sp = rxrpc_skb(skb);
 194        flags = sp->hdr.flags;
 195        serial = sp->hdr.serial;
 196        if (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO)
 197                serial += (call->rxtx_annotations[ix] & RXRPC_RX_ANNO_JUMBO) - 1;
 198
 199        call->rxtx_buffer[ix] = NULL;
 200        call->rxtx_annotations[ix] = 0;
 201        /* Barrier against rxrpc_input_data(). */
 202        smp_store_release(&call->rx_hard_ack, hard_ack);
 203
 204        rxrpc_free_skb(skb, rxrpc_skb_rx_freed);
 205
 206        _debug("%u,%u,%02x", hard_ack, top, flags);
 207        trace_rxrpc_receive(call, rxrpc_receive_rotate, serial, hard_ack);
 208        if (flags & RXRPC_LAST_PACKET) {
 209                rxrpc_end_rx_phase(call, serial);
 210        } else {
 211                /* Check to see if there's an ACK that needs sending. */
 212                if (after_eq(hard_ack, call->ackr_consumed + 2) ||
 213                    after_eq(top, call->ackr_seen + 2) ||
 214                    (hard_ack == top && after(hard_ack, call->ackr_consumed)))
 215                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY, 0, serial,
 216                                          true, false,
 217                                          rxrpc_propose_ack_rotate_rx);
 218                if (call->ackr_reason)
 219                        rxrpc_send_ack_packet(call, false);
 220        }
 221}
 222
 223/*
 224 * Decrypt and verify a (sub)packet.  The packet's length may be changed due to
 225 * padding, but if this is the case, the packet length will be resident in the
 226 * socket buffer.  Note that we can't modify the master skb info as the skb may
 227 * be the home to multiple subpackets.
 228 */
 229static int rxrpc_verify_packet(struct rxrpc_call *call, struct sk_buff *skb,
 230                               u8 annotation,
 231                               unsigned int offset, unsigned int len)
 232{
 233        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
 234        rxrpc_seq_t seq = sp->hdr.seq;
 235        u16 cksum = sp->hdr.cksum;
 236
 237        _enter("");
 238
 239        /* For all but the head jumbo subpacket, the security checksum is in a
 240         * jumbo header immediately prior to the data.
 241         */
 242        if ((annotation & RXRPC_RX_ANNO_JUMBO) > 1) {
 243                __be16 tmp;
 244                if (skb_copy_bits(skb, offset - 2, &tmp, 2) < 0)
 245                        BUG();
 246                cksum = ntohs(tmp);
 247                seq += (annotation & RXRPC_RX_ANNO_JUMBO) - 1;
 248        }
 249
 250        return call->conn->security->verify_packet(call, skb, offset, len,
 251                                                   seq, cksum);
 252}
 253
 254/*
 255 * Locate the data within a packet.  This is complicated by:
 256 *
 257 * (1) An skb may contain a jumbo packet - so we have to find the appropriate
 258 *     subpacket.
 259 *
 260 * (2) The (sub)packets may be encrypted and, if so, the encrypted portion
 261 *     contains an extra header which includes the true length of the data,
 262 *     excluding any encrypted padding.
 263 */
 264static int rxrpc_locate_data(struct rxrpc_call *call, struct sk_buff *skb,
 265                             u8 *_annotation,
 266                             unsigned int *_offset, unsigned int *_len)
 267{
 268        unsigned int offset = sizeof(struct rxrpc_wire_header);
 269        unsigned int len = *_len;
 270        int ret;
 271        u8 annotation = *_annotation;
 272
 273        /* Locate the subpacket */
 274        len = skb->len - offset;
 275        if ((annotation & RXRPC_RX_ANNO_JUMBO) > 0) {
 276                offset += (((annotation & RXRPC_RX_ANNO_JUMBO) - 1) *
 277                           RXRPC_JUMBO_SUBPKTLEN);
 278                len = (annotation & RXRPC_RX_ANNO_JLAST) ?
 279                        skb->len - offset : RXRPC_JUMBO_SUBPKTLEN;
 280        }
 281
 282        if (!(annotation & RXRPC_RX_ANNO_VERIFIED)) {
 283                ret = rxrpc_verify_packet(call, skb, annotation, offset, len);
 284                if (ret < 0)
 285                        return ret;
 286                *_annotation |= RXRPC_RX_ANNO_VERIFIED;
 287        }
 288
 289        *_offset = offset;
 290        *_len = len;
 291        call->conn->security->locate_data(call, skb, _offset, _len);
 292        return 0;
 293}
 294
 295/*
 296 * Deliver messages to a call.  This keeps processing packets until the buffer
 297 * is filled and we find either more DATA (returns 0) or the end of the DATA
 298 * (returns 1).  If more packets are required, it returns -EAGAIN.
 299 */
 300static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
 301                              struct msghdr *msg, struct iov_iter *iter,
 302                              size_t len, int flags, size_t *_offset)
 303{
 304        struct rxrpc_skb_priv *sp;
 305        struct sk_buff *skb;
 306        rxrpc_seq_t hard_ack, top, seq;
 307        size_t remain;
 308        bool last;
 309        unsigned int rx_pkt_offset, rx_pkt_len;
 310        int ix, copy, ret = -EAGAIN, ret2;
 311
 312        rx_pkt_offset = call->rx_pkt_offset;
 313        rx_pkt_len = call->rx_pkt_len;
 314
 315        if (call->state >= RXRPC_CALL_SERVER_ACK_REQUEST) {
 316                seq = call->rx_hard_ack;
 317                ret = 1;
 318                goto done;
 319        }
 320
 321        /* Barriers against rxrpc_input_data(). */
 322        hard_ack = call->rx_hard_ack;
 323        top = smp_load_acquire(&call->rx_top);
 324        for (seq = hard_ack + 1; before_eq(seq, top); seq++) {
 325                ix = seq & RXRPC_RXTX_BUFF_MASK;
 326                skb = call->rxtx_buffer[ix];
 327                if (!skb) {
 328                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_hole, seq,
 329                                            rx_pkt_offset, rx_pkt_len, 0);
 330                        break;
 331                }
 332                smp_rmb();
 333                rxrpc_see_skb(skb, rxrpc_skb_rx_seen);
 334                sp = rxrpc_skb(skb);
 335
 336                if (!(flags & MSG_PEEK))
 337                        trace_rxrpc_receive(call, rxrpc_receive_front,
 338                                            sp->hdr.serial, seq);
 339
 340                if (msg)
 341                        sock_recv_timestamp(msg, sock->sk, skb);
 342
 343                if (rx_pkt_offset == 0) {
 344                        ret2 = rxrpc_locate_data(call, skb,
 345                                                 &call->rxtx_annotations[ix],
 346                                                 &rx_pkt_offset, &rx_pkt_len);
 347                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_next, seq,
 348                                            rx_pkt_offset, rx_pkt_len, ret2);
 349                        if (ret2 < 0) {
 350                                ret = ret2;
 351                                goto out;
 352                        }
 353                } else {
 354                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_cont, seq,
 355                                            rx_pkt_offset, rx_pkt_len, 0);
 356                }
 357
 358                /* We have to handle short, empty and used-up DATA packets. */
 359                remain = len - *_offset;
 360                copy = rx_pkt_len;
 361                if (copy > remain)
 362                        copy = remain;
 363                if (copy > 0) {
 364                        ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
 365                                                      copy);
 366                        if (ret2 < 0) {
 367                                ret = ret2;
 368                                goto out;
 369                        }
 370
 371                        /* handle piecemeal consumption of data packets */
 372                        rx_pkt_offset += copy;
 373                        rx_pkt_len -= copy;
 374                        *_offset += copy;
 375                }
 376
 377                if (rx_pkt_len > 0) {
 378                        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_full, seq,
 379                                            rx_pkt_offset, rx_pkt_len, 0);
 380                        ASSERTCMP(*_offset, ==, len);
 381                        ret = 0;
 382                        break;
 383                }
 384
 385                /* The whole packet has been transferred. */
 386                last = sp->hdr.flags & RXRPC_LAST_PACKET;
 387                if (!(flags & MSG_PEEK))
 388                        rxrpc_rotate_rx_window(call);
 389                rx_pkt_offset = 0;
 390                rx_pkt_len = 0;
 391
 392                if (last) {
 393                        ASSERTCMP(seq, ==, READ_ONCE(call->rx_top));
 394                        ret = 1;
 395                        goto out;
 396                }
 397        }
 398
 399out:
 400        if (!(flags & MSG_PEEK)) {
 401                call->rx_pkt_offset = rx_pkt_offset;
 402                call->rx_pkt_len = rx_pkt_len;
 403        }
 404done:
 405        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_data_return, seq,
 406                            rx_pkt_offset, rx_pkt_len, ret);
 407        return ret;
 408}
 409
 410/*
 411 * Receive a message from an RxRPC socket
 412 * - we need to be careful about two or more threads calling recvmsg
 413 *   simultaneously
 414 */
 415int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
 416                  int flags)
 417{
 418        struct rxrpc_call *call;
 419        struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
 420        struct list_head *l;
 421        size_t copied = 0;
 422        long timeo;
 423        int ret;
 424
 425        DEFINE_WAIT(wait);
 426
 427        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_enter, 0, 0, 0, 0);
 428
 429        if (flags & (MSG_OOB | MSG_TRUNC))
 430                return -EOPNOTSUPP;
 431
 432        timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
 433
 434try_again:
 435        lock_sock(&rx->sk);
 436
 437        /* Return immediately if a client socket has no outstanding calls */
 438        if (RB_EMPTY_ROOT(&rx->calls) &&
 439            list_empty(&rx->recvmsg_q) &&
 440            rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
 441                release_sock(&rx->sk);
 442                return -ENODATA;
 443        }
 444
 445        if (list_empty(&rx->recvmsg_q)) {
 446                ret = -EWOULDBLOCK;
 447                if (timeo == 0) {
 448                        call = NULL;
 449                        goto error_no_call;
 450                }
 451
 452                release_sock(&rx->sk);
 453
 454                /* Wait for something to happen */
 455                prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
 456                                          TASK_INTERRUPTIBLE);
 457                ret = sock_error(&rx->sk);
 458                if (ret)
 459                        goto wait_error;
 460
 461                if (list_empty(&rx->recvmsg_q)) {
 462                        if (signal_pending(current))
 463                                goto wait_interrupted;
 464                        trace_rxrpc_recvmsg(NULL, rxrpc_recvmsg_wait,
 465                                            0, 0, 0, 0);
 466                        timeo = schedule_timeout(timeo);
 467                }
 468                finish_wait(sk_sleep(&rx->sk), &wait);
 469                goto try_again;
 470        }
 471
 472        /* Find the next call and dequeue it if we're not just peeking.  If we
 473         * do dequeue it, that comes with a ref that we will need to release.
 474         */
 475        write_lock_bh(&rx->recvmsg_lock);
 476        l = rx->recvmsg_q.next;
 477        call = list_entry(l, struct rxrpc_call, recvmsg_link);
 478        if (!(flags & MSG_PEEK))
 479                list_del_init(&call->recvmsg_link);
 480        else
 481                rxrpc_get_call(call, rxrpc_call_got);
 482        write_unlock_bh(&rx->recvmsg_lock);
 483
 484        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_dequeue, 0, 0, 0, 0);
 485
 486        if (test_bit(RXRPC_CALL_RELEASED, &call->flags))
 487                BUG();
 488
 489        if (test_bit(RXRPC_CALL_HAS_USERID, &call->flags)) {
 490                if (flags & MSG_CMSG_COMPAT) {
 491                        unsigned int id32 = call->user_call_ID;
 492
 493                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 494                                       sizeof(unsigned int), &id32);
 495                } else {
 496                        ret = put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
 497                                       sizeof(unsigned long),
 498                                       &call->user_call_ID);
 499                }
 500                if (ret < 0)
 501                        goto error;
 502        }
 503
 504        if (msg->msg_name) {
 505                size_t len = sizeof(call->conn->params.peer->srx);
 506                memcpy(msg->msg_name, &call->conn->params.peer->srx, len);
 507                msg->msg_namelen = len;
 508        }
 509
 510        switch (call->state) {
 511        case RXRPC_CALL_SERVER_ACCEPTING:
 512                ret = rxrpc_recvmsg_new_call(rx, call, msg, flags);
 513                break;
 514        case RXRPC_CALL_CLIENT_RECV_REPLY:
 515        case RXRPC_CALL_SERVER_RECV_REQUEST:
 516        case RXRPC_CALL_SERVER_ACK_REQUEST:
 517                ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
 518                                         flags, &copied);
 519                if (ret == -EAGAIN)
 520                        ret = 0;
 521
 522                if (after(call->rx_top, call->rx_hard_ack) &&
 523                    call->rxtx_buffer[(call->rx_hard_ack + 1) & RXRPC_RXTX_BUFF_MASK])
 524                        rxrpc_notify_socket(call);
 525                break;
 526        default:
 527                ret = 0;
 528                break;
 529        }
 530
 531        if (ret < 0)
 532                goto error;
 533
 534        if (call->state == RXRPC_CALL_COMPLETE) {
 535                ret = rxrpc_recvmsg_term(call, msg);
 536                if (ret < 0)
 537                        goto error;
 538                if (!(flags & MSG_PEEK))
 539                        rxrpc_release_call(rx, call);
 540                msg->msg_flags |= MSG_EOR;
 541                ret = 1;
 542        }
 543
 544        if (ret == 0)
 545                msg->msg_flags |= MSG_MORE;
 546        else
 547                msg->msg_flags &= ~MSG_MORE;
 548        ret = copied;
 549
 550error:
 551        rxrpc_put_call(call, rxrpc_call_put);
 552error_no_call:
 553        release_sock(&rx->sk);
 554        trace_rxrpc_recvmsg(call, rxrpc_recvmsg_return, 0, 0, 0, ret);
 555        return ret;
 556
 557wait_interrupted:
 558        ret = sock_intr_errno(timeo);
 559wait_error:
 560        finish_wait(sk_sleep(&rx->sk), &wait);
 561        call = NULL;
 562        goto error_no_call;
 563}
 564
 565/**
 566 * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
 567 * @sock: The socket that the call exists on
 568 * @call: The call to send data through
 569 * @buf: The buffer to receive into
 570 * @size: The size of the buffer, including data already read
 571 * @_offset: The running offset into the buffer.
 572 * @want_more: True if more data is expected to be read
 573 * @_abort: Where the abort code is stored if -ECONNABORTED is returned
 574 *
 575 * Allow a kernel service to receive data and pick up information about the
 576 * state of a call.  Returns 0 if got what was asked for and there's more
 577 * available, 1 if we got what was asked for and we're at the end of the data
 578 * and -EAGAIN if we need more data.
 579 *
 580 * Note that we may return -EAGAIN to drain empty packets at the end of the
 581 * data, even if we've already copied over the requested data.
 582 *
 583 * This function adds the amount it transfers to *_offset, so this should be
 584 * precleared as appropriate.  Note that the amount remaining in the buffer is
 585 * taken to be size - *_offset.
 586 *
 587 * *_abort should also be initialised to 0.
 588 */
 589int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
 590                           void *buf, size_t size, size_t *_offset,
 591                           bool want_more, u32 *_abort)
 592{
 593        struct iov_iter iter;
 594        struct kvec iov;
 595        int ret;
 596
 597        _enter("{%d,%s},%zu/%zu,%d",
 598               call->debug_id, rxrpc_call_states[call->state],
 599               *_offset, size, want_more);
 600
 601        ASSERTCMP(*_offset, <=, size);
 602        ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_ACCEPTING);
 603
 604        iov.iov_base = buf + *_offset;
 605        iov.iov_len = size - *_offset;
 606        iov_iter_kvec(&iter, ITER_KVEC | READ, &iov, 1, size - *_offset);
 607
 608        lock_sock(sock->sk);
 609
 610        switch (call->state) {
 611        case RXRPC_CALL_CLIENT_RECV_REPLY:
 612        case RXRPC_CALL_SERVER_RECV_REQUEST:
 613        case RXRPC_CALL_SERVER_ACK_REQUEST:
 614                ret = rxrpc_recvmsg_data(sock, call, NULL, &iter, size, 0,
 615                                         _offset);
 616                if (ret < 0)
 617                        goto out;
 618
 619                /* We can only reach here with a partially full buffer if we
 620                 * have reached the end of the data.  We must otherwise have a
 621                 * full buffer or have been given -EAGAIN.
 622                 */
 623                if (ret == 1) {
 624                        if (*_offset < size)
 625                                goto short_data;
 626                        if (!want_more)
 627                                goto read_phase_complete;
 628                        ret = 0;
 629                        goto out;
 630                }
 631
 632                if (!want_more)
 633                        goto excess_data;
 634                goto out;
 635
 636        case RXRPC_CALL_COMPLETE:
 637                goto call_complete;
 638
 639        default:
 640                ret = -EINPROGRESS;
 641                goto out;
 642        }
 643
 644read_phase_complete:
 645        ret = 1;
 646out:
 647        release_sock(sock->sk);
 648        _leave(" = %d [%zu,%d]", ret, *_offset, *_abort);
 649        return ret;
 650
 651short_data:
 652        ret = -EBADMSG;
 653        goto out;
 654excess_data:
 655        ret = -EMSGSIZE;
 656        goto out;
 657call_complete:
 658        *_abort = call->abort_code;
 659        ret = -call->error;
 660        if (call->completion == RXRPC_CALL_SUCCEEDED) {
 661                ret = 1;
 662                if (size > 0)
 663                        ret = -ECONNRESET;
 664        }
 665        goto out;
 666}
 667EXPORT_SYMBOL(rxrpc_kernel_recv_data);
 668