linux/drivers/staging/lustre/lnet/klnds/socklnd/socklnd_lib.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2005, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#include "socklnd.h"

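/*
 * Record the peer and local IP/port of a connection's socket in the
 * conn structure.  Returns 0 on success or the negative errno from
 * lnet_sock_getaddr() on failure.
 */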
int
ksocknal_lib_get_conn_addrs(ksock_conn_t *conn)
{
        int rc = lnet_sock_getaddr(conn->ksnc_sock, 1, &conn->ksnc_ipaddr,
                                   &conn->ksnc_port);

        /* Didn't need the {get,put}connsock dance to deref ksnc_sock... */
        LASSERT(!conn->ksnc_closing);

        if (rc != 0) {
                CERROR("Error %d getting sock peer IP\n", rc);
                return rc;
        }

        rc = lnet_sock_getaddr(conn->ksnc_sock, 0, &conn->ksnc_myipaddr, NULL);
        if (rc != 0) {
                CERROR("Error %d getting sock local IP\n", rc);
                return rc;
        }

        return 0;
}

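/*
 * A connection can use zero-copy sends only when it speaks a protocol
 * newer than V1.x and the underlying device offers scatter/gather plus
 * hardware checksumming (NETIF_F_SG and NETIF_F_CSUM_MASK in
 * sk_route_caps).
 */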
int
ksocknal_lib_zc_capable(ksock_conn_t *conn)
{
        int caps = conn->ksnc_sock->sk->sk_route_caps;

        if (conn->ksnc_proto == &ksocknal_protocol_v1x)
                return 0;

        /* ZC if the socket supports scatter/gather and doesn't need software
         * checksums */
        return ((caps & NETIF_F_SG) != 0 && (caps & NETIF_F_CSUM_MASK) != 0);
}

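/*
 * Transmit the iovec fragments of tx with a non-blocking sendmsg,
 * checksumming the message first if this is its first pass over the
 * wire on a V2.x connection.  Returns the number of bytes sent or a
 * negative errno.
 */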
int
ksocknal_lib_send_iov(ksock_conn_t *conn, ksock_tx_t *tx)
{
        struct socket *sock = conn->ksnc_sock;
        int nob;
        int rc;

        if (*ksocknal_tunables.ksnd_enable_csum && /* checksum enabled */
            conn->ksnc_proto == &ksocknal_protocol_v2x && /* V2.x connection  */
            tx->tx_nob == tx->tx_resid           && /* first sending    */
            tx->tx_msg.ksm_csum == 0)                /* not checksummed  */
                ksocknal_lib_csum_tx(tx);

        /* NB we can't trust socket ops to either consume our iovs
         * or leave them alone. */

        {
#if SOCKNAL_SINGLE_FRAG_TX
                struct kvec scratch;
                struct kvec *scratchiov = &scratch;
                unsigned int niov = 1;
#else
                struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
                unsigned int niov = tx->tx_niov;
#endif
                struct msghdr msg = {.msg_flags = MSG_DONTWAIT};
                int i;

                for (nob = i = 0; i < niov; i++) {
                        scratchiov[i] = tx->tx_iov[i];
                        nob += scratchiov[i].iov_len;
                }

                if (!list_empty(&conn->ksnc_tx_queue) ||
                    nob < tx->tx_resid)
                        msg.msg_flags |= MSG_MORE;

                rc = kernel_sendmsg(sock, &msg, scratchiov, niov, nob);
        }
        return rc;
}

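/*
 * Transmit the page (kiov) fragments of tx.  If the message carries a
 * zero-copy cookie, send the first fragment with sendpage() so the
 * page is handed to the stack without copying; otherwise kmap() every
 * fragment into a scratch iovec and fall back to kernel_sendmsg().
 */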
int
ksocknal_lib_send_kiov(ksock_conn_t *conn, ksock_tx_t *tx)
{
        struct socket *sock = conn->ksnc_sock;
        lnet_kiov_t *kiov = tx->tx_kiov;
        int rc;
        int nob;

        /* Not NOOP message */
        LASSERT(tx->tx_lnetmsg != NULL);

        /* NB we can't trust socket ops to either consume our iovs
         * or leave them alone. */
        if (tx->tx_msg.ksm_zc_cookies[0] != 0) {
                /* Zero copy is enabled */
                struct sock *sk = sock->sk;
                struct page *page = kiov->kiov_page;
                int offset = kiov->kiov_offset;
                int fragsize = kiov->kiov_len;
                int msgflg = MSG_DONTWAIT;

                CDEBUG(D_NET, "page %p + offset %x for %d\n",
                               page, offset, kiov->kiov_len);

                if (!list_empty(&conn->ksnc_tx_queue) ||
                    fragsize < tx->tx_resid)
                        msgflg |= MSG_MORE;

                if (sk->sk_prot->sendpage != NULL) {
                        rc = sk->sk_prot->sendpage(sk, page,
                                                   offset, fragsize, msgflg);
                } else {
                        rc = tcp_sendpage(sk, page, offset, fragsize, msgflg);
                }
        } else {
#if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
                struct kvec scratch;
                struct kvec *scratchiov = &scratch;
                unsigned int niov = 1;
#else
#ifdef CONFIG_HIGHMEM
#warning "XXX risk of kmap deadlock on multiple frags..."
#endif
                struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
                unsigned int niov = tx->tx_nkiov;
#endif
                struct msghdr msg = {.msg_flags = MSG_DONTWAIT};
                int i;

                for (nob = i = 0; i < niov; i++) {
                        scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
                                                 kiov[i].kiov_offset;
                        nob += scratchiov[i].iov_len = kiov[i].kiov_len;
                }

                if (!list_empty(&conn->ksnc_tx_queue) ||
                    nob < tx->tx_resid)
                        msg.msg_flags |= MSG_MORE;

                rc = kernel_sendmsg(sock, &msg, scratchiov, niov, nob);

                for (i = 0; i < niov; i++)
                        kunmap(kiov[i].kiov_page);
        }
        return rc;
}

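/*
 * Ask TCP to ACK incoming data immediately (TCP_QUICKACK) so the
 * peer's pending zero-copy sends complete without waiting for a
 * piggy-backed ACK.  This is a best-effort hint, so the setsockopt
 * return value is not checked.
 */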
void
ksocknal_lib_eager_ack(ksock_conn_t *conn)
{
        int opt = 1;
        struct socket *sock = conn->ksnc_sock;

        /* Remind the socket to ACK eagerly.  If I don't, the socket might
         * think I'm about to send something it could piggy-back the ACK
         * on, introducing delay in completing zero-copy sends in my
         * peer. */

        kernel_setsockopt(sock, SOL_TCP, TCP_QUICKACK,
                               (char *)&opt, sizeof(opt));
}

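/*
 * Receive into the iovec fragments described by conn->ksnc_rx_iov.  On
 * V2.x connections the sender's checksum is stashed and zeroed around
 * the read so the bytes just received can be folded into the running
 * ksnc_rx_csum.  Returns the number of bytes received or a negative
 * errno.
 */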
int
ksocknal_lib_recv_iov(ksock_conn_t *conn)
{
#if SOCKNAL_SINGLE_FRAG_RX
        struct kvec scratch;
        struct kvec *scratchiov = &scratch;
        unsigned int niov = 1;
#else
        struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
        unsigned int niov = conn->ksnc_rx_niov;
#endif
        struct kvec *iov = conn->ksnc_rx_iov;
        struct msghdr msg = {
                .msg_flags = 0
        };
        int nob;
        int i;
        int rc;
        int fragnob;
        int sum;
        __u32 saved_csum;

        /* NB we can't trust socket ops to either consume our iovs
         * or leave them alone. */
        LASSERT(niov > 0);

        for (nob = i = 0; i < niov; i++) {
                scratchiov[i] = iov[i];
                nob += scratchiov[i].iov_len;
        }
        LASSERT(nob <= conn->ksnc_rx_nob_wanted);

        rc = kernel_recvmsg(conn->ksnc_sock, &msg,
                            scratchiov, niov, nob, MSG_DONTWAIT);

        saved_csum = 0;
        if (conn->ksnc_proto == &ksocknal_protocol_v2x) {
                saved_csum = conn->ksnc_msg.ksm_csum;
                conn->ksnc_msg.ksm_csum = 0;
        }

        if (saved_csum != 0) {
                /* accumulate checksum */
                for (i = 0, sum = rc; sum > 0; i++, sum -= fragnob) {
                        LASSERT(i < niov);

                        fragnob = iov[i].iov_len;
                        if (fragnob > sum)
                                fragnob = sum;

                        conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
                                                           iov[i].iov_base,
                                                           fragnob);
                }
                conn->ksnc_msg.ksm_csum = saved_csum;
        }

        return rc;
}

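/*
 * Helpers for zero-copy receive: when the incoming fragments form a
 * run of whole, contiguous pages, ksocknal_lib_kiov_vmap() maps them
 * into one virtually contiguous buffer so the socket can be drained
 * with a single-fragment read; ksocknal_lib_kiov_vunmap() tears the
 * mapping down again.
 */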
static void
ksocknal_lib_kiov_vunmap(void *addr)
{
        if (addr == NULL)
                return;

        vunmap(addr);
}

static void *
ksocknal_lib_kiov_vmap(lnet_kiov_t *kiov, int niov,
                       struct kvec *iov, struct page **pages)
{
        void *addr;
        int nob;
        int i;

        if (!*ksocknal_tunables.ksnd_zc_recv || pages == NULL)
                return NULL;

        LASSERT(niov <= LNET_MAX_IOV);

        if (niov < 2 ||
            niov < *ksocknal_tunables.ksnd_zc_recv_min_nfrags)
                return NULL;

        for (nob = i = 0; i < niov; i++) {
                if ((kiov[i].kiov_offset != 0 && i > 0) ||
                    (kiov[i].kiov_offset + kiov[i].kiov_len != PAGE_CACHE_SIZE &&
                     i < niov - 1))
                        return NULL;

                pages[i] = kiov[i].kiov_page;
                nob += kiov[i].kiov_len;
        }

        addr = vmap(pages, niov, VM_MAP, PAGE_KERNEL);
        if (addr == NULL)
                return NULL;

        iov->iov_base = addr + kiov[0].kiov_offset;
        iov->iov_len = nob;

        return addr;
}

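/*
 * Receive into the page (kiov) fragments described by
 * conn->ksnc_rx_kiov, preferring a single vmap()ed fragment (see
 * ksocknal_lib_kiov_vmap() above) and falling back to kmap()ing each
 * page.  Received bytes are folded into ksnc_rx_csum when the sender
 * supplied a checksum.
 */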
int
ksocknal_lib_recv_kiov(ksock_conn_t *conn)
{
#if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
        struct kvec scratch;
        struct kvec *scratchiov = &scratch;
        struct page **pages = NULL;
        unsigned int niov = 1;
#else
#ifdef CONFIG_HIGHMEM
#warning "XXX risk of kmap deadlock on multiple frags..."
#endif
        struct kvec *scratchiov = conn->ksnc_scheduler->kss_scratch_iov;
        struct page **pages = conn->ksnc_scheduler->kss_rx_scratch_pgs;
        unsigned int niov = conn->ksnc_rx_nkiov;
#endif
        lnet_kiov_t *kiov = conn->ksnc_rx_kiov;
        struct msghdr msg = {
                .msg_flags = 0
        };
        int nob;
        int i;
        int rc;
        void *base;
        void *addr;
        int sum;
        int fragnob;
        int n;

        /* NB we can't trust socket ops to either consume our iovs
         * or leave them alone. */
        addr = ksocknal_lib_kiov_vmap(kiov, niov, scratchiov, pages);
        if (addr != NULL) {
                nob = scratchiov[0].iov_len;
                n = 1;
        } else {
                for (nob = i = 0; i < niov; i++) {
                        nob += scratchiov[i].iov_len = kiov[i].kiov_len;
                        scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
                                                 kiov[i].kiov_offset;
                }
                n = niov;
        }

        LASSERT(nob <= conn->ksnc_rx_nob_wanted);

        rc = kernel_recvmsg(conn->ksnc_sock, &msg,
                            scratchiov, n, nob, MSG_DONTWAIT);

        if (conn->ksnc_msg.ksm_csum != 0) {
                for (i = 0, sum = rc; sum > 0; i++, sum -= fragnob) {
                        LASSERT(i < niov);

                        /* Dang! have to kmap again because I have nowhere to
                         * stash the mapped address.  But by doing it while the
                         * page is still mapped, the kernel just bumps the map
                         * count and returns me the address it stashed. */
                        base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
                        fragnob = kiov[i].kiov_len;
                        if (fragnob > sum)
                                fragnob = sum;

                        conn->ksnc_rx_csum = ksocknal_csum(conn->ksnc_rx_csum,
                                                           base, fragnob);

                        kunmap(kiov[i].kiov_page);
                }
        }

        if (addr != NULL) {
                ksocknal_lib_kiov_vunmap(addr);
        } else {
                for (i = 0; i < niov; i++)
                        kunmap(kiov[i].kiov_page);
        }

        return rc;
}

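/*
 * Compute the checksum of an outgoing message in software, covering
 * the ksock_msg header in tx_iov[0] and then either the kiov page
 * fragments or the remaining iovec fragments of the payload.  The
 * ksnd_inject_csum_error tunable deliberately corrupts one checksum so
 * the receive-side verification path can be exercised.
 */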
void
ksocknal_lib_csum_tx(ksock_tx_t *tx)
{
        int i;
        __u32 csum;
        void *base;

        LASSERT(tx->tx_iov[0].iov_base == &tx->tx_msg);
        LASSERT(tx->tx_conn != NULL);
        LASSERT(tx->tx_conn->ksnc_proto == &ksocknal_protocol_v2x);

        tx->tx_msg.ksm_csum = 0;

        csum = ksocknal_csum(~0, tx->tx_iov[0].iov_base,
                             tx->tx_iov[0].iov_len);

        if (tx->tx_kiov != NULL) {
                for (i = 0; i < tx->tx_nkiov; i++) {
                        base = kmap(tx->tx_kiov[i].kiov_page) +
                               tx->tx_kiov[i].kiov_offset;

                        csum = ksocknal_csum(csum, base,
                                             tx->tx_kiov[i].kiov_len);

                        kunmap(tx->tx_kiov[i].kiov_page);
                }
        } else {
                for (i = 1; i < tx->tx_niov; i++)
                        csum = ksocknal_csum(csum, tx->tx_iov[i].iov_base,
                                             tx->tx_iov[i].iov_len);
        }

        if (*ksocknal_tunables.ksnd_inject_csum_error) {
                csum++;
                *ksocknal_tunables.ksnd_inject_csum_error = 0;
        }

        tx->tx_msg.ksm_csum = csum;
}

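/*
 * Report a connection's socket buffer sizes and Nagle setting for the
 * control interface.  Note *nagle is returned inverted: TCP_NODELAY
 * set means Nagle is off.  Returns -ESHUTDOWN if the connection is
 * already closing.
 */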
int
ksocknal_lib_get_conn_tunables(ksock_conn_t *conn, int *txmem, int *rxmem,
                               int *nagle)
{
        struct socket *sock = conn->ksnc_sock;
        int len;
        int rc;

        rc = ksocknal_connsock_addref(conn);
        if (rc != 0) {
                LASSERT(conn->ksnc_closing);
                *txmem = *rxmem = *nagle = 0;
                return -ESHUTDOWN;
        }

        rc = lnet_sock_getbuf(sock, txmem, rxmem);
        if (rc == 0) {
                len = sizeof(*nagle);
                rc = kernel_getsockopt(sock, SOL_TCP, TCP_NODELAY,
                                       (char *)nagle, &len);
        }

        ksocknal_connsock_decref(conn);

        if (rc == 0)
                *nagle = !*nagle;
        else
                *txmem = *rxmem = *nagle = 0;

        return rc;
}

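/*
 * Apply the socklnd tunables to a freshly connected or accepted
 * socket: disable lingering on close, optionally disable Nagle, size
 * the socket buffers, and configure TCP keepalives.  Returns 0 on
 * success or the first setsockopt error.
 */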
int
ksocknal_lib_setup_sock(struct socket *sock)
{
        int rc;
        int option;
        int keep_idle;
        int keep_intvl;
        int keep_count;
        int do_keepalive;
        struct linger linger;

        sock->sk->sk_allocation = GFP_NOFS;

        /* Ensure this socket doesn't block in close(): with SO_LINGER
         * disabled, close() returns immediately and any queued sends
         * complete in the background. */

        linger.l_onoff = 0;
        linger.l_linger = 0;

        rc = kernel_setsockopt(sock, SOL_SOCKET, SO_LINGER,
                               (char *)&linger, sizeof(linger));
        if (rc != 0) {
                CERROR("Can't set SO_LINGER: %d\n", rc);
                return rc;
        }

        option = -1;
        rc = kernel_setsockopt(sock, SOL_TCP, TCP_LINGER2,
                               (char *)&option, sizeof(option));
        if (rc != 0) {
                CERROR("Can't set TCP_LINGER2: %d\n", rc);
                return rc;
        }

        if (!*ksocknal_tunables.ksnd_nagle) {
                option = 1;

                rc = kernel_setsockopt(sock, SOL_TCP, TCP_NODELAY,
                                       (char *)&option, sizeof(option));
                if (rc != 0) {
                        CERROR("Can't disable nagle: %d\n", rc);
                        return rc;
                }
        }

        rc = lnet_sock_setbuf(sock, *ksocknal_tunables.ksnd_tx_buffer_size,
                              *ksocknal_tunables.ksnd_rx_buffer_size);
        if (rc != 0) {
                CERROR("Can't set buffer tx %d, rx %d buffers: %d\n",
                       *ksocknal_tunables.ksnd_tx_buffer_size,
                       *ksocknal_tunables.ksnd_rx_buffer_size, rc);
                return rc;
        }

        /* TCP_BACKOFF_* sockopt tunables unsupported in stock kernels */

        /* snapshot tunables */
        keep_idle = *ksocknal_tunables.ksnd_keepalive_idle;
        keep_count = *ksocknal_tunables.ksnd_keepalive_count;
        keep_intvl = *ksocknal_tunables.ksnd_keepalive_intvl;

        do_keepalive = (keep_idle > 0 && keep_count > 0 && keep_intvl > 0);

        option = (do_keepalive ? 1 : 0);
        rc = kernel_setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE,
                               (char *)&option, sizeof(option));
        if (rc != 0) {
                CERROR("Can't set SO_KEEPALIVE: %d\n", rc);
                return rc;
        }

        if (!do_keepalive)
                return 0;

        rc = kernel_setsockopt(sock, SOL_TCP, TCP_KEEPIDLE,
                               (char *)&keep_idle, sizeof(keep_idle));
        if (rc != 0) {
                CERROR("Can't set TCP_KEEPIDLE: %d\n", rc);
                return rc;
        }

        rc = kernel_setsockopt(sock, SOL_TCP, TCP_KEEPINTVL,
                               (char *)&keep_intvl, sizeof(keep_intvl));
        if (rc != 0) {
                CERROR("Can't set TCP_KEEPINTVL: %d\n", rc);
                return rc;
        }

        rc = kernel_setsockopt(sock, SOL_TCP, TCP_KEEPCNT,
                               (char *)&keep_count, sizeof(keep_count));
        if (rc != 0) {
                CERROR("Can't set TCP_KEEPCNT: %d\n", rc);
                return rc;
        }

        return 0;
}

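/*
 * Flush any data Nagle is holding back: temporarily force nonagle on
 * the tcp_sock, poke the stack with a TCP_NODELAY setsockopt so queued
 * data is pushed out, then restore the original nonagle setting.
 */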
void
ksocknal_lib_push_conn(ksock_conn_t *conn)
{
        struct sock *sk;
        struct tcp_sock *tp;
        int nonagle;
        int val = 1;
        int rc;

        rc = ksocknal_connsock_addref(conn);
        if (rc != 0)                        /* being shut down */
                return;

        sk = conn->ksnc_sock->sk;
        tp = tcp_sk(sk);

        lock_sock(sk);
        nonagle = tp->nonagle;
        tp->nonagle = 1;
        release_sock(sk);

        rc = kernel_setsockopt(conn->ksnc_sock, SOL_TCP, TCP_NODELAY,
                               (char *)&val, sizeof(val));
        LASSERT(rc == 0);

        lock_sock(sk);
        tp->nonagle = nonagle;
        release_sock(sk);

        ksocknal_connsock_decref(conn);
}

/*
 * Socket callbacks in Linux
 */
static void
ksocknal_data_ready(struct sock *sk)
{
        ksock_conn_t *conn;

        /* interleave correctly with closing sockets... */
        LASSERT(!in_irq());
        read_lock(&ksocknal_data.ksnd_global_lock);

        conn = sk->sk_user_data;
        if (conn == NULL) {          /* raced with ksocknal_terminate_conn */
                LASSERT(sk->sk_data_ready != &ksocknal_data_ready);
                sk->sk_data_ready(sk);
        } else {
                ksocknal_read_callback(conn);
        }

        read_unlock(&ksocknal_data.ksnd_global_lock);
}

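/*
 * sk_write_space() hook: called by TCP when transmit space becomes
 * available.  Wakes the scheduler via ksocknal_write_callback() once
 * wspace rises above the low-water mark, then clears SOCK_NOSPACE.
 */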
static void
ksocknal_write_space(struct sock *sk)
{
        ksock_conn_t *conn;
        int wspace;
        int min_wspace;

        /* interleave correctly with closing sockets... */
        LASSERT(!in_irq());
        read_lock(&ksocknal_data.ksnd_global_lock);

        conn = sk->sk_user_data;
        wspace = sk_stream_wspace(sk);
        min_wspace = sk_stream_min_wspace(sk);

        CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
               sk, wspace, min_wspace, conn,
               (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
                                      " ready" : " blocked"),
               (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
                                      " scheduled" : " idle"),
               (conn == NULL) ? "" : (list_empty(&conn->ksnc_tx_queue) ?
                                      " empty" : " queued"));

        if (conn == NULL) {          /* raced with ksocknal_terminate_conn */
                LASSERT(sk->sk_write_space != &ksocknal_write_space);
                sk->sk_write_space(sk);

                read_unlock(&ksocknal_data.ksnd_global_lock);
                return;
        }

        if (wspace >= min_wspace) {            /* got enough space */
                ksocknal_write_callback(conn);

                /* Clear SOCK_NOSPACE _after_ ksocknal_write_callback so the
                 * ENOMEM check in ksocknal_transmit is race-free (think about
                 * it). */

                clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
        }

        read_unlock(&ksocknal_data.ksnd_global_lock);
}

void
ksocknal_lib_save_callback(struct socket *sock, ksock_conn_t *conn)
{
        conn->ksnc_saved_data_ready = sock->sk->sk_data_ready;
        conn->ksnc_saved_write_space = sock->sk->sk_write_space;
}

void
ksocknal_lib_set_callback(struct socket *sock, ksock_conn_t *conn)
{
        sock->sk->sk_user_data = conn;
        sock->sk->sk_data_ready = ksocknal_data_ready;
        sock->sk->sk_write_space = ksocknal_write_space;
}

void
ksocknal_lib_reset_callback(struct socket *sock, ksock_conn_t *conn)
{
        /* Remove conn's network callbacks.
         * NB I _have_ to restore the callback, rather than storing a noop,
         * since the socket could survive past this module being unloaded!! */
        sock->sk->sk_data_ready = conn->ksnc_saved_data_ready;
        sock->sk->sk_write_space = conn->ksnc_saved_write_space;

        /* A callback could be in progress already; they hold a read lock
         * on ksnd_global_lock (to serialise with me) and NOOP if
         * sk_user_data is NULL. */
        sock->sk->sk_user_data = NULL;
}

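/*
 * Decide whether a short send was caused by memory pressure.  If
 * SOCK_NOSPACE is clear and ksnc_tx_ready is unset, the socket wasn't
 * filled and write_space won't reschedule the conn, so return -ENOMEM
 * to make the caller retry after a timeout.
 */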
int
ksocknal_lib_memory_pressure(ksock_conn_t *conn)
{
        int rc = 0;
        ksock_sched_t *sched;

        sched = conn->ksnc_scheduler;
        spin_lock_bh(&sched->kss_lock);

        if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
            !conn->ksnc_tx_ready) {
                /* SOCK_NOSPACE is set when the socket fills
                 * and cleared in the write_space callback
                 * (which also sets ksnc_tx_ready).  If
                 * SOCK_NOSPACE and ksnc_tx_ready are BOTH
                 * zero, I didn't fill the socket and
                 * write_space won't reschedule me, so I
                 * return -ENOMEM to get my caller to retry
                 * after a timeout */
                rc = -ENOMEM;
        }

        spin_unlock_bh(&sched->kss_lock);

        return rc;
}