linux/fs/afs/rxrpc.c
<<
>>
Prefs
   1/* Maintain an RxRPC server socket to do AFS communications through
   2 *
   3 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
   4 * Written by David Howells (dhowells@redhat.com)
   5 *
   6 * This program is free software; you can redistribute it and/or
   7 * modify it under the terms of the GNU General Public License
   8 * as published by the Free Software Foundation; either version
   9 * 2 of the License, or (at your option) any later version.
  10 */
  11
  12#include <linux/slab.h>
  13#include <linux/sched/signal.h>
  14
  15#include <net/sock.h>
  16#include <net/af_rxrpc.h>
  17#include "internal.h"
  18#include "afs_cm.h"
  19
  20struct socket *afs_socket; /* my RxRPC socket */
  21static struct workqueue_struct *afs_async_calls;
  22static struct afs_call *afs_spare_incoming_call;
  23atomic_t afs_outstanding_calls;
  24
  25static void afs_wake_up_call_waiter(struct sock *, struct rxrpc_call *, unsigned long);
  26static int afs_wait_for_call_to_complete(struct afs_call *);
  27static void afs_wake_up_async_call(struct sock *, struct rxrpc_call *, unsigned long);
  28static void afs_process_async_call(struct work_struct *);
  29static void afs_rx_new_call(struct sock *, struct rxrpc_call *, unsigned long);
  30static void afs_rx_discard_new_call(struct rxrpc_call *, unsigned long);
  31static int afs_deliver_cm_op_id(struct afs_call *);
  32
  33/* asynchronous incoming call initial processing */
  34static const struct afs_call_type afs_RXCMxxxx = {
  35        .name           = "CB.xxxx",
  36        .deliver        = afs_deliver_cm_op_id,
  37        .abort_to_error = afs_abort_to_error,
  38};
  39
  40static void afs_charge_preallocation(struct work_struct *);
  41
  42static DECLARE_WORK(afs_charge_preallocation_work, afs_charge_preallocation);
  43
  44static int afs_wait_atomic_t(atomic_t *p)
  45{
  46        schedule();
  47        return 0;
  48}
  49
  50/*
  51 * open an RxRPC socket and bind it to be a server for callback notifications
  52 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
  53 */
  54int afs_open_socket(void)
  55{
  56        struct sockaddr_rxrpc srx;
  57        struct socket *socket;
  58        int ret;
  59
  60        _enter("");
  61
  62        ret = -ENOMEM;
  63        afs_async_calls = alloc_workqueue("kafsd", WQ_MEM_RECLAIM, 0);
  64        if (!afs_async_calls)
  65                goto error_0;
  66
  67        ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
  68        if (ret < 0)
  69                goto error_1;
  70
  71        socket->sk->sk_allocation = GFP_NOFS;
  72
  73        /* bind the callback manager's address to make this a server socket */
  74        srx.srx_family                  = AF_RXRPC;
  75        srx.srx_service                 = CM_SERVICE;
  76        srx.transport_type              = SOCK_DGRAM;
  77        srx.transport_len               = sizeof(srx.transport.sin);
  78        srx.transport.sin.sin_family    = AF_INET;
  79        srx.transport.sin.sin_port      = htons(AFS_CM_PORT);
  80        memset(&srx.transport.sin.sin_addr, 0,
  81               sizeof(srx.transport.sin.sin_addr));
  82
  83        ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
  84        if (ret < 0)
  85                goto error_2;
  86
  87        rxrpc_kernel_new_call_notification(socket, afs_rx_new_call,
  88                                           afs_rx_discard_new_call);
  89
  90        ret = kernel_listen(socket, INT_MAX);
  91        if (ret < 0)
  92                goto error_2;
  93
  94        afs_socket = socket;
  95        afs_charge_preallocation(NULL);
  96        _leave(" = 0");
  97        return 0;
  98
  99error_2:
 100        sock_release(socket);
 101error_1:
 102        destroy_workqueue(afs_async_calls);
 103error_0:
 104        _leave(" = %d", ret);
 105        return ret;
 106}
 107
 108/*
 109 * close the RxRPC socket AFS was using
 110 */
 111void afs_close_socket(void)
 112{
 113        _enter("");
 114
 115        kernel_listen(afs_socket, 0);
 116        flush_workqueue(afs_async_calls);
 117
 118        if (afs_spare_incoming_call) {
 119                afs_put_call(afs_spare_incoming_call);
 120                afs_spare_incoming_call = NULL;
 121        }
 122
 123        _debug("outstanding %u", atomic_read(&afs_outstanding_calls));
 124        wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
 125                         TASK_UNINTERRUPTIBLE);
 126        _debug("no outstanding calls");
 127
 128        kernel_sock_shutdown(afs_socket, SHUT_RDWR);
 129        flush_workqueue(afs_async_calls);
 130        sock_release(afs_socket);
 131
 132        _debug("dework");
 133        destroy_workqueue(afs_async_calls);
 134        _leave("");
 135}
 136
 137/*
 138 * Allocate a call.
 139 */
 140static struct afs_call *afs_alloc_call(const struct afs_call_type *type,
 141                                       gfp_t gfp)
 142{
 143        struct afs_call *call;
 144        int o;
 145
 146        call = kzalloc(sizeof(*call), gfp);
 147        if (!call)
 148                return NULL;
 149
 150        call->type = type;
 151        atomic_set(&call->usage, 1);
 152        INIT_WORK(&call->async_work, afs_process_async_call);
 153        init_waitqueue_head(&call->waitq);
 154
 155        o = atomic_inc_return(&afs_outstanding_calls);
 156        trace_afs_call(call, afs_call_trace_alloc, 1, o,
 157                       __builtin_return_address(0));
 158        return call;
 159}
 160
 161/*
 162 * Dispose of a reference on a call.
 163 */
 164void afs_put_call(struct afs_call *call)
 165{
 166        int n = atomic_dec_return(&call->usage);
 167        int o = atomic_read(&afs_outstanding_calls);
 168
 169        trace_afs_call(call, afs_call_trace_put, n + 1, o,
 170                       __builtin_return_address(0));
 171
 172        ASSERTCMP(n, >=, 0);
 173        if (n == 0) {
 174                ASSERT(!work_pending(&call->async_work));
 175                ASSERT(call->type->name != NULL);
 176
 177                if (call->rxcall) {
 178                        rxrpc_kernel_end_call(afs_socket, call->rxcall);
 179                        call->rxcall = NULL;
 180                }
 181                if (call->type->destructor)
 182                        call->type->destructor(call);
 183
 184                kfree(call->request);
 185                kfree(call);
 186
 187                o = atomic_dec_return(&afs_outstanding_calls);
 188                trace_afs_call(call, afs_call_trace_free, 0, o,
 189                               __builtin_return_address(0));
 190                if (o == 0)
 191                        wake_up_atomic_t(&afs_outstanding_calls);
 192        }
 193}
 194
 195/*
 196 * Queue the call for actual work.  Returns 0 unconditionally for convenience.
 197 */
 198int afs_queue_call_work(struct afs_call *call)
 199{
 200        int u = atomic_inc_return(&call->usage);
 201
 202        trace_afs_call(call, afs_call_trace_work, u,
 203                       atomic_read(&afs_outstanding_calls),
 204                       __builtin_return_address(0));
 205
 206        INIT_WORK(&call->work, call->type->work);
 207
 208        if (!queue_work(afs_wq, &call->work))
 209                afs_put_call(call);
 210        return 0;
 211}
 212
 213/*
 214 * allocate a call with flat request and reply buffers
 215 */
 216struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
 217                                     size_t request_size, size_t reply_max)
 218{
 219        struct afs_call *call;
 220
 221        call = afs_alloc_call(type, GFP_NOFS);
 222        if (!call)
 223                goto nomem_call;
 224
 225        if (request_size) {
 226                call->request_size = request_size;
 227                call->request = kmalloc(request_size, GFP_NOFS);
 228                if (!call->request)
 229                        goto nomem_free;
 230        }
 231
 232        if (reply_max) {
 233                call->reply_max = reply_max;
 234                call->buffer = kmalloc(reply_max, GFP_NOFS);
 235                if (!call->buffer)
 236                        goto nomem_free;
 237        }
 238
 239        init_waitqueue_head(&call->waitq);
 240        return call;
 241
 242nomem_free:
 243        afs_put_call(call);
 244nomem_call:
 245        return NULL;
 246}
 247
 248/*
 249 * clean up a call with flat buffer
 250 */
 251void afs_flat_call_destructor(struct afs_call *call)
 252{
 253        _enter("");
 254
 255        kfree(call->request);
 256        call->request = NULL;
 257        kfree(call->buffer);
 258        call->buffer = NULL;
 259}
 260
 261#define AFS_BVEC_MAX 8
 262
 263/*
 264 * Load the given bvec with the next few pages.
 265 */
 266static void afs_load_bvec(struct afs_call *call, struct msghdr *msg,
 267                          struct bio_vec *bv, pgoff_t first, pgoff_t last,
 268                          unsigned offset)
 269{
 270        struct page *pages[AFS_BVEC_MAX];
 271        unsigned int nr, n, i, to, bytes = 0;
 272
 273        nr = min_t(pgoff_t, last - first + 1, AFS_BVEC_MAX);
 274        n = find_get_pages_contig(call->mapping, first, nr, pages);
 275        ASSERTCMP(n, ==, nr);
 276
 277        msg->msg_flags |= MSG_MORE;
 278        for (i = 0; i < nr; i++) {
 279                to = PAGE_SIZE;
 280                if (first + i >= last) {
 281                        to = call->last_to;
 282                        msg->msg_flags &= ~MSG_MORE;
 283                }
 284                bv[i].bv_page = pages[i];
 285                bv[i].bv_len = to - offset;
 286                bv[i].bv_offset = offset;
 287                bytes += to - offset;
 288                offset = 0;
 289        }
 290
 291        iov_iter_bvec(&msg->msg_iter, WRITE | ITER_BVEC, bv, nr, bytes);
 292}
 293
 294/*
 295 * Advance the AFS call state when the RxRPC call ends the transmit phase.
 296 */
 297static void afs_notify_end_request_tx(struct sock *sock,
 298                                      struct rxrpc_call *rxcall,
 299                                      unsigned long call_user_ID)
 300{
 301        struct afs_call *call = (struct afs_call *)call_user_ID;
 302
 303        if (call->state == AFS_CALL_REQUESTING)
 304                call->state = AFS_CALL_AWAIT_REPLY;
 305}
 306
 307/*
 308 * attach the data from a bunch of pages on an inode to a call
 309 */
 310static int afs_send_pages(struct afs_call *call, struct msghdr *msg)
 311{
 312        struct bio_vec bv[AFS_BVEC_MAX];
 313        unsigned int bytes, nr, loop, offset;
 314        pgoff_t first = call->first, last = call->last;
 315        int ret;
 316
 317        offset = call->first_offset;
 318        call->first_offset = 0;
 319
 320        do {
 321                afs_load_bvec(call, msg, bv, first, last, offset);
 322                offset = 0;
 323                bytes = msg->msg_iter.count;
 324                nr = msg->msg_iter.nr_segs;
 325
 326                ret = rxrpc_kernel_send_data(afs_socket, call->rxcall, msg,
 327                                             bytes, afs_notify_end_request_tx);
 328                for (loop = 0; loop < nr; loop++)
 329                        put_page(bv[loop].bv_page);
 330                if (ret < 0)
 331                        break;
 332
 333                first += nr;
 334        } while (first <= last);
 335
 336        return ret;
 337}
 338
 339/*
 340 * initiate a call
 341 */
 342int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
 343                  bool async)
 344{
 345        struct sockaddr_rxrpc srx;
 346        struct rxrpc_call *rxcall;
 347        struct msghdr msg;
 348        struct kvec iov[1];
 349        size_t offset;
 350        s64 tx_total_len;
 351        u32 abort_code;
 352        int ret;
 353
 354        _enter("%x,{%d},", addr->s_addr, ntohs(call->port));
 355
 356        ASSERT(call->type != NULL);
 357        ASSERT(call->type->name != NULL);
 358
 359        _debug("____MAKE %p{%s,%x} [%d]____",
 360               call, call->type->name, key_serial(call->key),
 361               atomic_read(&afs_outstanding_calls));
 362
 363        call->async = async;
 364
 365        memset(&srx, 0, sizeof(srx));
 366        srx.srx_family = AF_RXRPC;
 367        srx.srx_service = call->service_id;
 368        srx.transport_type = SOCK_DGRAM;
 369        srx.transport_len = sizeof(srx.transport.sin);
 370        srx.transport.sin.sin_family = AF_INET;
 371        srx.transport.sin.sin_port = call->port;
 372        memcpy(&srx.transport.sin.sin_addr, addr, 4);
 373
 374        /* Work out the length we're going to transmit.  This is awkward for
 375         * calls such as FS.StoreData where there's an extra injection of data
 376         * after the initial fixed part.
 377         */
 378        tx_total_len = call->request_size;
 379        if (call->send_pages) {
 380                tx_total_len += call->last_to - call->first_offset;
 381                tx_total_len += (call->last - call->first) * PAGE_SIZE;
 382        }
 383
 384        /* create a call */
 385        rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
 386                                         (unsigned long)call,
 387                                         tx_total_len, gfp,
 388                                         (async ?
 389                                          afs_wake_up_async_call :
 390                                          afs_wake_up_call_waiter));
 391        call->key = NULL;
 392        if (IS_ERR(rxcall)) {
 393                ret = PTR_ERR(rxcall);
 394                goto error_kill_call;
 395        }
 396
 397        call->rxcall = rxcall;
 398
 399        /* send the request */
 400        iov[0].iov_base = call->request;
 401        iov[0].iov_len  = call->request_size;
 402
 403        msg.msg_name            = NULL;
 404        msg.msg_namelen         = 0;
 405        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
 406                      call->request_size);
 407        msg.msg_control         = NULL;
 408        msg.msg_controllen      = 0;
 409        msg.msg_flags           = (call->send_pages ? MSG_MORE : 0);
 410
 411        /* We have to change the state *before* sending the last packet as
 412         * rxrpc might give us the reply before it returns from sending the
 413         * request.  Further, if the send fails, we may already have been given
 414         * a notification and may have collected it.
 415         */
 416        if (!call->send_pages)
 417                call->state = AFS_CALL_AWAIT_REPLY;
 418        ret = rxrpc_kernel_send_data(afs_socket, rxcall,
 419                                     &msg, call->request_size,
 420                                     afs_notify_end_request_tx);
 421        if (ret < 0)
 422                goto error_do_abort;
 423
 424        if (call->send_pages) {
 425                ret = afs_send_pages(call, &msg);
 426                if (ret < 0)
 427                        goto error_do_abort;
 428        }
 429
 430        /* at this point, an async call may no longer exist as it may have
 431         * already completed */
 432        if (call->async)
 433                return -EINPROGRESS;
 434
 435        return afs_wait_for_call_to_complete(call);
 436
 437error_do_abort:
 438        call->state = AFS_CALL_COMPLETE;
 439        if (ret != -ECONNABORTED) {
 440                rxrpc_kernel_abort_call(afs_socket, rxcall, RX_USER_ABORT,
 441                                        ret, "KSD");
 442        } else {
 443                abort_code = 0;
 444                offset = 0;
 445                rxrpc_kernel_recv_data(afs_socket, rxcall, NULL, 0, &offset,
 446                                       false, &abort_code);
 447                ret = call->type->abort_to_error(abort_code);
 448        }
 449error_kill_call:
 450        afs_put_call(call);
 451        _leave(" = %d", ret);
 452        return ret;
 453}
 454
 455/*
 456 * deliver messages to a call
 457 */
 458static void afs_deliver_to_call(struct afs_call *call)
 459{
 460        u32 abort_code;
 461        int ret;
 462
 463        _enter("%s", call->type->name);
 464
 465        while (call->state == AFS_CALL_AWAIT_REPLY ||
 466               call->state == AFS_CALL_AWAIT_OP_ID ||
 467               call->state == AFS_CALL_AWAIT_REQUEST ||
 468               call->state == AFS_CALL_AWAIT_ACK
 469               ) {
 470                if (call->state == AFS_CALL_AWAIT_ACK) {
 471                        size_t offset = 0;
 472                        ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall,
 473                                                     NULL, 0, &offset, false,
 474                                                     &call->abort_code);
 475                        trace_afs_recv_data(call, 0, offset, false, ret);
 476
 477                        if (ret == -EINPROGRESS || ret == -EAGAIN)
 478                                return;
 479                        if (ret == 1 || ret < 0) {
 480                                call->state = AFS_CALL_COMPLETE;
 481                                goto done;
 482                        }
 483                        return;
 484                }
 485
 486                ret = call->type->deliver(call);
 487                switch (ret) {
 488                case 0:
 489                        if (call->state == AFS_CALL_AWAIT_REPLY)
 490                                call->state = AFS_CALL_COMPLETE;
 491                        goto done;
 492                case -EINPROGRESS:
 493                case -EAGAIN:
 494                        goto out;
 495                case -ECONNABORTED:
 496                        goto call_complete;
 497                case -ENOTCONN:
 498                        abort_code = RX_CALL_DEAD;
 499                        rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 500                                                abort_code, ret, "KNC");
 501                        goto save_error;
 502                case -ENOTSUPP:
 503                        abort_code = RXGEN_OPCODE;
 504                        rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 505                                                abort_code, ret, "KIV");
 506                        goto save_error;
 507                case -ENODATA:
 508                case -EBADMSG:
 509                case -EMSGSIZE:
 510                default:
 511                        abort_code = RXGEN_CC_UNMARSHAL;
 512                        if (call->state != AFS_CALL_AWAIT_REPLY)
 513                                abort_code = RXGEN_SS_UNMARSHAL;
 514                        rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 515                                                abort_code, -EBADMSG, "KUM");
 516                        goto save_error;
 517                }
 518        }
 519
 520done:
 521        if (call->state == AFS_CALL_COMPLETE && call->incoming)
 522                afs_put_call(call);
 523out:
 524        _leave("");
 525        return;
 526
 527save_error:
 528        call->error = ret;
 529call_complete:
 530        call->state = AFS_CALL_COMPLETE;
 531        goto done;
 532}
 533
 534/*
 535 * wait synchronously for a call to complete
 536 */
 537static int afs_wait_for_call_to_complete(struct afs_call *call)
 538{
 539        int ret;
 540
 541        DECLARE_WAITQUEUE(myself, current);
 542
 543        _enter("");
 544
 545        add_wait_queue(&call->waitq, &myself);
 546        for (;;) {
 547                set_current_state(TASK_INTERRUPTIBLE);
 548
 549                /* deliver any messages that are in the queue */
 550                if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
 551                        call->need_attention = false;
 552                        __set_current_state(TASK_RUNNING);
 553                        afs_deliver_to_call(call);
 554                        continue;
 555                }
 556
 557                if (call->state == AFS_CALL_COMPLETE ||
 558                    signal_pending(current))
 559                        break;
 560                schedule();
 561        }
 562
 563        remove_wait_queue(&call->waitq, &myself);
 564        __set_current_state(TASK_RUNNING);
 565
 566        /* Kill off the call if it's still live. */
 567        if (call->state < AFS_CALL_COMPLETE) {
 568                _debug("call interrupted");
 569                rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 570                                        RX_USER_ABORT, -EINTR, "KWI");
 571        }
 572
 573        ret = call->error;
 574        _debug("call complete");
 575        afs_put_call(call);
 576        _leave(" = %d", ret);
 577        return ret;
 578}
 579
 580/*
 581 * wake up a waiting call
 582 */
 583static void afs_wake_up_call_waiter(struct sock *sk, struct rxrpc_call *rxcall,
 584                                    unsigned long call_user_ID)
 585{
 586        struct afs_call *call = (struct afs_call *)call_user_ID;
 587
 588        call->need_attention = true;
 589        wake_up(&call->waitq);
 590}
 591
 592/*
 593 * wake up an asynchronous call
 594 */
 595static void afs_wake_up_async_call(struct sock *sk, struct rxrpc_call *rxcall,
 596                                   unsigned long call_user_ID)
 597{
 598        struct afs_call *call = (struct afs_call *)call_user_ID;
 599        int u;
 600
 601        trace_afs_notify_call(rxcall, call);
 602        call->need_attention = true;
 603
 604        u = __atomic_add_unless(&call->usage, 1, 0);
 605        if (u != 0) {
 606                trace_afs_call(call, afs_call_trace_wake, u,
 607                               atomic_read(&afs_outstanding_calls),
 608                               __builtin_return_address(0));
 609
 610                if (!queue_work(afs_async_calls, &call->async_work))
 611                        afs_put_call(call);
 612        }
 613}
 614
 615/*
 616 * Delete an asynchronous call.  The work item carries a ref to the call struct
 617 * that we need to release.
 618 */
 619static void afs_delete_async_call(struct work_struct *work)
 620{
 621        struct afs_call *call = container_of(work, struct afs_call, async_work);
 622
 623        _enter("");
 624
 625        afs_put_call(call);
 626
 627        _leave("");
 628}
 629
 630/*
 631 * Perform I/O processing on an asynchronous call.  The work item carries a ref
 632 * to the call struct that we either need to release or to pass on.
 633 */
 634static void afs_process_async_call(struct work_struct *work)
 635{
 636        struct afs_call *call = container_of(work, struct afs_call, async_work);
 637
 638        _enter("");
 639
 640        if (call->state < AFS_CALL_COMPLETE && call->need_attention) {
 641                call->need_attention = false;
 642                afs_deliver_to_call(call);
 643        }
 644
 645        if (call->state == AFS_CALL_COMPLETE) {
 646                call->reply = NULL;
 647
 648                /* We have two refs to release - one from the alloc and one
 649                 * queued with the work item - and we can't just deallocate the
 650                 * call because the work item may be queued again.
 651                 */
 652                call->async_work.func = afs_delete_async_call;
 653                if (!queue_work(afs_async_calls, &call->async_work))
 654                        afs_put_call(call);
 655        }
 656
 657        afs_put_call(call);
 658        _leave("");
 659}
 660
 661static void afs_rx_attach(struct rxrpc_call *rxcall, unsigned long user_call_ID)
 662{
 663        struct afs_call *call = (struct afs_call *)user_call_ID;
 664
 665        call->rxcall = rxcall;
 666}
 667
 668/*
 669 * Charge the incoming call preallocation.
 670 */
 671static void afs_charge_preallocation(struct work_struct *work)
 672{
 673        struct afs_call *call = afs_spare_incoming_call;
 674
 675        for (;;) {
 676                if (!call) {
 677                        call = afs_alloc_call(&afs_RXCMxxxx, GFP_KERNEL);
 678                        if (!call)
 679                                break;
 680
 681                        call->async = true;
 682                        call->state = AFS_CALL_AWAIT_OP_ID;
 683                        init_waitqueue_head(&call->waitq);
 684                }
 685
 686                if (rxrpc_kernel_charge_accept(afs_socket,
 687                                               afs_wake_up_async_call,
 688                                               afs_rx_attach,
 689                                               (unsigned long)call,
 690                                               GFP_KERNEL) < 0)
 691                        break;
 692                call = NULL;
 693        }
 694        afs_spare_incoming_call = call;
 695}
 696
 697/*
 698 * Discard a preallocated call when a socket is shut down.
 699 */
 700static void afs_rx_discard_new_call(struct rxrpc_call *rxcall,
 701                                    unsigned long user_call_ID)
 702{
 703        struct afs_call *call = (struct afs_call *)user_call_ID;
 704
 705        call->rxcall = NULL;
 706        afs_put_call(call);
 707}
 708
 709/*
 710 * Notification of an incoming call.
 711 */
 712static void afs_rx_new_call(struct sock *sk, struct rxrpc_call *rxcall,
 713                            unsigned long user_call_ID)
 714{
 715        queue_work(afs_wq, &afs_charge_preallocation_work);
 716}
 717
 718/*
 719 * Grab the operation ID from an incoming cache manager call.  The socket
 720 * buffer is discarded on error or if we don't yet have sufficient data.
 721 */
 722static int afs_deliver_cm_op_id(struct afs_call *call)
 723{
 724        int ret;
 725
 726        _enter("{%zu}", call->offset);
 727
 728        ASSERTCMP(call->offset, <, 4);
 729
 730        /* the operation ID forms the first four bytes of the request data */
 731        ret = afs_extract_data(call, &call->tmp, 4, true);
 732        if (ret < 0)
 733                return ret;
 734
 735        call->operation_ID = ntohl(call->tmp);
 736        call->state = AFS_CALL_AWAIT_REQUEST;
 737        call->offset = 0;
 738
 739        /* ask the cache manager to route the call (it'll change the call type
 740         * if successful) */
 741        if (!afs_cm_incoming_call(call))
 742                return -ENOTSUPP;
 743
 744        trace_afs_cb_call(call);
 745
 746        /* pass responsibility for the remainer of this message off to the
 747         * cache manager op */
 748        return call->type->deliver(call);
 749}
 750
 751/*
 752 * Advance the AFS call state when an RxRPC service call ends the transmit
 753 * phase.
 754 */
 755static void afs_notify_end_reply_tx(struct sock *sock,
 756                                    struct rxrpc_call *rxcall,
 757                                    unsigned long call_user_ID)
 758{
 759        struct afs_call *call = (struct afs_call *)call_user_ID;
 760
 761        if (call->state == AFS_CALL_REPLYING)
 762                call->state = AFS_CALL_AWAIT_ACK;
 763}
 764
 765/*
 766 * send an empty reply
 767 */
 768void afs_send_empty_reply(struct afs_call *call)
 769{
 770        struct msghdr msg;
 771
 772        _enter("");
 773
 774        rxrpc_kernel_set_tx_length(afs_socket, call->rxcall, 0);
 775
 776        msg.msg_name            = NULL;
 777        msg.msg_namelen         = 0;
 778        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
 779        msg.msg_control         = NULL;
 780        msg.msg_controllen      = 0;
 781        msg.msg_flags           = 0;
 782
 783        call->state = AFS_CALL_AWAIT_ACK;
 784        switch (rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, 0,
 785                                       afs_notify_end_reply_tx)) {
 786        case 0:
 787                _leave(" [replied]");
 788                return;
 789
 790        case -ENOMEM:
 791                _debug("oom");
 792                rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 793                                        RX_USER_ABORT, -ENOMEM, "KOO");
 794        default:
 795                _leave(" [error]");
 796                return;
 797        }
 798}
 799
 800/*
 801 * send a simple reply
 802 */
 803void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
 804{
 805        struct msghdr msg;
 806        struct kvec iov[1];
 807        int n;
 808
 809        _enter("");
 810
 811        rxrpc_kernel_set_tx_length(afs_socket, call->rxcall, len);
 812
 813        iov[0].iov_base         = (void *) buf;
 814        iov[0].iov_len          = len;
 815        msg.msg_name            = NULL;
 816        msg.msg_namelen         = 0;
 817        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
 818        msg.msg_control         = NULL;
 819        msg.msg_controllen      = 0;
 820        msg.msg_flags           = 0;
 821
 822        call->state = AFS_CALL_AWAIT_ACK;
 823        n = rxrpc_kernel_send_data(afs_socket, call->rxcall, &msg, len,
 824                                   afs_notify_end_reply_tx);
 825        if (n >= 0) {
 826                /* Success */
 827                _leave(" [replied]");
 828                return;
 829        }
 830
 831        if (n == -ENOMEM) {
 832                _debug("oom");
 833                rxrpc_kernel_abort_call(afs_socket, call->rxcall,
 834                                        RX_USER_ABORT, -ENOMEM, "KOO");
 835        }
 836        _leave(" [error]");
 837}
 838
 839/*
 840 * Extract a piece of data from the received data socket buffers.
 841 */
 842int afs_extract_data(struct afs_call *call, void *buf, size_t count,
 843                     bool want_more)
 844{
 845        int ret;
 846
 847        _enter("{%s,%zu},,%zu,%d",
 848               call->type->name, call->offset, count, want_more);
 849
 850        ASSERTCMP(call->offset, <=, count);
 851
 852        ret = rxrpc_kernel_recv_data(afs_socket, call->rxcall,
 853                                     buf, count, &call->offset,
 854                                     want_more, &call->abort_code);
 855        trace_afs_recv_data(call, count, call->offset, want_more, ret);
 856        if (ret == 0 || ret == -EAGAIN)
 857                return ret;
 858
 859        if (ret == 1) {
 860                switch (call->state) {
 861                case AFS_CALL_AWAIT_REPLY:
 862                        call->state = AFS_CALL_COMPLETE;
 863                        break;
 864                case AFS_CALL_AWAIT_REQUEST:
 865                        call->state = AFS_CALL_REPLYING;
 866                        break;
 867                default:
 868                        break;
 869                }
 870                return 0;
 871        }
 872
 873        if (ret == -ECONNABORTED)
 874                call->error = call->type->abort_to_error(call->abort_code);
 875        else
 876                call->error = ret;
 877        call->state = AFS_CALL_COMPLETE;
 878        return ret;
 879}
 880