linux/fs/afs/rxrpc.c
/* Maintain an RxRPC server socket to do AFS communications through
 *
 * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
 * Written by David Howells (dhowells@redhat.com)
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version
 * 2 of the License, or (at your option) any later version.
 */

#include <linux/slab.h>
#include <net/sock.h>
#include <net/af_rxrpc.h>
#include <rxrpc/packet.h>
#include "internal.h"
#include "afs_cm.h"

static struct socket *afs_socket; /* my RxRPC socket */
static struct workqueue_struct *afs_async_calls;
static atomic_t afs_outstanding_calls;
static atomic_t afs_outstanding_skbs;

static void afs_wake_up_call_waiter(struct afs_call *);
static int afs_wait_for_call_to_complete(struct afs_call *);
static void afs_wake_up_async_call(struct afs_call *);
static int afs_dont_wait_for_call_to_complete(struct afs_call *);
static void afs_process_async_call(struct afs_call *);
static void afs_rx_interceptor(struct sock *, unsigned long, struct sk_buff *);
static int afs_deliver_cm_op_id(struct afs_call *, struct sk_buff *, bool);

/* synchronous call management */
const struct afs_wait_mode afs_sync_call = {
        .rx_wakeup      = afs_wake_up_call_waiter,
        .wait           = afs_wait_for_call_to_complete,
};

/* asynchronous call management */
const struct afs_wait_mode afs_async_call = {
        .rx_wakeup      = afs_wake_up_async_call,
        .wait           = afs_dont_wait_for_call_to_complete,
};

/* asynchronous incoming call management */
static const struct afs_wait_mode afs_async_incoming_call = {
        .rx_wakeup      = afs_wake_up_async_call,
};

/* asynchronous incoming call initial processing */
static const struct afs_call_type afs_RXCMxxxx = {
        .name           = "CB.xxxx",
        .deliver        = afs_deliver_cm_op_id,
        .abort_to_error = afs_abort_to_error,
};

static void afs_collect_incoming_call(struct work_struct *);

static struct sk_buff_head afs_incoming_calls;
static DECLARE_WORK(afs_collect_incoming_call_work, afs_collect_incoming_call);

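/* Dispatch an asynchronous call's deferred work to whichever handler is
 * currently installed in ->async_workfn (normally afs_process_async_call(),
 * switched to afs_delete_async_call() once the call is finished with).
 */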
static void afs_async_workfn(struct work_struct *work)
{
        struct afs_call *call = container_of(work, struct afs_call, async_work);

        call->async_workfn(call);
}

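/* Action function for wait_on_atomic_t(): just reschedule until
 * afs_outstanding_calls has dropped to zero.
 */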
static int afs_wait_atomic_t(atomic_t *p)
{
        schedule();
        return 0;
}

/*
 * open an RxRPC socket and bind it to be a server for callback notifications
 * - the socket is left in blocking mode and non-blocking ops use MSG_DONTWAIT
 */
int afs_open_socket(void)
{
        struct sockaddr_rxrpc srx;
        struct socket *socket;
        int ret;

        _enter("");

        skb_queue_head_init(&afs_incoming_calls);

        ret = -ENOMEM;
        afs_async_calls = create_singlethread_workqueue("kafsd");
        if (!afs_async_calls)
                goto error_0;

        ret = sock_create_kern(&init_net, AF_RXRPC, SOCK_DGRAM, PF_INET, &socket);
        if (ret < 0)
                goto error_1;

        socket->sk->sk_allocation = GFP_NOFS;

        /* bind the callback manager's address to make this a server socket */
        srx.srx_family                  = AF_RXRPC;
        srx.srx_service                 = CM_SERVICE;
        srx.transport_type              = SOCK_DGRAM;
        srx.transport_len               = sizeof(srx.transport.sin);
        srx.transport.sin.sin_family    = AF_INET;
        srx.transport.sin.sin_port      = htons(AFS_CM_PORT);
        memset(&srx.transport.sin.sin_addr, 0,
               sizeof(srx.transport.sin.sin_addr));

        ret = kernel_bind(socket, (struct sockaddr *) &srx, sizeof(srx));
        if (ret < 0)
                goto error_2;

        ret = kernel_listen(socket, INT_MAX);
        if (ret < 0)
                goto error_2;

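        /* Have incoming packets for our calls handed to afs_rx_interceptor()
         * rather than left on the socket's receive queue.
         */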
        rxrpc_kernel_intercept_rx_messages(socket, afs_rx_interceptor);

        afs_socket = socket;
        _leave(" = 0");
        return 0;

error_2:
        sock_release(socket);
error_1:
        destroy_workqueue(afs_async_calls);
error_0:
        _leave(" = %d", ret);
        return ret;
}

/*
 * close the RxRPC socket AFS was using
 */
void afs_close_socket(void)
{
        _enter("");

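        /* Wait for afs_free_call() to drop afs_outstanding_calls to zero
         * before the socket and the workqueue are torn down.
         */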
        wait_on_atomic_t(&afs_outstanding_calls, afs_wait_atomic_t,
                         TASK_UNINTERRUPTIBLE);
        _debug("no outstanding calls");

        sock_release(afs_socket);

        _debug("dework");
        destroy_workqueue(afs_async_calls);

        ASSERTCMP(atomic_read(&afs_outstanding_skbs), ==, 0);
        _leave("");
}

/*
 * Note that the data in a socket buffer is now consumed.
 */
void afs_data_consumed(struct afs_call *call, struct sk_buff *skb)
{
        if (!skb) {
                _debug("DLVR NULL [%d]", atomic_read(&afs_outstanding_skbs));
                dump_stack();
        } else {
                _debug("DLVR %p{%u} [%d]",
                       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
                rxrpc_kernel_data_consumed(call->rxcall, skb);
        }
}

/*
 * free a socket buffer
 */
static void afs_free_skb(struct sk_buff *skb)
{
        if (!skb) {
                _debug("FREE NULL [%d]", atomic_read(&afs_outstanding_skbs));
                dump_stack();
        } else {
                _debug("FREE %p{%u} [%d]",
                       skb, skb->mark, atomic_read(&afs_outstanding_skbs));
                if (atomic_dec_return(&afs_outstanding_skbs) == -1)
                        BUG();
                rxrpc_kernel_free_skb(skb);
        }
}

/*
 * free a call
 */
static void afs_free_call(struct afs_call *call)
{
        _debug("DONE %p{%s} [%d]",
               call, call->type->name, atomic_read(&afs_outstanding_calls));

        ASSERTCMP(call->rxcall, ==, NULL);
        ASSERT(!work_pending(&call->async_work));
        ASSERT(skb_queue_empty(&call->rx_queue));
        ASSERT(call->type->name != NULL);

        kfree(call->request);
        kfree(call);

        if (atomic_dec_and_test(&afs_outstanding_calls))
                wake_up_atomic_t(&afs_outstanding_calls);
}

/*
 * End a call but do not free it
 */
static void afs_end_call_nofree(struct afs_call *call)
{
        if (call->rxcall) {
                rxrpc_kernel_end_call(call->rxcall);
                call->rxcall = NULL;
        }
        if (call->type->destructor)
                call->type->destructor(call);
}

/*
 * End a call and free it
 */
static void afs_end_call(struct afs_call *call)
{
        afs_end_call_nofree(call);
        afs_free_call(call);
}

/*
 * allocate a call with flat request and reply buffers
 */
struct afs_call *afs_alloc_flat_call(const struct afs_call_type *type,
                                     size_t request_size, size_t reply_size)
{
        struct afs_call *call;

        call = kzalloc(sizeof(*call), GFP_NOFS);
        if (!call)
                goto nomem_call;

        _debug("CALL %p{%s} [%d]",
               call, type->name, atomic_read(&afs_outstanding_calls));
        atomic_inc(&afs_outstanding_calls);

        call->type = type;
        call->request_size = request_size;
        call->reply_max = reply_size;

        if (request_size) {
                call->request = kmalloc(request_size, GFP_NOFS);
                if (!call->request)
                        goto nomem_free;
        }

        if (reply_size) {
                call->buffer = kmalloc(reply_size, GFP_NOFS);
                if (!call->buffer)
                        goto nomem_free;
        }

        init_waitqueue_head(&call->waitq);
        skb_queue_head_init(&call->rx_queue);
        return call;

nomem_free:
        afs_free_call(call);
nomem_call:
        return NULL;
}

/*
 * clean up a call with flat buffer
 */
void afs_flat_call_destructor(struct afs_call *call)
{
        _enter("");

        kfree(call->request);
        call->request = NULL;
        kfree(call->buffer);
        call->buffer = NULL;
}

/*
 * attach the data from a bunch of pages on an inode to a call
 */
static int afs_send_pages(struct afs_call *call, struct msghdr *msg,
                          struct kvec *iov)
{
        struct page *pages[8];
        unsigned count, n, loop, offset, to;
        pgoff_t first = call->first, last = call->last;
        int ret;

        _enter("");

        offset = call->first_offset;
        call->first_offset = 0;

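        /* Only the first page is sent from a non-zero offset; every
         * subsequent page starts at offset 0.
         */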
        do {
                _debug("attach %lx-%lx", first, last);

                count = last - first + 1;
                if (count > ARRAY_SIZE(pages))
                        count = ARRAY_SIZE(pages);
                n = find_get_pages_contig(call->mapping, first, count, pages);
                ASSERTCMP(n, ==, count);

                loop = 0;
                do {
                        msg->msg_flags = 0;
                        to = PAGE_SIZE;
                        if (first + loop >= last)
                                to = call->last_to;
                        else
                                msg->msg_flags = MSG_MORE;
                        iov->iov_base = kmap(pages[loop]) + offset;
                        iov->iov_len = to - offset;
                        offset = 0;

                        _debug("- range %u-%u%s",
                               offset, to, msg->msg_flags ? " [more]" : "");
                        iov_iter_kvec(&msg->msg_iter, WRITE | ITER_KVEC,
                                      iov, 1, to - offset);

                        /* have to change the state *before* sending the last
                         * packet as RxRPC might give us the reply before it
                         * returns from sending the request */
                        if (first + loop >= last)
                                call->state = AFS_CALL_AWAIT_REPLY;
                        ret = rxrpc_kernel_send_data(call->rxcall, msg,
                                                     to - offset);
                        kunmap(pages[loop]);
                        if (ret < 0)
                                break;
                } while (++loop < count);
                first += count;

                for (loop = 0; loop < count; loop++)
                        put_page(pages[loop]);
                if (ret < 0)
                        break;
        } while (first <= last);

        _leave(" = %d", ret);
        return ret;
}

/*
 * initiate a call
 */
int afs_make_call(struct in_addr *addr, struct afs_call *call, gfp_t gfp,
                  const struct afs_wait_mode *wait_mode)
{
        struct sockaddr_rxrpc srx;
        struct rxrpc_call *rxcall;
        struct msghdr msg;
        struct kvec iov[1];
        int ret;
        struct sk_buff *skb;

        _enter("%x,{%d},", addr->s_addr, ntohs(call->port));

        ASSERT(call->type != NULL);
        ASSERT(call->type->name != NULL);

        _debug("____MAKE %p{%s,%x} [%d]____",
               call, call->type->name, key_serial(call->key),
               atomic_read(&afs_outstanding_calls));

        call->wait_mode = wait_mode;
        call->async_workfn = afs_process_async_call;
        INIT_WORK(&call->async_work, afs_async_workfn);

        memset(&srx, 0, sizeof(srx));
        srx.srx_family = AF_RXRPC;
        srx.srx_service = call->service_id;
        srx.transport_type = SOCK_DGRAM;
        srx.transport_len = sizeof(srx.transport.sin);
        srx.transport.sin.sin_family = AF_INET;
        srx.transport.sin.sin_port = call->port;
        memcpy(&srx.transport.sin.sin_addr, addr, 4);

        /* create a call */
        rxcall = rxrpc_kernel_begin_call(afs_socket, &srx, call->key,
                                         (unsigned long) call, gfp);
        call->key = NULL;
        if (IS_ERR(rxcall)) {
                ret = PTR_ERR(rxcall);
                goto error_kill_call;
        }

        call->rxcall = rxcall;

        /* send the request */
        iov[0].iov_base = call->request;
        iov[0].iov_len  = call->request_size;

        msg.msg_name            = NULL;
        msg.msg_namelen         = 0;
        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1,
                      call->request_size);
        msg.msg_control         = NULL;
        msg.msg_controllen      = 0;
        msg.msg_flags           = (call->send_pages ? MSG_MORE : 0);

        /* have to change the state *before* sending the last packet as RxRPC
         * might give us the reply before it returns from sending the
         * request */
        if (!call->send_pages)
                call->state = AFS_CALL_AWAIT_REPLY;
        ret = rxrpc_kernel_send_data(rxcall, &msg, call->request_size);
        if (ret < 0)
                goto error_do_abort;

        if (call->send_pages) {
                ret = afs_send_pages(call, &msg, iov);
                if (ret < 0)
                        goto error_do_abort;
        }

        /* at this point, an async call may no longer exist as it may have
         * already completed */
        return wait_mode->wait(call);

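        /* Tear down a partly-constructed call: abort it and discard any
         * packets already queued on it before killing the call.
         */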
error_do_abort:
        rxrpc_kernel_abort_call(rxcall, RX_USER_ABORT);
        while ((skb = skb_dequeue(&call->rx_queue)))
                afs_free_skb(skb);
error_kill_call:
        afs_end_call(call);
        _leave(" = %d", ret);
        return ret;
}

/*
 * Handle intercepted messages arriving in the socket's Rx queue.
 *
 * Called from the AF_RXRPC call processor in waitqueue process context.  For
 * each call, this is guaranteed to be called in the order in which the
 * packets are to be delivered.
 */
static void afs_rx_interceptor(struct sock *sk, unsigned long user_call_ID,
                               struct sk_buff *skb)
{
        struct afs_call *call = (struct afs_call *) user_call_ID;

        _enter("%p,,%u", call, skb->mark);

        _debug("ICPT %p{%u} [%d]",
               skb, skb->mark, atomic_read(&afs_outstanding_skbs));

        ASSERTCMP(sk, ==, afs_socket->sk);
        atomic_inc(&afs_outstanding_skbs);

        if (!call) {
                /* it's an incoming call for our callback service */
                skb_queue_tail(&afs_incoming_calls, skb);
                queue_work(afs_wq, &afs_collect_incoming_call_work);
        } else {
                /* route the messages directly to the appropriate call */
                skb_queue_tail(&call->rx_queue, skb);
                call->wait_mode->rx_wakeup(call);
        }

        _leave("");
}

/*
 * deliver messages to a call
 */
static void afs_deliver_to_call(struct afs_call *call)
{
        struct sk_buff *skb;
        bool last;
        u32 abort_code;
        int ret;

        _enter("");

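        /* Drain the rx queue for as long as the call is still expecting
         * traffic; each skb's mark says whether it carries data, the final
         * ACK, a busy notification, an abort or an error.
         */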
        while ((call->state == AFS_CALL_AWAIT_REPLY ||
                call->state == AFS_CALL_AWAIT_OP_ID ||
                call->state == AFS_CALL_AWAIT_REQUEST ||
                call->state == AFS_CALL_AWAIT_ACK) &&
               (skb = skb_dequeue(&call->rx_queue))) {
                switch (skb->mark) {
                case RXRPC_SKB_MARK_DATA:
                        _debug("Rcv DATA");
                        last = rxrpc_kernel_is_data_last(skb);
                        ret = call->type->deliver(call, skb, last);
                        switch (ret) {
                        case -EAGAIN:
                                if (last) {
                                        _debug("short data");
                                        goto unmarshal_error;
                                }
                                break;
                        case 0:
                                ASSERT(last);
                                if (call->state == AFS_CALL_AWAIT_REPLY)
                                        call->state = AFS_CALL_COMPLETE;
                                break;
                        case -ENOTCONN:
                                abort_code = RX_CALL_DEAD;
                                goto do_abort;
                        case -ENOTSUPP:
                                abort_code = RX_INVALID_OPERATION;
                                goto do_abort;
                        default:
                        unmarshal_error:
                                abort_code = RXGEN_CC_UNMARSHAL;
                                if (call->state != AFS_CALL_AWAIT_REPLY)
                                        abort_code = RXGEN_SS_UNMARSHAL;
                        do_abort:
                                rxrpc_kernel_abort_call(call->rxcall,
                                                        abort_code);
                                call->error = ret;
                                call->state = AFS_CALL_ERROR;
                                break;
                        }
                        break;
                case RXRPC_SKB_MARK_FINAL_ACK:
                        _debug("Rcv ACK");
                        call->state = AFS_CALL_COMPLETE;
                        break;
                case RXRPC_SKB_MARK_BUSY:
                        _debug("Rcv BUSY");
                        call->error = -EBUSY;
                        call->state = AFS_CALL_BUSY;
                        break;
                case RXRPC_SKB_MARK_REMOTE_ABORT:
                        abort_code = rxrpc_kernel_get_abort_code(skb);
                        call->error = call->type->abort_to_error(abort_code);
                        call->state = AFS_CALL_ABORTED;
                        _debug("Rcv ABORT %u -> %d", abort_code, call->error);
                        break;
                case RXRPC_SKB_MARK_LOCAL_ABORT:
                        abort_code = rxrpc_kernel_get_abort_code(skb);
                        call->error = call->type->abort_to_error(abort_code);
                        call->state = AFS_CALL_ABORTED;
                        _debug("Loc ABORT %u -> %d", abort_code, call->error);
                        break;
                case RXRPC_SKB_MARK_NET_ERROR:
                        call->error = -rxrpc_kernel_get_error_number(skb);
                        call->state = AFS_CALL_ERROR;
                        _debug("Rcv NET ERROR %d", call->error);
                        break;
                case RXRPC_SKB_MARK_LOCAL_ERROR:
                        call->error = -rxrpc_kernel_get_error_number(skb);
                        call->state = AFS_CALL_ERROR;
                        _debug("Rcv LOCAL ERROR %d", call->error);
                        break;
                default:
                        BUG();
                        break;
                }

                afs_free_skb(skb);
        }

        /* make sure the queue is empty if we're done with the call (we might
         * have aborted the call early because of an unmarshalling error) */
        if (call->state >= AFS_CALL_COMPLETE) {
                while ((skb = skb_dequeue(&call->rx_queue)))
                        afs_free_skb(skb);
                if (call->incoming)
                        afs_end_call(call);
        }

        _leave("");
}

/*
 * wait synchronously for a call to complete
 */
static int afs_wait_for_call_to_complete(struct afs_call *call)
{
        struct sk_buff *skb;
        int ret;

        DECLARE_WAITQUEUE(myself, current);

        _enter("");

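        /* A synchronous caller does its own delivery: packets queued by the
         * interceptor are unmarshalled here, in the caller's context.
         */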
        add_wait_queue(&call->waitq, &myself);
        for (;;) {
                set_current_state(TASK_INTERRUPTIBLE);

                /* deliver any messages that are in the queue */
                if (!skb_queue_empty(&call->rx_queue)) {
                        __set_current_state(TASK_RUNNING);
                        afs_deliver_to_call(call);
                        continue;
                }

                ret = call->error;
                if (call->state >= AFS_CALL_COMPLETE)
                        break;
                ret = -EINTR;
                if (signal_pending(current))
                        break;
                schedule();
        }

        remove_wait_queue(&call->waitq, &myself);
        __set_current_state(TASK_RUNNING);

        /* kill the call */
        if (call->state < AFS_CALL_COMPLETE) {
                _debug("call incomplete");
                rxrpc_kernel_abort_call(call->rxcall, RX_CALL_DEAD);
                while ((skb = skb_dequeue(&call->rx_queue)))
                        afs_free_skb(skb);
        }

        _debug("call complete");
        afs_end_call(call);
        _leave(" = %d", ret);
        return ret;
}

/*
 * wake up a waiting call
 */
static void afs_wake_up_call_waiter(struct afs_call *call)
{
        wake_up(&call->waitq);
}

/*
 * wake up an asynchronous call
 */
static void afs_wake_up_async_call(struct afs_call *call)
{
        _enter("");
        queue_work(afs_async_calls, &call->async_work);
}

/*
 * put a call into asynchronous mode
 * - mustn't touch the call descriptor as the call may have completed by the
 *   time we get here
 */
static int afs_dont_wait_for_call_to_complete(struct afs_call *call)
{
        _enter("");
        return -EINPROGRESS;
}

/*
 * delete an asynchronous call
 */
static void afs_delete_async_call(struct afs_call *call)
{
        _enter("");

        afs_free_call(call);

        _leave("");
}

/*
 * perform processing on an asynchronous call
 * - on a multiple-thread workqueue this work item may try to run on several
 *   CPUs at the same time
 */
static void afs_process_async_call(struct afs_call *call)
{
        _enter("");

        if (!skb_queue_empty(&call->rx_queue))
                afs_deliver_to_call(call);

        if (call->state >= AFS_CALL_COMPLETE && call->wait_mode) {
                if (call->wait_mode->async_complete)
                        call->wait_mode->async_complete(call->reply,
                                                        call->error);
                call->reply = NULL;

                /* kill the call */
                afs_end_call_nofree(call);

                /* we can't just delete the call because the work item may be
                 * queued */
                call->async_workfn = afs_delete_async_call;
                queue_work(afs_async_calls, &call->async_work);
        }

        _leave("");
}

/*
 * Empty a socket buffer into a flat reply buffer.
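 *
 * Returns 0 once the whole reply has been received, -EAGAIN if more data is
 * expected, or -EBADMSG if the data doesn't fit the flat buffer or the reply
 * turns out to be short.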
 */
int afs_transfer_reply(struct afs_call *call, struct sk_buff *skb, bool last)
{
        size_t len = skb->len;

        if (len > call->reply_max - call->reply_size) {
                _leave(" = -EBADMSG [%zu > %u]",
                       len, call->reply_max - call->reply_size);
                return -EBADMSG;
        }

        if (len > 0) {
                if (skb_copy_bits(skb, 0, call->buffer + call->reply_size,
                                  len) < 0)
                        BUG();
                call->reply_size += len;
        }

        afs_data_consumed(call, skb);
        if (!last)
                return -EAGAIN;

        if (call->reply_size != call->reply_max) {
                _leave(" = -EBADMSG [%u != %u]",
                       call->reply_size, call->reply_max);
                return -EBADMSG;
        }
        return 0;
}

/*
 * accept the backlog of incoming calls
 */
static void afs_collect_incoming_call(struct work_struct *work)
{
        struct rxrpc_call *rxcall;
        struct afs_call *call = NULL;
        struct sk_buff *skb;

        while ((skb = skb_dequeue(&afs_incoming_calls))) {
                _debug("new call");

                /* don't need the notification */
                afs_free_skb(skb);

                if (!call) {
                        call = kzalloc(sizeof(struct afs_call), GFP_KERNEL);
                        if (!call) {
                                rxrpc_kernel_reject_call(afs_socket);
                                return;
                        }

                        call->async_workfn = afs_process_async_call;
                        INIT_WORK(&call->async_work, afs_async_workfn);
                        call->wait_mode = &afs_async_incoming_call;
                        call->type = &afs_RXCMxxxx;
                        init_waitqueue_head(&call->waitq);
                        skb_queue_head_init(&call->rx_queue);
                        call->state = AFS_CALL_AWAIT_OP_ID;

                        _debug("CALL %p{%s} [%d]",
                               call, call->type->name,
                               atomic_read(&afs_outstanding_calls));
                        atomic_inc(&afs_outstanding_calls);
                }

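                /* If the accept fails, the preallocated afs_call is kept for
                 * the next notification (and freed below if it goes unused).
                 */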
                rxcall = rxrpc_kernel_accept_call(afs_socket,
                                                  (unsigned long) call);
                if (!IS_ERR(rxcall)) {
                        call->rxcall = rxcall;
                        call = NULL;
                }
        }

        if (call)
                afs_free_call(call);
}

/*
 * Grab the operation ID from an incoming cache manager call.  The socket
 * buffer is discarded on error or if we don't yet have sufficient data.
 */
static int afs_deliver_cm_op_id(struct afs_call *call, struct sk_buff *skb,
                                bool last)
{
        size_t len = skb->len;
        void *oibuf = (void *) &call->operation_ID;

        _enter("{%u},{%zu},%d", call->offset, len, last);

        ASSERTCMP(call->offset, <, 4);

        /* the operation ID forms the first four bytes of the request data */
        len = min_t(size_t, len, 4 - call->offset);
        if (skb_copy_bits(skb, 0, oibuf + call->offset, len) < 0)
                BUG();
        if (!pskb_pull(skb, len))
                BUG();
        call->offset += len;

        if (call->offset < 4) {
                afs_data_consumed(call, skb);
                _leave(" = -EAGAIN");
                return -EAGAIN;
        }

        call->state = AFS_CALL_AWAIT_REQUEST;

        /* ask the cache manager to route the call (it'll change the call type
         * if successful) */
        if (!afs_cm_incoming_call(call))
                return -ENOTSUPP;

        /* pass responsibility for the remainder of this message off to the
         * cache manager op */
        return call->type->deliver(call, skb, last);
}

/*
 * send an empty reply
 */
void afs_send_empty_reply(struct afs_call *call)
{
        struct msghdr msg;

        _enter("");

        msg.msg_name            = NULL;
        msg.msg_namelen         = 0;
        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, NULL, 0, 0);
        msg.msg_control         = NULL;
        msg.msg_controllen      = 0;
        msg.msg_flags           = 0;

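        /* Send the zero-length reply and then wait in AFS_CALL_AWAIT_ACK for
         * the final ACK to come back.
         */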
        call->state = AFS_CALL_AWAIT_ACK;
        switch (rxrpc_kernel_send_data(call->rxcall, &msg, 0)) {
        case 0:
                _leave(" [replied]");
                return;

        case -ENOMEM:
                _debug("oom");
                rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
                /* fall through */
        default:
                afs_end_call(call);
                _leave(" [error]");
                return;
        }
}

/*
 * send a simple reply
 */
void afs_send_simple_reply(struct afs_call *call, const void *buf, size_t len)
{
        struct msghdr msg;
        struct kvec iov[1];
        int n;

        _enter("");

        iov[0].iov_base         = (void *) buf;
        iov[0].iov_len          = len;
        msg.msg_name            = NULL;
        msg.msg_namelen         = 0;
        iov_iter_kvec(&msg.msg_iter, WRITE | ITER_KVEC, iov, 1, len);
        msg.msg_control         = NULL;
        msg.msg_controllen      = 0;
        msg.msg_flags           = 0;

        call->state = AFS_CALL_AWAIT_ACK;
        n = rxrpc_kernel_send_data(call->rxcall, &msg, len);
        if (n >= 0) {
                /* Success */
                _leave(" [replied]");
                return;
        }

        if (n == -ENOMEM) {
                _debug("oom");
                rxrpc_kernel_abort_call(call->rxcall, RX_USER_ABORT);
        }
        afs_end_call(call);
        _leave(" [error]");
}

/*
 * Extract a piece of data from the received data socket buffers.
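 *
 * Returns 0 once count bytes have been gathered into buf, or -EAGAIN if more
 * packets are needed to complete the item.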
 */
int afs_extract_data(struct afs_call *call, struct sk_buff *skb,
                     bool last, void *buf, size_t count)
{
        size_t len = skb->len;

        _enter("{%u},{%zu},%d,,%zu", call->offset, len, last, count);

        ASSERTCMP(call->offset, <, count);

        len = min_t(size_t, len, count - call->offset);
        if (skb_copy_bits(skb, 0, buf + call->offset, len) < 0 ||
            !pskb_pull(skb, len))
                BUG();
        call->offset += len;

        if (call->offset < count) {
                afs_data_consumed(call, skb);
                _leave(" = -EAGAIN");
                return -EAGAIN;
        }
        return 0;
}