linux/fs/afs/rxrpc.c
<<
>>
Prefs
   1/* Maintain an RxRPC server socket to do AFS communications through
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/slab.h>
  13#include <linux/sched/signal.h>
  14
  15#include <net/sock.h>
  16#include <net/af_rxrpc.h>
  17#include <rxrpc/packet.h>
  18#include "internal.h"
  19#include "afs_cm.h"
  20
  21struct socket *afs_socket; /* my RxRPC socket */
  22static struct workqueue_struct *afs_async_calls;
  23static struct afs_call *afs_spare_incoming_call;
  24atomic_t afs_outstanding_calls;
  25
  26static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
  27static int afs_wait_for_call_to_complete(struct afs_call *);
  28static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
  29static void afs_process_async_call(struct work_struct *);
  30static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
  31static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
  32static int afs_deliver_cm_op_id(struct afs_call *);
  33
  34/* asynchronous incoming call initial processing */
  35static const struct afs_call_type afs_RXCMxxxx = {
  36        .name           = "CB.xxxx",
  37        .deliver        = afs_deliver_cm_op_id,
  38        .abort_to_error = afs_abort_to_error,
  39};
  40
  41static void afs_charge_preallocation(struct work_struct *);
  42
  43static DECLARE_WORK(afs_charge_preallocation_work, afs_charge_preallocation);
  44
  45static int afs_wait_atomic_t(atomic_t *p)
  46{
  47        schedule();
  48        return 0;
  49}
  50
  51/*
  52 * open an RxRPC socket and bind it to be a server for callback notifications
  53 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
  54 */
  55int afs_open_socket(void)
  56{
  57        struct sockaddr_rxrpc srx;
  58        struct socket *socket;
  59        int ret;
  60
  61        _enter("");
  62
  63        ret = -ENOMEM;
  64        afs_async_calls = alloc_workqueue("kafsd", WQ_MEM_RECLAIM, 0);
  65        if (!afs_async_calls)
  66                goto error_0;
  67
  68        ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
  69        if (ret < 0)
  70                goto error_1;
  71
  72        socket->sk->sk_allocation = GFP_NOFS;
  73
  74        /* bind the callback manager's address to make this a server socket */
  75        srx.srx_family                  = AF_RXRPC;
  76        srx.srx_service                 = CM_SERVICE;
  77        srx.transport_type              = SOCK_DGRAM;
  78        srx.transport_len               = sizeof(srx.transport.sin);
  79        srx.transport.sin.sin_family    = AF_INET;
  80        srx.transport.sin.sin_port      = htons(AFS_CM_PORT);
  81        memset(&srx.transport.sin.sin_addr, 0,
  82               sizeof(srx.transport.sin.sin_addr));
  83
  84        ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
  85        if (ret < 0)
  86                goto error_2;
  87
  88        rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
  89                                           afs_rx_discard_new_call);
  90
  91        ret = kernel_listen(socket, INT_MAX);
  92        if (ret < 0)
  93                goto error_2;
  94
  95        afs_socket = socket;
  96        afs_charge_preallocation(NULL);
  97        _leave(" = 0");
  98        return 0;
  99
 100error_2:
 101        sock_release(socket);
 102error_1:
 103        destroy_workqueue(afs_async_calls);
 104error_0:
 105        _leave(" = %d", ret);
 106        return ret;
 107}
 108
 109/*
 110 * close the RxRPC socket AFS was using
 111 */
 112void afs_close_socket(void)
 113{
 114        _enter("");
 115
 116        kernel_listen(afs_socket, 0);
 117        flush_workqueue(afs_async_calls);
 118
 119        if (afs_spare_incoming_call) {
 120                afs_put_call(afs_spare_incoming_call);
 121                afs_spare_incoming_call = NULL;
 122        }
 123
 124        _debug("outstanding %u", atomic_read(&afs_outstanding_calls));
 125        wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
 126                         TASK_UNINTERRUPTIBLE);
 127        _debug("no outstanding calls");
 128
 129        kernel_sock_shutdown(afs_socket, SHUT_RDWR);
 130        flush_workqueue(afs_async_calls);
 131        sock_release(afs_socket);
 132
 133        _debug("dework");
 134        destroy_workqueue(afs_async_calls);
 135        _leave("");
 136}
 137
 138/*
 139 * Allocate a call.
 140 */
 141static struct afs_call *afs_alloc_call(const struct afs_call_type *type,
 142                                       gfp_t gfp)
 143{
 144        struct afs_call *call;
 145        int o;
 146
 147        call = kzalloc(sizeof(*call), gfp);
 148        if (!call)
 149                return NULL;
 150
 151        call->type = type;
 152        atomic_set(&call->usage, 1);
 153        INIT_WORK(&call->async_work, afs_process_async_call);
 154        init_waitqueue_head(&call->waitq);
 155
 156        o = atomic_inc_return(&afs_outstanding_calls);
 157        trace_afs_call(call, afs_call_trace_alloc, 1, o,
 158                       __builtin_return_address(0));
 159        return call;
 160}
 161
 162/*
 163 * Dispose of a reference on a call.
 164 */
 165void afs_put_call(struct afs_call *call)
 166{
 167        int n = atomic_dec_return(&call->usage);
 168        int o = atomic_read(&afs_outstanding_calls);
 169
 170        trace_afs_call(call, afs_call_trace_put, n + 1, o,
 171                       __builtin_return_address(0));
 172
 173        ASSERTCMP(n, >=, 0);
 174        if (n == 0) {
 175                ASSERT(!work_pending(&call->async_work));
 176                ASSERT(call->type->name != NULL);
 177
 178                if (call->rxcall) {
 179                        rxrpc_kernel_end_call(afs_socket, call->rxcall);
 180                        call->rxcall = NULL;
 181                }
 182                if (call->type->destructor)
 183                        call->type->destructor(call);
 184
 185                kfree(call->request);
 186                kfree(call);
 187
 188                o = atomic_dec_return(&afs_outstanding_calls);
 189                trace_afs_call(call, afs_call_trace_free, 0, o,
 190                               __builtin_return_address(0));
 191                if (o == 0)
 192                        wake_up_atomic_t(&afs_outstanding_calls);
 193        }
 194}
 195
 196/*
 197 * Queue the call for actual work.  Returns 0 unconditionally for convenience.
 198 */
 199int afs_queue_call_work(struct afs_call *call)
 200{
 201        int u = atomic_inc_return(&call->usage);
 202
 203        trace_afs_call(call, afs_call_trace_work, u,
 204                       atomic_read(&afs_outstanding_calls),
 205                       __builtin_return_address(0));
 206
 207        INIT_WORK(&call->work, call->type->work);
 208
 209        if (!queue_work(afs_wq, &call->work))
 210                afs_put_call(call);
 211        return 0;
 212}
 213
 214/*
 215 * allocate a call with flat request and reply buffers
 216 */
 217struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
 218                                     size_t request_size, size_t reply_max)
 219{
 220        struct afs_call *call;
 221
 222        call = afs_alloc_call(type, GFP_NOFS);
 223        if (!call)
 224                goto nomem_call;
 225
 226        if (request_size) {
 227                call->request_size = request_size;
 228                call->request = kmalloc(request_size, GFP_NOFS);
 229                if (!call->request)
 230                        goto nomem_free;
 231        }
 232
 233        if (reply_max) {
 234                call->reply_max = reply_max;
 235                call->buffer = kmalloc(reply_max, GFP_NOFS);
 236                if (!call->buffer)
 237                        goto nomem_free;
 238        }
 239
 240        init_waitqueue_head(&call->waitq);
 241        return call;
 242
 243nomem_free:
 244        afs_put_call(call);
 245nomem_call:
 246        return NULL;
 247}
 248
 249/*
 250 * clean up a call with flat buffer
 251 */
 252void afs_flat_call_destructor(struct afs_call *call)
 253{
 254        _enter("");
 255
 256        kfree(call->request);
 257        call->request = NULL;
 258        kfree(call->buffer);
 259        call->buffer = NULL;
 260}
 261
 262#define AFS_BVEC_MAX 8
 263
 264/*
 265 * Load the given bvec with the next few pages.
 266 */
 267static void afs_load_bvec(struct afs_call *call, struct msghdr *msg,
 268                          struct bio_vec *bv, pgoff_t first, pgoff_t last,
 269                          unsigned offset)
 270{
 271        struct page *pages[AFS_BVEC_MAX];
 272        unsigned int nr, n, i, to, bytes = 0;
 273
 274        nr = min_t(pgoff_t, last - first + 1, AFS_BVEC_MAX);
 275        n = find_get_pages_contig(call->mapping, first, nr, pages);
 276        ASSERTCMP(n, ==, nr);
 277
 278        msg->msg_flags |= MSG_MORE;
 279        for (i = 0; i < nr; i++) {
 280                to = PAGE_SIZE;
 281                if (first + i >= last) {
 282                        to = call->last_to;
 283                        msg->msg_flags &= ~MSG_MORE;
 284                }
 285                bv[i].bv_page = pages[i];
 286                bv[i].bv_len = to - offset;
 287                bv[i].bv_offset = offset;
 288                bytes += to - offset;
 289                offset = 0;
 290        }
 291
 292        iov_iter_bvec(&msg->msg_iter, WRITE | ITER_BVEC, bv, nr, bytes);
 293}
 294
 295/*
 296 * attach the data from a bunch of pages on an inode to a call
 297 */
 298static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
 299{
 300        struct bio_vec bv[AFS_BVEC_MAX];
 301        unsigned int bytes, nr, loop, offset;
 302        pgoff_t first = call->first, last = call->last;
 303        int ret;
 304
 305        offset = call->first_offset;
 306        call->first_offset = 0;
 307
 308        do {
 309                afs_load_bvec(call, msg, bv, first, last, offset);
 310                offset = 0;
 311                bytes = msg->msg_iter.count;
 312                nr = msg->msg_iter.nr_segs;
 313
 314                /* Have to change the state *before* sending the last
 315                 * packet as RxRPC might give us the reply before it
 316                 * returns from sending the request.
 317                 */
 318                if (first + nr - 1 >= last)
 319                        call->state = AFS_CALL_AWAIT_REPLY;
 320                ret = rxrpc_kernel_send_data(afs_socket, call->rxcall,
 321                                             msg, bytes);
 322                for (loop = 0; loop < nr; loop++)
 323                        put_page(bv[loop].bv_page);
 324                if (ret < 0)
 325                        break;
 326
 327                first += nr;
 328        } while (first <= last);
 329
 330        return ret;
 331}
 332
 333/*
 334 * initiate a call
 335 */
 336int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
 337                  bool async)
 338{
 339        struct sockaddr_rxrpc srx;
 340        struct rxrpc_call *rxcall;
 341        struct msghdr msg;
 342        struct kvec iov[1];
 343        size_t offset;
 344        u32 abort_code;
 345        int ret;
 346
 347        _enter("%x,{%d},", addr->s_addr, ntohs(call->port));
 348
 349        ASSERT(call->type != NULL);
 350        ASSERT(call->type->name != NULL);
 351
 352        _debug("____MAKE %p{%s,%x} [%d]____",
 353               call, call->type->name, key_serial(call->key),
 354               atomic_read(&afs_outstanding_calls));
 355
 356        call->async = async;
 357
 358        memset(&srx, 0, sizeof(srx));
 359        srx.srx_family = AF_RXRPC;
 360        srx.srx_service = call->service_id;
 361        srx.transport_type = SOCK_DGRAM;
 362        srx.transport_len = sizeof(srx.transport.sin);
 363        srx.transport.sin.sin_family = AF_INET;
 364        srx.transport.sin.sin_port = call->port;
 365        memcpy(&srx.transport.sin.sin_addr, addr, 4);
 366
 367        /* create a call */
 368        rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
 369                                         (unsigned long) call, gfp,
 370                                         (async ?
 371                                          afs_wake_up_async_call :
 372                                          afs_wake_up_call_waiter));
 373        call->key = NULL;
 374        if (IS_ERR(rxcall)) {
 375                ret = PTR_ERR(rxcall);
 376                goto error_kill_call;
 377        }
 378
 379        call->rxcall = rxcall;
 380
 381        /* send the request */
 382        iov[0].iov_base = call->request;
 383        iov[0].iov_len  = call->request_size;
 384
 385        msg.msg_name            = NULL;
 386        msg.msg_namelen         = 0;
 387        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
 388                      call->request_size);
 389        msg.msg_control         = NULL;
 390        msg.msg_controllen      = 0;
 391        msg.msg_flags           = (call->send_pages ? MSG_MORE : 0);
 392
 393        /* We have to change the state *before* sending the last packet as
 394         * rxrpc might give us the reply before it returns from sending the
 395         * request.  Further, if the send fails, we may already have been given
 396         * a notification and may have collected it.
 397         */
 398        if (!call->send_pages)
 399                call->state = AFS_CALL_AWAIT_REPLY;
 400        ret = rxrpc_kernel_send_data(afs_socket, rxcall,
 401                                     &msg, call->request_size);
 402        if (ret < 0)
 403                goto error_do_abort;
 404
 405        if (call->send_pages) {
 406                ret = afs_send_pages(call, &msg);
 407                if (ret < 0)
 408                        goto error_do_abort;
 409        }
 410
 411        /* at this point, an async call may no longer exist as it may have
 412         * already completed */
 413        if (call->async)
 414                return -EINPROGRESS;
 415
 416        return afs_wait_for_call_to_complete(call);
 417
 418error_do_abort:
 419        call->state = AFS_CALL_COMPLETE;
 420        if (ret != -ECONNABORTED) {
 421                rxrpc_kernel_abort_call(afs_socket, rxcall, RX_USER_ABORT,
 422                                        ret, "KSD");
 423        } else {
 424                abort_code = 0;
 425                offset = 0;
 426                rxrpc_kernel_recv_data(afs_socket, rxcall, NULL, 0, &offset,
 427                                       false, &abort_code);
 428                ret = call->type->abort_to_error(abort_code);
 429        }
 430error_kill_call:
 431        afs_put_call(call);
 432        _leave(" = %d", ret);
 433        return ret;
 434}
 435
 436/*
 437 * deliver messages to a call
 438 */
 439static void afs_deliver_to_call(struct afs_call *call)
 440{
 441        u32 abort_code;
 442        int ret;
 443
 444        _enter("%s", call->type->name);
 445
 446        while (call->state == AFS_CALL_AWAIT_REPLY ||
 447               call->state == AFS_CALL_AWAIT_OP_ID ||
 448               call->state == AFS_CALL_AWAIT_REQUEST ||
 449               call->state == AFS_CALL_AWAIT_ACK
 450               ) {
 451                if (call->state == AFS_CALL_AWAIT_ACK) {
 452                        size_t offset = 0;
 453                        ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall,
 454                                                     NULL, 0, &offset, false,
 455                                                     &call->abort_code);
 456                        trace_afs_recv_data(call, 0, offset, false, ret);
 457
 458                        if (ret == -EINPROGRESS || ret == -EAGAIN)
 459                                return;
 460                        if (ret == 1 || ret < 0) {
 461                                call->state = AFS_CALL_COMPLETE;
 462                                goto done;
 463                        }
 464                        return;
 465                }
 466
 467                ret = call->type->deliver(call);
 468                switch (ret) {
 469                case 0:
 470                        if (call->state == AFS_CALL_AWAIT_REPLY)
 471                                call->state = AFS_CALL_COMPLETE;
 472                        goto done;
 473                case -EINPROGRESS:
 474                case -EAGAIN:
 475                        goto out;
 476                case -ECONNABORTED:
 477                        goto call_complete;
 478                case -ENOTCONN:
 479                        abort_code = RX_CALL_DEAD;
 480                        rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 481                                                abort_code, ret, "KNC");
 482                        goto save_error;
 483                case -ENOTSUPP:
 484                        abort_code = RXGEN_OPCODE;
 485                        rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 486                                                abort_code, ret, "KIV");
 487                        goto save_error;
 488                case -ENODATA:
 489                case -EBADMSG:
 490                case -EMSGSIZE:
 491                default:
 492                        abort_code = RXGEN_CC_UNMARSHAL;
 493                        if (call->state != AFS_CALL_AWAIT_REPLY)
 494                                abort_code = RXGEN_SS_UNMARSHAL;
 495                        rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 496                                                abort_code, -EBADMSG, "KUM");
 497                        goto save_error;
 498                }
 499        }
 500
 501done:
 502        if (call->state == AFS_CALL_COMPLETE && call->incoming)
 503                afs_put_call(call);
 504out:
 505        _leave("");
 506        return;
 507
 508save_error:
 509        call->error = ret;
 510call_complete:
 511        call->state = AFS_CALL_COMPLETE;
 512        goto done;
 513}
 514
 515/*
 516 * wait synchronously for a call to complete
 517 */
 518static int afs_wait_for_call_to_complete(struct afs_call *call)
 519{
 520        int ret;
 521
 522        DECLARE_WAITQUEUE(myself, current);
 523
 524        _enter("");
 525
 526        add_wait_queue(&call->waitq, &myself);
 527        for (;;) {
 528                set_current_state(TASK_INTERRUPTIBLE);
 529
 530                /* deliver any messages that are in the queue */
 531                if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
 532                        call->need_attention = false;
 533                        __set_current_state(TASK_RUNNING);
 534                        afs_deliver_to_call(call);
 535                        continue;
 536                }
 537
 538                if (call->state == AFS_CALL_COMPLETE ||
 539                    signal_pending(current))
 540                        break;
 541                schedule();
 542        }
 543
 544        remove_wait_queue(&call->waitq, &myself);
 545        __set_current_state(TASK_RUNNING);
 546
 547        /* Kill off the call if it's still live. */
 548        if (call->state < AFS_CALL_COMPLETE) {
 549                _debug("call interrupted");
 550                rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 551                                        RX_USER_ABORT, -EINTR, "KWI");
 552        }
 553
 554        ret = call->error;
 555        _debug("call complete");
 556        afs_put_call(call);
 557        _leave(" = %d", ret);
 558        return ret;
 559}
 560
 561/*
 562 * wake up a waiting call
 563 */
 564static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
 565                                    unsigned long call_user_ID)
 566{
 567        struct afs_call *call = (struct afs_call *)call_user_ID;
 568
 569        call->need_attention = true;
 570        wake_up(&call->waitq);
 571}
 572
 573/*
 574 * wake up an asynchronous call
 575 */
 576static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
 577                                   unsigned long call_user_ID)
 578{
 579        struct afs_call *call = (struct afs_call *)call_user_ID;
 580        int u;
 581
 582        trace_afs_notify_call(rxcall, call);
 583        call->need_attention = true;
 584
 585        u = __atomic_add_unless(&call->usage, 1, 0);
 586        if (u != 0) {
 587                trace_afs_call(call, afs_call_trace_wake, u,
 588                               atomic_read(&afs_outstanding_calls),
 589                               __builtin_return_address(0));
 590
 591                if (!queue_work(afs_async_calls, &call->async_work))
 592                        afs_put_call(call);
 593        }
 594}
 595
 596/*
 597 * Delete an asynchronous call.  The work item carries a ref to the call struct
 598 * that we need to release.
 599 */
 600static void afs_delete_async_call(struct work_struct *work)
 601{
 602        struct afs_call *call = container_of(work, struct afs_call, async_work);
 603
 604        _enter("");
 605
 606        afs_put_call(call);
 607
 608        _leave("");
 609}
 610
 611/*
 612 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 613 * to the call struct that we either need to release or to pass on.
 614 */
 615static void afs_process_async_call(struct work_struct *work)
 616{
 617        struct afs_call *call = container_of(work, struct afs_call, async_work);
 618
 619        _enter("");
 620
 621        if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
 622                call->need_attention = false;
 623                afs_deliver_to_call(call);
 624        }
 625
 626        if (call->state == AFS_CALL_COMPLETE) {
 627                call->reply = NULL;
 628
 629                /* We have two refs to release - one from the alloc and one
 630                 * queued with the work item - and we can't just deallocate the
 631                 * call because the work item may be queued again.
 632                 */
 633                call->async_work.func = afs_delete_async_call;
 634                if (!queue_work(afs_async_calls, &call->async_work))
 635                        afs_put_call(call);
 636        }
 637
 638        afs_put_call(call);
 639        _leave("");
 640}
 641
 642static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
 643{
 644        struct afs_call *call = (struct afs_call *)user_call_ID;
 645
 646        call->rxcall = rxcall;
 647}
 648
 649/*
 650 * Charge the incoming call preallocation.
 651 */
 652static void afs_charge_preallocation(struct work_struct *work)
 653{
 654        struct afs_call *call = afs_spare_incoming_call;
 655
 656        for (;;) {
 657                if (!call) {
 658                        call = afs_alloc_call(&afs_RXCMxxxx, GFP_KERNEL);
 659                        if (!call)
 660                                break;
 661
 662                        call->async = true;
 663                        call->state = AFS_CALL_AWAIT_OP_ID;
 664                        init_waitqueue_head(&call->waitq);
 665                }
 666
 667                if (rxrpc_kernel_charge_accept(afs_socket,
 668                                               afs_wake_up_async_call,
 669                                               afs_rx_attach,
 670                                               (unsigned long)call,
 671                                               GFP_KERNEL) < 0)
 672                        break;
 673                call = NULL;
 674        }
 675        afs_spare_incoming_call = call;
 676}
 677
 678/*
 679 * Discard a preallocated call when a socket is shut down.
 680 */
 681static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
 682                                    unsigned long user_call_ID)
 683{
 684        struct afs_call *call = (struct afs_call *)user_call_ID;
 685
 686        call->rxcall = NULL;
 687        afs_put_call(call);
 688}
 689
 690/*
 691 * Notification of an incoming call.
 692 */
 693static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
 694                            unsigned long user_call_ID)
 695{
 696        queue_work(afs_wq, &afs_charge_preallocation_work);
 697}
 698
 699/*
 700 * Grab the operation ID from an incoming cache manager call.  The socket
 701 * buffer is discarded on error or if we don't yet have sufficient data.
 702 */
 703static int afs_deliver_cm_op_id(struct afs_call *call)
 704{
 705        int ret;
 706
 707        _enter("{%zu}", call->offset);
 708
 709        ASSERTCMP(call->offset, <, 4);
 710
 711        /* the operation ID forms the first four bytes of the request data */
 712        ret = afs_extract_data(call, &call->tmp, 4, true);
 713        if (ret < 0)
 714                return ret;
 715
 716        call->operation_ID = ntohl(call->tmp);
 717        call->state = AFS_CALL_AWAIT_REQUEST;
 718        call->offset = 0;
 719
 720        /* ask the cache manager to route the call (it'll change the call type
 721         * if successful) */
 722        if (!afs_cm_incoming_call(call))
 723                return -ENOTSUPP;
 724
 725        trace_afs_cb_call(call);
 726
 727        /* pass responsibility for the remainer of this message off to the
 728         * cache manager op */
 729        return call->type->deliver(call);
 730}
 731
 732/*
 733 * send an empty reply
 734 */
 735void afs_send_empty_reply(struct afs_call *call)
 736{
 737        struct msghdr msg;
 738
 739        _enter("");
 740
 741        msg.msg_name            = NULL;
 742        msg.msg_namelen         = 0;
 743        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
 744        msg.msg_control         = NULL;
 745        msg.msg_controllen      = 0;
 746        msg.msg_flags           = 0;
 747
 748        call->state = AFS_CALL_AWAIT_ACK;
 749        switch (rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, 0)) {
 750        case 0:
 751                _leave(" [replied]");
 752                return;
 753
 754        case -ENOMEM:
 755                _debug("oom");
 756                rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 757                                        RX_USER_ABORT, -ENOMEM, "KOO");
 758        default:
 759                _leave(" [error]");
 760                return;
 761        }
 762}
 763
 764/*
 765 * send a simple reply
 766 */
 767void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
 768{
 769        struct msghdr msg;
 770        struct kvec iov[1];
 771        int n;
 772
 773        _enter("");
 774
 775        iov[0].iov_base         = (void *) buf;
 776        iov[0].iov_len          = len;
 777        msg.msg_name            = NULL;
 778        msg.msg_namelen         = 0;
 779        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
 780        msg.msg_control         = NULL;
 781        msg.msg_controllen      = 0;
 782        msg.msg_flags           = 0;
 783
 784        call->state = AFS_CALL_AWAIT_ACK;
 785        n = rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, len);
 786        if (n >= 0) {
 787                /* Success */
 788                _leave(" [replied]");
 789                return;
 790        }
 791
 792        if (n == -ENOMEM) {
 793                _debug("oom");
 794                rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 795                                        RX_USER_ABORT, -ENOMEM, "KOO");
 796        }
 797        _leave(" [error]");
 798}
 799
 800/*
 801 * Extract a piece of data from the received data socket buffers.
 802 */
 803int afs_extract_data(struct afs_call *call, void *buf, size_t count,
 804                     bool want_more)
 805{
 806        int ret;
 807
 808        _enter("{%s,%zu},,%zu,%d",
 809               call->type->name, call->offset, count, want_more);
 810
 811        ASSERTCMP(call->offset, <=, count);
 812
 813        ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall,
 814                                     buf, count, &call->offset,
 815                                     want_more, &call->abort_code);
 816        trace_afs_recv_data(call, count, call->offset, want_more, ret);
 817        if (ret == 0 || ret == -EAGAIN)
 818                return ret;
 819
 820        if (ret == 1) {
 821                switch (call->state) {
 822                case AFS_CALL_AWAIT_REPLY:
 823                        call->state = AFS_CALL_COMPLETE;
 824                        break;
 825                case AFS_CALL_AWAIT_REQUEST:
 826                        call->state = AFS_CALL_REPLYING;
 827                        break;
 828                default:
 829                        break;
 830                }
 831                return 0;
 832        }
 833
 834        if (ret == -ECONNABORTED)
 835                call->error = call->type->abort_to_error(call->abort_code);
 836        else
 837                call->error = ret;
 838        call->state = AFS_CALL_COMPLETE;
 839        return ret;
 840}
 841