linux/drivers/staging/lustre/lnet/klnds/socklnd/socklnd_cb.c
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 *
 *   Author: Zach Brown <zab@zabbo.net>
 *   Author: Peter J. Braam <braam@clusterfs.com>
 *   Author: Phil Schwan <phil@clusterfs.com>
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
 *
 *   Portals is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Portals is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Portals; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "socklnd.h"

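/* Allocate a tx descriptor.  Fixed-size NOOP txs are recycled through the
 * ksnd_idle_noop_txs freelist; everything else is allocated afresh. */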
ksock_tx_t *
ksocknal_alloc_tx(int type, int size)
{
        ksock_tx_t *tx = NULL;

        if (type == KSOCK_MSG_NOOP) {
                LASSERT(size == KSOCK_NOOP_TX_SIZE);

                /* searching for a noop tx in free list */
                spin_lock(&ksocknal_data.ksnd_tx_lock);

                if (!list_empty(&ksocknal_data.ksnd_idle_noop_txs)) {
                        tx = list_entry(ksocknal_data.ksnd_idle_noop_txs.next,
                                        ksock_tx_t, tx_list);
                        LASSERT(tx->tx_desc_size == size);
                        list_del(&tx->tx_list);
                }

                spin_unlock(&ksocknal_data.ksnd_tx_lock);
        }

        if (tx == NULL)
                LIBCFS_ALLOC(tx, size);

        if (tx == NULL)
                return NULL;

        atomic_set(&tx->tx_refcount, 1);
        tx->tx_zc_aborted = 0;
        tx->tx_zc_capable = 0;
        tx->tx_zc_checked = 0;
        tx->tx_desc_size  = size;

        atomic_inc(&ksocknal_data.ksnd_nactive_txs);

        return tx;
}

ksock_tx_t *
ksocknal_alloc_tx_noop(__u64 cookie, int nonblk)
{
        ksock_tx_t *tx;

        tx = ksocknal_alloc_tx(KSOCK_MSG_NOOP, KSOCK_NOOP_TX_SIZE);
        if (tx == NULL) {
                CERROR("Can't allocate noop tx desc\n");
                return NULL;
        }

        tx->tx_conn     = NULL;
        tx->tx_lnetmsg  = NULL;
        tx->tx_kiov     = NULL;
        tx->tx_nkiov    = 0;
        tx->tx_iov      = tx->tx_frags.virt.iov;
        tx->tx_niov     = 1;
        tx->tx_nonblk   = nonblk;

        socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_NOOP);
        tx->tx_msg.ksm_zc_cookies[1] = cookie;

        return tx;
}

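/* Release a tx descriptor: NOOP txs go back on the freelist, anything else
 * is freed outright. */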
void
ksocknal_free_tx (ksock_tx_t *tx)
{
        atomic_dec(&ksocknal_data.ksnd_nactive_txs);

        if (tx->tx_lnetmsg == NULL && tx->tx_desc_size == KSOCK_NOOP_TX_SIZE) {
                /* it's a noop tx */
                spin_lock(&ksocknal_data.ksnd_tx_lock);

                list_add(&tx->tx_list, &ksocknal_data.ksnd_idle_noop_txs);

                spin_unlock(&ksocknal_data.ksnd_tx_lock);
        } else {
                LIBCFS_FREE(tx, tx->tx_desc_size);
        }
}

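/* The two helpers below push one batch of mapped (iov) or paged (kiov)
 * fragments to the socket, then advance the fragment cursors by however
 * many bytes actually went out, so a partial send resumes mid-fragment. */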
int
ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
{
        struct iovec *iov = tx->tx_iov;
        int    nob;
        int    rc;

        LASSERT (tx->tx_niov > 0);

        /* Never touch tx->tx_iov inside ksocknal_lib_send_iov() */
        rc = ksocknal_lib_send_iov(conn, tx);

        if (rc <= 0)                        /* sent nothing? */
                return (rc);

        nob = rc;
        LASSERT (nob <= tx->tx_resid);
        tx->tx_resid -= nob;

        /* "consume" iov */
        do {
                LASSERT (tx->tx_niov > 0);

                if (nob < (int) iov->iov_len) {
                        iov->iov_base = (void *)((char *)iov->iov_base + nob);
                        iov->iov_len -= nob;
                        return (rc);
                }

                nob -= iov->iov_len;
                tx->tx_iov = ++iov;
                tx->tx_niov--;
        } while (nob != 0);

        return (rc);
}

int
ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
{
        lnet_kiov_t *kiov = tx->tx_kiov;
        int     nob;
        int     rc;

        LASSERT (tx->tx_niov == 0);
        LASSERT (tx->tx_nkiov > 0);

        /* Never touch tx->tx_kiov inside ksocknal_lib_send_kiov() */
        rc = ksocknal_lib_send_kiov(conn, tx);

        if (rc <= 0)                        /* sent nothing? */
                return (rc);

        nob = rc;
        LASSERT (nob <= tx->tx_resid);
        tx->tx_resid -= nob;

        /* "consume" kiov */
        do {
                LASSERT(tx->tx_nkiov > 0);

                if (nob < (int)kiov->kiov_len) {
                        kiov->kiov_offset += nob;
                        kiov->kiov_len -= nob;
                        return rc;
                }

                nob -= (int)kiov->kiov_len;
                tx->tx_kiov = ++kiov;
                tx->tx_nkiov--;
        } while (nob != 0);

        return (rc);
}

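/* Drive a tx until it completes, would block (-EAGAIN) or fails.  A
 * shrinking socket send-buffer backlog is taken as evidence of ACKed data,
 * which refreshes the tx deadline and the peer's last_alive stamp. */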
int
ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
{
        int      rc;
        int      bufnob;

        if (ksocknal_data.ksnd_stall_tx != 0) {
                cfs_pause(cfs_time_seconds(ksocknal_data.ksnd_stall_tx));
        }

        LASSERT (tx->tx_resid != 0);

        rc = ksocknal_connsock_addref(conn);
        if (rc != 0) {
                LASSERT (conn->ksnc_closing);
                return (-ESHUTDOWN);
        }

        do {
                if (ksocknal_data.ksnd_enomem_tx > 0) {
                        /* testing... */
                        ksocknal_data.ksnd_enomem_tx--;
                        rc = -EAGAIN;
                } else if (tx->tx_niov != 0) {
                        rc = ksocknal_send_iov (conn, tx);
                } else {
                        rc = ksocknal_send_kiov (conn, tx);
                }

                bufnob = cfs_sock_wmem_queued(conn->ksnc_sock);
                if (rc > 0)                  /* sent something? */
                        conn->ksnc_tx_bufnob += rc; /* account it */

                if (bufnob < conn->ksnc_tx_bufnob) {
                        /* allocated send buffer bytes < computed; infer
                         * something got ACKed */
                        conn->ksnc_tx_deadline =
                                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
                        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
                        conn->ksnc_tx_bufnob = bufnob;
                        mb();
                }

                if (rc <= 0) { /* Didn't write anything? */

                        if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
                                rc = -EAGAIN;

                        /* Check if EAGAIN is due to memory pressure */
                        if (rc == -EAGAIN && ksocknal_lib_memory_pressure(conn))
                                rc = -ENOMEM;

                        break;
                }

                /* socket's wmem_queued now includes 'rc' bytes */
                atomic_sub (rc, &conn->ksnc_tx_nob);
                rc = 0;

        } while (tx->tx_resid != 0);

        ksocknal_connsock_decref(conn);
        return (rc);
}

int
ksocknal_recv_iov (ksock_conn_t *conn)
{
        struct iovec *iov = conn->ksnc_rx_iov;
        int     nob;
        int     rc;

        LASSERT (conn->ksnc_rx_niov > 0);

        /* Never touch conn->ksnc_rx_iov or change connection
         * status inside ksocknal_lib_recv_iov */
        rc = ksocknal_lib_recv_iov(conn);

        if (rc <= 0)
                return (rc);

        /* received something... */
        nob = rc;

        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
        conn->ksnc_rx_deadline =
                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
        mb();                  /* order with setting rx_started */
        conn->ksnc_rx_started = 1;

        conn->ksnc_rx_nob_wanted -= nob;
        conn->ksnc_rx_nob_left -= nob;

        do {
                LASSERT (conn->ksnc_rx_niov > 0);

                if (nob < (int)iov->iov_len) {
                        iov->iov_len -= nob;
                        iov->iov_base = (void *)((char *)iov->iov_base + nob);
                        return (-EAGAIN);
                }

                nob -= iov->iov_len;
                conn->ksnc_rx_iov = ++iov;
                conn->ksnc_rx_niov--;
        } while (nob != 0);

        return (rc);
}

int
ksocknal_recv_kiov (ksock_conn_t *conn)
{
        lnet_kiov_t *kiov = conn->ksnc_rx_kiov;
        int     nob;
        int     rc;

        LASSERT (conn->ksnc_rx_nkiov > 0);

        /* Never touch conn->ksnc_rx_kiov or change connection
         * status inside ksocknal_lib_recv_kiov */
        rc = ksocknal_lib_recv_kiov(conn);

        if (rc <= 0)
                return (rc);

        /* received something... */
        nob = rc;

        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
        conn->ksnc_rx_deadline =
                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
        mb();                  /* order with setting rx_started */
        conn->ksnc_rx_started = 1;

        conn->ksnc_rx_nob_wanted -= nob;
        conn->ksnc_rx_nob_left -= nob;

        do {
                LASSERT (conn->ksnc_rx_nkiov > 0);

                if (nob < (int) kiov->kiov_len) {
                        kiov->kiov_offset += nob;
                        kiov->kiov_len -= nob;
                        return -EAGAIN;
                }

                nob -= kiov->kiov_len;
                conn->ksnc_rx_kiov = ++kiov;
                conn->ksnc_rx_nkiov--;
        } while (nob != 0);

        return 1;
}

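/* Pull bytes off the wire into the current rx descriptor, fragment by
 * fragment, until it is satisfied or the socket would block. */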
int
ksocknal_receive (ksock_conn_t *conn)
{
        /* Return 1 on success, 0 on EOF, < 0 on error.
         * Caller checks ksnc_rx_nob_wanted to determine
         * progress/completion. */
        int     rc;
        ENTRY;

        if (ksocknal_data.ksnd_stall_rx != 0) {
                cfs_pause(cfs_time_seconds (ksocknal_data.ksnd_stall_rx));
        }

        rc = ksocknal_connsock_addref(conn);
        if (rc != 0) {
                LASSERT (conn->ksnc_closing);
                return (-ESHUTDOWN);
        }

        for (;;) {
                if (conn->ksnc_rx_niov != 0)
                        rc = ksocknal_recv_iov (conn);
                else
                        rc = ksocknal_recv_kiov (conn);

                if (rc <= 0) {
                        /* error/EOF or partial receive */
                        if (rc == -EAGAIN) {
                                rc = 1;
                        } else if (rc == 0 && conn->ksnc_rx_started) {
                                /* EOF in the middle of a message */
                                rc = -EPROTO;
                        }
                        break;
                }

                /* Completed a fragment */

                if (conn->ksnc_rx_nob_wanted == 0) {
                        rc = 1;
                        break;
                }
        }

        ksocknal_connsock_decref(conn);
        RETURN (rc);
}

void
ksocknal_tx_done (lnet_ni_t *ni, ksock_tx_t *tx)
{
        lnet_msg_t *lnetmsg = tx->tx_lnetmsg;
        int         rc = (tx->tx_resid == 0 && !tx->tx_zc_aborted) ? 0 : -EIO;
        ENTRY;

        LASSERT(ni != NULL || tx->tx_conn != NULL);

        /* take ni from the conn before dropping our ref on it */
        if (ni == NULL && tx->tx_conn != NULL)
                ni = tx->tx_conn->ksnc_peer->ksnp_ni;

        if (tx->tx_conn != NULL)
                ksocknal_conn_decref(tx->tx_conn);

        ksocknal_free_tx (tx);
        if (lnetmsg != NULL) /* KSOCK_MSG_NOOP txs have no lnetmsg */
                lnet_finalize (ni, lnetmsg, rc);

        EXIT;
}

void
ksocknal_txlist_done (lnet_ni_t *ni, struct list_head *txlist, int error)
{
        ksock_tx_t *tx;

        while (!list_empty (txlist)) {
                tx = list_entry (txlist->next, ksock_tx_t, tx_list);

                if (error && tx->tx_lnetmsg != NULL) {
                        CNETERR("Deleting packet type %d len %d %s->%s\n",
                                le32_to_cpu (tx->tx_lnetmsg->msg_hdr.type),
                                le32_to_cpu (tx->tx_lnetmsg->msg_hdr.payload_length),
                                libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.src_nid)),
                                libcfs_nid2str(le64_to_cpu(tx->tx_lnetmsg->msg_hdr.dest_nid)));
                } else if (error) {
                        CNETERR("Deleting noop packet\n");
                }

                list_del (&tx->tx_list);

                LASSERT (atomic_read(&tx->tx_refcount) == 1);
                ksocknal_tx_done (ni, tx);
        }
}

static void
ksocknal_check_zc_req(ksock_tx_t *tx)
{
        ksock_conn_t *conn = tx->tx_conn;
        ksock_peer_t *peer = conn->ksnc_peer;

        /* Set tx_msg.ksm_zc_cookies[0] to a unique non-zero cookie and add tx
         * to ksnp_zc_req_list if some fragment of this message should be sent
         * zero-copy.  Our peer will send an ACK containing this cookie when
         * she has received this message to tell us we can signal completion.
         * tx_msg.ksm_zc_cookies[0] remains non-zero while tx is on
         * ksnp_zc_req_list. */
        LASSERT (tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
        LASSERT (tx->tx_zc_capable);

        tx->tx_zc_checked = 1;

        if (conn->ksnc_proto == &ksocknal_protocol_v1x ||
            !conn->ksnc_zc_capable)
                return;

        /* assign cookie and queue tx to pending list, it will be released when
         * a matching ack is received. See ksocknal_handle_zcack() */

        ksocknal_tx_addref(tx);

        spin_lock(&peer->ksnp_lock);

        /* ZC_REQ is going to be pinned to the peer */
        tx->tx_deadline =
                cfs_time_shift(*ksocknal_tunables.ksnd_timeout);

        LASSERT (tx->tx_msg.ksm_zc_cookies[0] == 0);

        tx->tx_msg.ksm_zc_cookies[0] = peer->ksnp_zc_next_cookie++;

        if (peer->ksnp_zc_next_cookie == 0)
                peer->ksnp_zc_next_cookie = SOCKNAL_KEEPALIVE_PING + 1;

        list_add_tail(&tx->tx_zc_list, &peer->ksnp_zc_req_list);

        spin_unlock(&peer->ksnp_lock);
}

static void
ksocknal_uncheck_zc_req(ksock_tx_t *tx)
{
        ksock_peer_t *peer = tx->tx_conn->ksnc_peer;

        LASSERT(tx->tx_msg.ksm_type != KSOCK_MSG_NOOP);
        LASSERT(tx->tx_zc_capable);

        tx->tx_zc_checked = 0;

        spin_lock(&peer->ksnp_lock);

        if (tx->tx_msg.ksm_zc_cookies[0] == 0) {
                /* Not waiting for an ACK */
                spin_unlock(&peer->ksnp_lock);
                return;
        }

        tx->tx_msg.ksm_zc_cookies[0] = 0;
        list_del(&tx->tx_zc_list);

        spin_unlock(&peer->ksnp_lock);

        ksocknal_tx_decref(tx);
}

int
ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
{
        int rc;

        if (tx->tx_zc_capable && !tx->tx_zc_checked)
                ksocknal_check_zc_req(tx);

        rc = ksocknal_transmit (conn, tx);

        CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);

        if (tx->tx_resid == 0) {
                /* Sent everything OK */
                LASSERT (rc == 0);

                return (0);
        }

        if (rc == -EAGAIN)
                return (rc);

        if (rc == -ENOMEM) {
                static int counter;

                counter++;   /* exponential backoff warnings */
                if ((counter & (-counter)) == counter) /* true iff counter is a power of 2 */
                        CWARN("%u ENOMEM tx %p (%u allocated)\n",
                              counter, conn, atomic_read(&libcfs_kmemory));

                /* Queue on ksnd_enomem_conns for retry after a timeout */
                spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);

                /* enomem list takes over scheduler's ref... */
                LASSERT (conn->ksnc_tx_scheduled);
                list_add_tail(&conn->ksnc_tx_list,
                              &ksocknal_data.ksnd_enomem_conns);
                if (!cfs_time_aftereq(cfs_time_add(cfs_time_current(),
                                                   SOCKNAL_ENOMEM_RETRY),
                                      ksocknal_data.ksnd_reaper_waketime))
                        wake_up (&ksocknal_data.ksnd_reaper_waitq);

                spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
                return (rc);
        }

        /* Actual error */
        LASSERT (rc < 0);

        if (!conn->ksnc_closing) {
                switch (rc) {
                case -ECONNRESET:
                        LCONSOLE_WARN("Host %u.%u.%u.%u reset our connection "
                                      "while we were sending data; it may have "
                                      "rebooted.\n",
                                      HIPQUAD(conn->ksnc_ipaddr));
                        break;
                default:
                        LCONSOLE_WARN("There was an unexpected network error "
                                      "while writing to %u.%u.%u.%u: %d.\n",
                                      HIPQUAD(conn->ksnc_ipaddr), rc);
                        break;
                }
                CDEBUG(D_NET, "[%p] Error %d on write to %s"
                       " ip %d.%d.%d.%d:%d\n", conn, rc,
                       libcfs_id2str(conn->ksnc_peer->ksnp_id),
                       HIPQUAD(conn->ksnc_ipaddr),
                       conn->ksnc_port);
        }

        if (tx->tx_zc_checked)
                ksocknal_uncheck_zc_req(tx);

        /* it's not an error if conn is being closed */
        ksocknal_close_conn_and_siblings (conn,
                                          (conn->ksnc_closing) ? 0 : rc);

        return (rc);
}

void
ksocknal_launch_connection_locked (ksock_route_t *route)
{
        /* called holding write lock on ksnd_global_lock */

        LASSERT (!route->ksnr_scheduled);
        LASSERT (!route->ksnr_connecting);
        LASSERT ((ksocknal_route_mask() & ~route->ksnr_connected) != 0);

        route->ksnr_scheduled = 1;            /* scheduling conn for connd */
        ksocknal_route_addref(route);         /* extra ref for connd */

        spin_lock_bh(&ksocknal_data.ksnd_connd_lock);

        list_add_tail(&route->ksnr_connd_list,
                      &ksocknal_data.ksnd_connd_routes);
        wake_up(&ksocknal_data.ksnd_connd_waitq);

        spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
}

void
ksocknal_launch_all_connections_locked (ksock_peer_t *peer)
{
        ksock_route_t *route;

        /* called holding write lock on ksnd_global_lock */
        for (;;) {
                /* launch any/all connections that need it */
                route = ksocknal_find_connectable_route_locked(peer);
                if (route == NULL)
                        return;

                ksocknal_launch_connection_locked(route);
        }
}

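/* Pick the best existing connection for this tx: the protocol's match
 * callback sorts candidates into "typed" and "fallback", and within each
 * class the least-backlogged (or, with round-robin, least-recently-used)
 * connection wins. */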
ksock_conn_t *
ksocknal_find_conn_locked(ksock_peer_t *peer, ksock_tx_t *tx, int nonblk)
{
        struct list_head *tmp;
        ksock_conn_t     *conn;
        ksock_conn_t     *typed = NULL;
        ksock_conn_t     *fallback = NULL;
        int               tnob = 0;
        int               fnob = 0;

        list_for_each (tmp, &peer->ksnp_conns) {
                ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
                int           nob = atomic_read(&c->ksnc_tx_nob) +
                                    cfs_sock_wmem_queued(c->ksnc_sock);
                int           rc;

                LASSERT (!c->ksnc_closing);
                LASSERT (c->ksnc_proto != NULL &&
                         c->ksnc_proto->pro_match_tx != NULL);

                rc = c->ksnc_proto->pro_match_tx(c, tx, nonblk);

                switch (rc) {
                default:
                        LBUG();
                case SOCKNAL_MATCH_NO: /* protocol rejected the tx */
                        continue;

                case SOCKNAL_MATCH_YES: /* typed connection */
                        if (typed == NULL || tnob > nob ||
                            (tnob == nob && *ksocknal_tunables.ksnd_round_robin &&
                             cfs_time_after(typed->ksnc_tx_last_post, c->ksnc_tx_last_post))) {
                                typed = c;
                                tnob  = nob;
                        }
                        break;

                case SOCKNAL_MATCH_MAY: /* fallback connection */
                        if (fallback == NULL || fnob > nob ||
                            (fnob == nob && *ksocknal_tunables.ksnd_round_robin &&
                             cfs_time_after(fallback->ksnc_tx_last_post, c->ksnc_tx_last_post))) {
                                fallback = c;
                                fnob     = nob;
                        }
                        break;
                }
        }

        /* prefer the typed selection */
        conn = (typed != NULL) ? typed : fallback;

        if (conn != NULL)
                conn->ksnc_tx_last_post = cfs_time_current();

        return conn;
}

void
ksocknal_tx_prep(ksock_conn_t *conn, ksock_tx_t *tx)
{
        conn->ksnc_proto->pro_pack(tx);

        atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
        ksocknal_conn_addref(conn); /* +1 ref for tx */
        tx->tx_conn = conn;
}

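/* Queue a prepped tx on a connection and make sure a scheduler will service
 * it.  A NOOP ZC-ACK may be piggybacked on a queued normal packet (and vice
 * versa), in which case the redundant tx becomes a zombie. */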
void
ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
{
        ksock_sched_t *sched = conn->ksnc_scheduler;
        ksock_msg_t   *msg = &tx->tx_msg;
        ksock_tx_t    *ztx = NULL;
        int            bufnob = 0;

        /* called holding global lock (read or irq-write) and caller may
         * not have dropped this lock between finding conn and calling me,
         * so we don't need the {get,put}connsock dance to deref
         * ksnc_sock... */
        LASSERT(!conn->ksnc_closing);

        CDEBUG (D_NET, "Sending to %s ip %d.%d.%d.%d:%d\n",
                libcfs_id2str(conn->ksnc_peer->ksnp_id),
                HIPQUAD(conn->ksnc_ipaddr),
                conn->ksnc_port);

        ksocknal_tx_prep(conn, tx);

        /* Ensure the frags we've been given EXACTLY match the number of
         * bytes we want to send.  Many TCP/IP stacks disregard any total
         * size parameters passed to them and just look at the frags.
         *
         * We always expect at least 1 mapped fragment containing the
         * complete ksocknal message header. */
        LASSERT (lnet_iov_nob (tx->tx_niov, tx->tx_iov) +
                 lnet_kiov_nob(tx->tx_nkiov, tx->tx_kiov) ==
                 (unsigned int)tx->tx_nob);
        LASSERT (tx->tx_niov >= 1);
        LASSERT (tx->tx_resid == tx->tx_nob);

        CDEBUG (D_NET, "Packet %p type %d, nob %d niov %d nkiov %d\n",
                tx, (tx->tx_lnetmsg != NULL) ? tx->tx_lnetmsg->msg_hdr.type:
                                               KSOCK_MSG_NOOP,
                tx->tx_nob, tx->tx_niov, tx->tx_nkiov);

        /*
         * FIXME: SOCK_WMEM_QUEUED and SOCK_ERROR could block in __DARWIN8__
         * but they're used inside spinlocks a lot.
         */
        bufnob = cfs_sock_wmem_queued(conn->ksnc_sock);
        spin_lock_bh(&sched->kss_lock);

        if (list_empty(&conn->ksnc_tx_queue) && bufnob == 0) {
                /* First packet starts the timeout */
                conn->ksnc_tx_deadline =
                        cfs_time_shift(*ksocknal_tunables.ksnd_timeout);
                if (conn->ksnc_tx_bufnob > 0) /* something got ACKed */
                        conn->ksnc_peer->ksnp_last_alive = cfs_time_current();
                conn->ksnc_tx_bufnob = 0;
                mb(); /* order with adding to tx_queue */
        }

        if (msg->ksm_type == KSOCK_MSG_NOOP) {
                /* The packet is a noop ZC-ACK; try to piggyback the ack cookie
                 * on a normal packet so I don't need to send it */
                LASSERT (msg->ksm_zc_cookies[1] != 0);
                LASSERT (conn->ksnc_proto->pro_queue_tx_zcack != NULL);

                if (conn->ksnc_proto->pro_queue_tx_zcack(conn, tx, 0))
                        ztx = tx; /* ZC-ACK piggybacked on ztx; release tx later */

        } else {
                /* It's a normal packet - can it piggyback a noop zc-ack that
                 * has been queued already? */
                LASSERT (msg->ksm_zc_cookies[1] == 0);
                LASSERT (conn->ksnc_proto->pro_queue_tx_msg != NULL);

                ztx = conn->ksnc_proto->pro_queue_tx_msg(conn, tx);
                /* ztx will be released later */
        }

        if (ztx != NULL) {
                atomic_sub (ztx->tx_nob, &conn->ksnc_tx_nob);
                list_add_tail(&ztx->tx_list, &sched->kss_zombie_noop_txs);
        }

        if (conn->ksnc_tx_ready &&      /* able to send */
            !conn->ksnc_tx_scheduled) { /* not scheduled to send */
                /* +1 ref for scheduler */
                ksocknal_conn_addref(conn);
                list_add_tail (&conn->ksnc_tx_list,
                               &sched->kss_tx_conns);
                conn->ksnc_tx_scheduled = 1;
                wake_up (&sched->kss_waitq);
        }

        spin_unlock_bh(&sched->kss_lock);
}

ksock_route_t *
ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
{
        cfs_time_t        now = cfs_time_current();
        struct list_head *tmp;
        ksock_route_t    *route;

        list_for_each (tmp, &peer->ksnp_routes) {
                route = list_entry (tmp, ksock_route_t, ksnr_list);

                LASSERT (!route->ksnr_connecting || route->ksnr_scheduled);

                if (route->ksnr_scheduled)      /* connections being established */
                        continue;

                /* all route types connected ? */
                if ((ksocknal_route_mask() & ~route->ksnr_connected) == 0)
                        continue;

                if (!(route->ksnr_retry_interval == 0 || /* first attempt */
                      cfs_time_aftereq(now, route->ksnr_timeout))) {
                        CDEBUG(D_NET,
                               "Too soon to retry route %u.%u.%u.%u "
                               "(cnted %d, interval %ld, %ld secs later)\n",
                               HIPQUAD(route->ksnr_ipaddr),
                               route->ksnr_connected,
                               route->ksnr_retry_interval,
                               cfs_duration_sec(route->ksnr_timeout - now));
                        continue;
                }

                return (route);
        }

        return (NULL);
}

ksock_route_t *
ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
{
        struct list_head *tmp;
        ksock_route_t    *route;

        list_for_each (tmp, &peer->ksnp_routes) {
                route = list_entry (tmp, ksock_route_t, ksnr_list);

                LASSERT (!route->ksnr_connecting || route->ksnr_scheduled);

                if (route->ksnr_scheduled)
                        return (route);
        }

        return (NULL);
}

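/* Attach a tx to the peer identified by 'id', creating the peer and
 * launching connections as required.  Returns 0 if the tx was queued on a
 * live connection or parked awaiting one, -EHOSTUNREACH otherwise. */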
int
ksocknal_launch_packet (lnet_ni_t *ni, ksock_tx_t *tx, lnet_process_id_t id)
{
        ksock_peer_t *peer;
        ksock_conn_t *conn;
        rwlock_t     *g_lock;
        int           retry;
        int           rc;

        LASSERT (tx->tx_conn == NULL);

        g_lock = &ksocknal_data.ksnd_global_lock;

        for (retry = 0;; retry = 1) {
                read_lock(g_lock);
                peer = ksocknal_find_peer_locked(ni, id);
                if (peer != NULL) {
                        if (ksocknal_find_connectable_route_locked(peer) == NULL) {
                                conn = ksocknal_find_conn_locked(peer, tx, tx->tx_nonblk);
                                if (conn != NULL) {
                                        /* I've got no routes that need to be
                                         * connecting and I do have an actual
                                         * connection... */
                                        ksocknal_queue_tx_locked (tx, conn);
                                        read_unlock(g_lock);
                                        return (0);
                                }
                        }
                }

                /* I'll need a write lock... */
                read_unlock(g_lock);

                write_lock_bh(g_lock);

                peer = ksocknal_find_peer_locked(ni, id);
                if (peer != NULL)
                        break;

                write_unlock_bh(g_lock);

                if ((id.pid & LNET_PID_USERFLAG) != 0) {
                        CERROR("Refusing to create a connection to "
                               "userspace process %s\n", libcfs_id2str(id));
                        return -EHOSTUNREACH;
                }

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_id2str(id));
                        return -EHOSTUNREACH;
                }

                rc = ksocknal_add_peer(ni, id,
                                       LNET_NIDADDR(id.nid),
                                       lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_id2str(id), rc);
                        return rc;
                }
        }

        ksocknal_launch_all_connections_locked(peer);

        conn = ksocknal_find_conn_locked(peer, tx, tx->tx_nonblk);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                ksocknal_queue_tx_locked (tx, conn);
                write_unlock_bh(g_lock);
                return (0);
        }

        if (peer->ksnp_accepting > 0 ||
            ksocknal_find_connecting_route_locked (peer) != NULL) {
                /* the message is going to be pinned to the peer */
                tx->tx_deadline =
                        cfs_time_shift(*ksocknal_tunables.ksnd_timeout);

                /* Queue the message until a connection is established */
                list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
                write_unlock_bh(g_lock);
                return 0;
        }

        write_unlock_bh(g_lock);

        /* NB Routes may be ignored if connections to them failed recently */
        CNETERR("No usable routes to %s\n", libcfs_id2str(id));
        return (-EHOSTUNREACH);
}

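/* LND API entry point for sending an LNet message: build a tx descriptor
 * around the payload frags and hand it to ksocknal_launch_packet(). */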
int
ksocknal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        int               mpflag = 0;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        ksock_tx_t       *tx;
        int               desc_size;
        int               rc;

        /* NB 'private' is different depending on what we're sending.
         * Just ignore it... */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
        LASSERT (!in_interrupt ());

        if (payload_iov != NULL)
                desc_size = offsetof(ksock_tx_t,
                                     tx_frags.virt.iov[1 + payload_niov]);
        else
                desc_size = offsetof(ksock_tx_t,
                                     tx_frags.paged.kiov[payload_niov]);

        if (lntmsg->msg_vmflush)
                mpflag = cfs_memory_pressure_get_and_set();
        tx = ksocknal_alloc_tx(KSOCK_MSG_LNET, desc_size);
        if (tx == NULL) {
                CERROR("Can't allocate tx desc type %d size %d\n",
                       type, desc_size);
                if (lntmsg->msg_vmflush)
                        cfs_memory_pressure_restore(mpflag);
                return (-ENOMEM);
        }

        tx->tx_conn = NULL;                  /* set when assigned a conn */
        tx->tx_lnetmsg = lntmsg;

        if (payload_iov != NULL) {
                tx->tx_kiov = NULL;
                tx->tx_nkiov = 0;
                tx->tx_iov = tx->tx_frags.virt.iov;
                tx->tx_niov = 1 +
                              lnet_extract_iov(payload_niov, &tx->tx_iov[1],
                                               payload_niov, payload_iov,
                                               payload_offset, payload_nob);
        } else {
                tx->tx_niov = 1;
                tx->tx_iov = &tx->tx_frags.paged.iov;
                tx->tx_kiov = tx->tx_frags.paged.kiov;
                tx->tx_nkiov = lnet_extract_kiov(payload_niov, tx->tx_kiov,
                                                 payload_niov, payload_kiov,
                                                 payload_offset, payload_nob);

                if (payload_nob >= *ksocknal_tunables.ksnd_zc_min_payload)
                        tx->tx_zc_capable = 1;
        }

        socklnd_init_msg(&tx->tx_msg, KSOCK_MSG_LNET);

        /* The first fragment will be set later in pro_pack */
        rc = ksocknal_launch_packet(ni, tx, target);
        if (lntmsg->msg_vmflush)
                cfs_memory_pressure_restore(mpflag);
        if (rc == 0)
                return (0);

        ksocknal_free_tx(tx);
        return (-EIO);
}

int
ksocknal_thread_start(int (*fn)(void *arg), void *arg, char *name)
{
        task_t *task = kthread_run(fn, arg, name);

        if (IS_ERR(task))
                return PTR_ERR(task);

        write_lock_bh(&ksocknal_data.ksnd_global_lock);
        ksocknal_data.ksnd_nthreads++;
        write_unlock_bh(&ksocknal_data.ksnd_global_lock);
        return 0;
}

void
ksocknal_thread_fini (void)
{
        write_lock_bh(&ksocknal_data.ksnd_global_lock);
        ksocknal_data.ksnd_nthreads--;
        write_unlock_bh(&ksocknal_data.ksnd_global_lock);
}

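/* Reset the rx machinery for the next incoming packet, or set up iovs over
 * a static slop buffer to discard 'nob_to_skip' bytes.  Returns 1 when a
 * new packet header is expected next, 0 while still skipping slop. */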
int
ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
{
        static char ksocknal_slop_buffer[4096];

        int          nob;
        unsigned int niov;
        int          skipped;

        LASSERT(conn->ksnc_proto != NULL);

        if ((*ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0) {
                /* Remind the socket to ack eagerly... */
                ksocknal_lib_eager_ack(conn);
        }

        if (nob_to_skip == 0) {  /* right at next packet boundary now */
                conn->ksnc_rx_started = 0;
                mb();                  /* racing with timeout thread */

                switch (conn->ksnc_proto->pro_version) {
                case  KSOCK_PROTO_V2:
                case  KSOCK_PROTO_V3:
                        conn->ksnc_rx_state = SOCKNAL_RX_KSM_HEADER;
                        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                        conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg;

                        conn->ksnc_rx_nob_wanted = offsetof(ksock_msg_t, ksm_u);
                        conn->ksnc_rx_nob_left = offsetof(ksock_msg_t, ksm_u);
                        conn->ksnc_rx_iov[0].iov_len  = offsetof(ksock_msg_t, ksm_u);
                        break;

                case KSOCK_PROTO_V1:
                        /* Receiving bare lnet_hdr_t */
                        conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
                        conn->ksnc_rx_nob_wanted = sizeof(lnet_hdr_t);
                        conn->ksnc_rx_nob_left = sizeof(lnet_hdr_t);

                        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                        conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg;
                        conn->ksnc_rx_iov[0].iov_len  = sizeof (lnet_hdr_t);
                        break;

                default:
                        LBUG ();
                }
                conn->ksnc_rx_niov = 1;

                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;
                conn->ksnc_rx_csum = ~0;
                return (1);
        }

        /* Set up to skip as much as possible now.  If there's more left
         * (ran out of iov entries) we'll get called again */

        conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
        conn->ksnc_rx_nob_left = nob_to_skip;
        conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
        skipped = 0;
        niov = 0;

        do {
                nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));

                conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
                conn->ksnc_rx_iov[niov].iov_len  = nob;
                niov++;
                skipped += nob;
                nob_to_skip -= nob;

        } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
                 niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));

        conn->ksnc_rx_niov = niov;
        conn->ksnc_rx_kiov = NULL;
        conn->ksnc_rx_nkiov = 0;
        conn->ksnc_rx_nob_wanted = skipped;
        return (0);
}

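/* rx state machine: read the ksock header, then the LNet header, hand the
 * payload to LNet, and finally swallow any slop before starting on the
 * next packet. */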
int
ksocknal_process_receive (ksock_conn_t *conn)
{
        lnet_hdr_t        *lhdr;
        lnet_process_id_t *id;
        int                rc;

        LASSERT (atomic_read(&conn->ksnc_conn_refcount) > 0);

        /* NB: sched lock NOT held */
        /* SOCKNAL_RX_LNET_HEADER is here for backward compatibility */
        LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_KSM_HEADER ||
                 conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD ||
                 conn->ksnc_rx_state == SOCKNAL_RX_LNET_HEADER ||
                 conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
 again:
        if (conn->ksnc_rx_nob_wanted != 0) {
                rc = ksocknal_receive(conn);

                if (rc <= 0) {
                        LASSERT (rc != -EAGAIN);

                        if (rc == 0)
                                CDEBUG (D_NET, "[%p] EOF from %s"
                                        " ip %d.%d.%d.%d:%d\n", conn,
                                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                        HIPQUAD(conn->ksnc_ipaddr),
                                        conn->ksnc_port);
                        else if (!conn->ksnc_closing)
                                CERROR ("[%p] Error %d on read from %s"
                                        " ip %d.%d.%d.%d:%d\n",
                                        conn, rc,
                                        libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                        HIPQUAD(conn->ksnc_ipaddr),
                                        conn->ksnc_port);

                        /* it's not an error if conn is being closed */
                        ksocknal_close_conn_and_siblings (conn,
                                                          (conn->ksnc_closing) ? 0 : rc);
                        return (rc == 0 ? -ESHUTDOWN : rc);
                }

                if (conn->ksnc_rx_nob_wanted != 0) {
                        /* short read */
                        return (-EAGAIN);
                }
        }
        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_KSM_HEADER:
                if (conn->ksnc_flip) {
                        __swab32s(&conn->ksnc_msg.ksm_type);
                        __swab32s(&conn->ksnc_msg.ksm_csum);
                        __swab64s(&conn->ksnc_msg.ksm_zc_cookies[0]);
                        __swab64s(&conn->ksnc_msg.ksm_zc_cookies[1]);
                }

                if (conn->ksnc_msg.ksm_type != KSOCK_MSG_NOOP &&
                    conn->ksnc_msg.ksm_type != KSOCK_MSG_LNET) {
                        CERROR("%s: Unknown message type: %x\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_type);
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings(conn, -EPROTO);
                        return (-EPROTO);
                }

                if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP &&
                    conn->ksnc_msg.ksm_csum != 0 &&     /* has checksum */
                    conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
                        /* NOOP Checksum error */
                        CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings(conn, -EPROTO);
                        return (-EIO);
                }

                if (conn->ksnc_msg.ksm_zc_cookies[1] != 0) {
                        __u64 cookie = 0;

                        LASSERT (conn->ksnc_proto != &ksocknal_protocol_v1x);

                        if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP)
                                cookie = conn->ksnc_msg.ksm_zc_cookies[0];

                        rc = conn->ksnc_proto->pro_handle_zcack(conn, cookie,
                                               conn->ksnc_msg.ksm_zc_cookies[1]);

                        if (rc != 0) {
                                CERROR("%s: Unknown ZC-ACK cookie: "LPU64", "LPU64"\n",
                                       libcfs_id2str(conn->ksnc_peer->ksnp_id),
                                       cookie, conn->ksnc_msg.ksm_zc_cookies[1]);
                                ksocknal_new_packet(conn, 0);
                                ksocknal_close_conn_and_siblings(conn, -EPROTO);
                                return (rc);
                        }
                }

                if (conn->ksnc_msg.ksm_type == KSOCK_MSG_NOOP) {
                        ksocknal_new_packet (conn, 0);
                        return 0;       /* NOOP is done and just return */
                }

                conn->ksnc_rx_state = SOCKNAL_RX_LNET_HEADER;
                conn->ksnc_rx_nob_wanted = sizeof(ksock_lnet_msg_t);
                conn->ksnc_rx_nob_left = sizeof(ksock_lnet_msg_t);

                conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
                conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_msg.ksm_u.lnetmsg;
                conn->ksnc_rx_iov[0].iov_len  = sizeof(ksock_lnet_msg_t);

                conn->ksnc_rx_niov = 1;
                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_nkiov = 0;

                goto again;     /* read lnet header now */

        case SOCKNAL_RX_LNET_HEADER:
                /* unpack message header */
                conn->ksnc_proto->pro_unpack(&conn->ksnc_msg);

                if ((conn->ksnc_peer->ksnp_id.pid & LNET_PID_USERFLAG) != 0) {
                        /* Userspace peer */
                        lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
                        id   = &conn->ksnc_peer->ksnp_id;

                        /* Substitute process ID assigned at connection time */
                        lhdr->src_pid = cpu_to_le32(id->pid);
                        lhdr->src_nid = cpu_to_le64(id->nid);
                }

                conn->ksnc_rx_state = SOCKNAL_RX_PARSE;
                ksocknal_conn_addref(conn);     /* ++ref while parsing */

                rc = lnet_parse(conn->ksnc_peer->ksnp_ni,
                                &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr,
                                conn->ksnc_peer->ksnp_id.nid, conn, 0);
                if (rc < 0) {
                        /* I just received garbage: give up on this conn */
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings (conn, rc);
                        ksocknal_conn_decref(conn);
                        return (-EPROTO);
                }

                /* I'm racing with ksocknal_recv() */
                LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_PARSE ||
                         conn->ksnc_rx_state == SOCKNAL_RX_LNET_PAYLOAD);

                if (conn->ksnc_rx_state != SOCKNAL_RX_LNET_PAYLOAD)
                        return 0;

                /* ksocknal_recv() got called */
                goto again;

        case SOCKNAL_RX_LNET_PAYLOAD:
                /* payload all received */
                rc = 0;

                if (conn->ksnc_rx_nob_left == 0 &&   /* not truncating */
                    conn->ksnc_msg.ksm_csum != 0 &&  /* has checksum */
                    conn->ksnc_msg.ksm_csum != conn->ksnc_rx_csum) {
                        CERROR("%s: Checksum error, wire:0x%08X data:0x%08X\n",
                               libcfs_id2str(conn->ksnc_peer->ksnp_id),
                               conn->ksnc_msg.ksm_csum, conn->ksnc_rx_csum);
                        rc = -EIO;
                }

                if (rc == 0 && conn->ksnc_msg.ksm_zc_cookies[0] != 0) {
                        LASSERT(conn->ksnc_proto != &ksocknal_protocol_v1x);

                        lhdr = &conn->ksnc_msg.ksm_u.lnetmsg.ksnm_hdr;
                        id   = &conn->ksnc_peer->ksnp_id;

                        rc = conn->ksnc_proto->pro_handle_zcreq(conn,
                                        conn->ksnc_msg.ksm_zc_cookies[0],
                                        *ksocknal_tunables.ksnd_nonblk_zcack ||
                                        le64_to_cpu(lhdr->src_nid) != id->nid);
                }

                lnet_finalize(conn->ksnc_peer->ksnp_ni, conn->ksnc_cookie, rc);

                if (rc != 0) {
                        ksocknal_new_packet(conn, 0);
                        ksocknal_close_conn_and_siblings (conn, rc);
                        return (-EPROTO);
                }
                /* Fall through */

        case SOCKNAL_RX_SLOP:
                /* starting new packet? */
                if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
                        return 0;       /* come back later */
                goto again;          /* try to finish reading slop now */

        default:
                break;
        }

        /* Not Reached */
        LBUG ();
        return (-EINVAL);                      /* keep gcc happy */
}

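/* LND API entry point called via lnet_parse(): describe where the payload
 * should land and kick the connection back to its scheduler. */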
int
ksocknal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *msg, int delayed,
               unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
               unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        ksock_conn_t  *conn = (ksock_conn_t *)private;
        ksock_sched_t *sched = conn->ksnc_scheduler;

        LASSERT (mlen <= rlen);
        LASSERT (niov <= LNET_MAX_IOV);

        conn->ksnc_cookie = msg;
        conn->ksnc_rx_nob_wanted = mlen;
        conn->ksnc_rx_nob_left   = rlen;

        if (mlen == 0 || iov != NULL) {
                conn->ksnc_rx_nkiov = 0;
                conn->ksnc_rx_kiov = NULL;
                conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
                conn->ksnc_rx_niov =
                        lnet_extract_iov(LNET_MAX_IOV, conn->ksnc_rx_iov,
                                         niov, iov, offset, mlen);
        } else {
                conn->ksnc_rx_niov = 0;
                conn->ksnc_rx_iov  = NULL;
                conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
                conn->ksnc_rx_nkiov =
                        lnet_extract_kiov(LNET_MAX_IOV, conn->ksnc_rx_kiov,
                                          niov, kiov, offset, mlen);
        }

        LASSERT (mlen ==
                 lnet_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
                 lnet_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));

        LASSERT (conn->ksnc_rx_scheduled);

        spin_lock_bh(&sched->kss_lock);

        switch (conn->ksnc_rx_state) {
        case SOCKNAL_RX_PARSE_WAIT:
                list_add_tail(&conn->ksnc_rx_list, &sched->kss_rx_conns);
                wake_up (&sched->kss_waitq);
                LASSERT (conn->ksnc_rx_ready);
                break;

        case SOCKNAL_RX_PARSE:
                /* scheduler hasn't noticed I'm parsing yet */
                break;
        }

        conn->ksnc_rx_state = SOCKNAL_RX_LNET_PAYLOAD;

        spin_unlock_bh(&sched->kss_lock);
        ksocknal_conn_decref(conn);
        return 0;
}

static inline int
ksocknal_sched_cansleep(ksock_sched_t *sched)
{
        int rc;

        spin_lock_bh(&sched->kss_lock);

        rc = (!ksocknal_data.ksnd_shuttingdown &&
              list_empty(&sched->kss_rx_conns) &&
              list_empty(&sched->kss_tx_conns));

        spin_unlock_bh(&sched->kss_lock);
        return rc;
}

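/* Scheduler thread: service ready rx and tx connections semi-fairly, bound
 * to the CPT encoded in 'arg'. */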
1391int ksocknal_scheduler(void *arg)
1392{
1393        struct ksock_sched_info *info;
1394        ksock_sched_t           *sched;
1395        ksock_conn_t            *conn;
1396        ksock_tx_t              *tx;
1397        int                     rc;
1398        int                     nloops = 0;
1399        long                    id = (long)arg;
1400
1401        info = ksocknal_data.ksnd_sched_info[KSOCK_THREAD_CPT(id)];
1402        sched = &info->ksi_scheds[KSOCK_THREAD_SID(id)];
1403
1404        cfs_block_allsigs();
1405
1406        rc = cfs_cpt_bind(lnet_cpt_table(), info->ksi_cpt);
1407        if (rc != 0) {
1408                CERROR("Can't set CPT affinity to %d: %d\n",
1409                       info->ksi_cpt, rc);
1410        }
1411
1412        spin_lock_bh(&sched->kss_lock);
1413
1414        while (!ksocknal_data.ksnd_shuttingdown) {
1415                int did_something = 0;
1416
1417                /* Ensure I progress everything semi-fairly */
1418
1419                if (!list_empty (&sched->kss_rx_conns)) {
1420                        conn = list_entry(sched->kss_rx_conns.next,
1421                                              ksock_conn_t, ksnc_rx_list);
1422                        list_del(&conn->ksnc_rx_list);
1423
1424                        LASSERT(conn->ksnc_rx_scheduled);
1425                        LASSERT(conn->ksnc_rx_ready);
1426
1427                        /* clear rx_ready in case receive isn't complete.
1428                         * Do it BEFORE we call process_recv, since
1429                         * data_ready can set it any time after we release
1430                         * kss_lock. */
1431                        conn->ksnc_rx_ready = 0;
1432                        spin_unlock_bh(&sched->kss_lock);
1433
1434                        rc = ksocknal_process_receive(conn);
1435
1436                        spin_lock_bh(&sched->kss_lock);
1437
1438                        /* I'm the only one that can clear this flag */
1439                        LASSERT(conn->ksnc_rx_scheduled);
1440
1441                        /* Did process_receive get everything it wanted? */
1442                        if (rc == 0)
1443                                conn->ksnc_rx_ready = 1;
1444
1445                        if (conn->ksnc_rx_state == SOCKNAL_RX_PARSE) {
1446                                /* Conn blocked waiting for ksocknal_recv();
1447                                 * I change its state (under lock) to signal
1448                                 * that it can be rescheduled */
1449                                conn->ksnc_rx_state = SOCKNAL_RX_PARSE_WAIT;
1450                        } else if (conn->ksnc_rx_ready) {
1451                                /* reschedule for rx */
1452                                list_add_tail (&conn->ksnc_rx_list,
1453                                                   &sched->kss_rx_conns);
1454                        } else {
1455                                conn->ksnc_rx_scheduled = 0;
1456                                /* drop my ref */
1457                                ksocknal_conn_decref(conn);
1458                        }
1459
1460                        did_something = 1;
1461                }
1462
1463                if (!list_empty (&sched->kss_tx_conns)) {
1464                        LIST_HEAD    (zlist);
1465
1466                        if (!list_empty(&sched->kss_zombie_noop_txs)) {
1467                                list_add(&zlist,
1468                                             &sched->kss_zombie_noop_txs);
1469                                list_del_init(&sched->kss_zombie_noop_txs);
1470                        }
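                        /* zlist now owns any zombie noop txs; they are
                         * freed below, after kss_lock is dropped */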
1471
1472                        conn = list_entry(sched->kss_tx_conns.next,
1473                                              ksock_conn_t, ksnc_tx_list);
1474                        list_del (&conn->ksnc_tx_list);
1475
1476                        LASSERT(conn->ksnc_tx_scheduled);
1477                        LASSERT(conn->ksnc_tx_ready);
1478                        LASSERT(!list_empty(&conn->ksnc_tx_queue));
1479
1480                        tx = list_entry(conn->ksnc_tx_queue.next,
1481                                            ksock_tx_t, tx_list);
1482
1483                        if (conn->ksnc_tx_carrier == tx)
1484                                ksocknal_next_tx_carrier(conn);
1485
1486                        /* dequeue now so empty list => more to send */
1487                        list_del(&tx->tx_list);
1488
1489                        /* Clear tx_ready in case send isn't complete.  Do
1490                         * it BEFORE we call process_transmit, since
1491                         * write_space can set it any time after we release
1492                         * kss_lock. */
1493                        conn->ksnc_tx_ready = 0;
1494                        spin_unlock_bh(&sched->kss_lock);
1495
1496                        if (!list_empty(&zlist)) {
1497                                /* free zombie noop txs; it's fast because
1498                                 * noop txs are just put back on the freelist */
1499                                ksocknal_txlist_done(NULL, &zlist, 0);
1500                        }
1501
1502                        rc = ksocknal_process_transmit(conn, tx);
1503
1504                        if (rc == -ENOMEM || rc == -EAGAIN) {
1505                                /* Incomplete send: replace tx on HEAD of tx_queue */
1506                                spin_lock_bh(&sched->kss_lock);
1507                                list_add(&tx->tx_list,
1508                                             &conn->ksnc_tx_queue);
1509                        } else {
1510                                /* Complete send; tx -ref */
1511                                ksocknal_tx_decref(tx);
1512
1513                                spin_lock_bh(&sched->kss_lock);
1514                                /* assume space for more */
1515                                conn->ksnc_tx_ready = 1;
1516                        }
1517
1518                        if (rc == -ENOMEM) {
1519                                /* Do nothing; after a short timeout, this
1520                                 * conn will be reposted on kss_tx_conns. */
1521                        } else if (conn->ksnc_tx_ready &&
1522                                   !list_empty (&conn->ksnc_tx_queue)) {
1523                                /* reschedule for tx */
1524                                list_add_tail (&conn->ksnc_tx_list,
1525                                                   &sched->kss_tx_conns);
1526                        } else {
1527                                conn->ksnc_tx_scheduled = 0;
1528                                /* drop my ref */
1529                                ksocknal_conn_decref(conn);
1530                        }
1531
1532                        did_something = 1;
1533                }
1534                if (!did_something ||      /* nothing to do */
1535                    ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1536                        spin_unlock_bh(&sched->kss_lock);
1537
1538                        nloops = 0;
1539
1540                        if (!did_something) {   /* wait for something to do */
1541                                cfs_wait_event_interruptible_exclusive(
1542                                        sched->kss_waitq,
1543                                        !ksocknal_sched_cansleep(sched), rc);
1544                                LASSERT (rc == 0);
1545                        } else {
1546                                cond_resched();
1547                        }
1548
1549                        spin_lock_bh(&sched->kss_lock);
1550                }
1551        }
1552
1553        spin_unlock_bh(&sched->kss_lock);
1554        ksocknal_thread_fini();
1555        return 0;
1556}
1557
1558/*
1559 * Add connection to kss_rx_conns of scheduler
1560 * and wake up the scheduler.
1561 */
1562void ksocknal_read_callback (ksock_conn_t *conn)
1563{
1564        ksock_sched_t *sched;
1565        ENTRY;
1566
1567        sched = conn->ksnc_scheduler;
1568
1569        spin_lock_bh(&sched->kss_lock);
1570
1571        conn->ksnc_rx_ready = 1;
1572
1573        if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1574                list_add_tail(&conn->ksnc_rx_list,
1575                                  &sched->kss_rx_conns);
1576                conn->ksnc_rx_scheduled = 1;
1577                /* extra ref for scheduler */
1578                ksocknal_conn_addref(conn);
1579
1580                wake_up (&sched->kss_waitq);
1581        }
1582        spin_unlock_bh(&sched->kss_lock);
1583
1584        EXIT;
1585}
1586
1587/*
1588 * Add connection to kss_tx_conns of scheduler
1589 * and wake up the scheduler.
1590 */
1591void ksocknal_write_callback (ksock_conn_t *conn)
1592{
1593        ksock_sched_t *sched;
1594        ENTRY;
1595
1596        sched = conn->ksnc_scheduler;
1597
1598        spin_lock_bh(&sched->kss_lock);
1599
1600        conn->ksnc_tx_ready = 1;
1601
1602        if (!conn->ksnc_tx_scheduled && /* not being progressed */
1603            !list_empty(&conn->ksnc_tx_queue)) { /* packets to send */
1604                list_add_tail (&conn->ksnc_tx_list,
1605                                   &sched->kss_tx_conns);
1606                conn->ksnc_tx_scheduled = 1;
1607                /* extra ref for scheduler */
1608                ksocknal_conn_addref(conn);
1609
1610                wake_up (&sched->kss_waitq);
1611        }
1612
1613        spin_unlock_bh(&sched->kss_lock);
1614
1615        EXIT;
1616}
1617
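/*
 * Map the magic/version in a received HELLO onto a protocol table.
 * LNET_PROTO_MAGIC (either endianness) selects V2/V3 by version number;
 * the old LNET_PROTO_TCP_MAGIC header is matched against the V1
 * major/minor.  Returns NULL for anything unrecognised.
 */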
1618ksock_proto_t *
1619ksocknal_parse_proto_version (ksock_hello_msg_t *hello)
1620{
1621        __u32   version = 0;
1622
1623        if (hello->kshm_magic == LNET_PROTO_MAGIC)
1624                version = hello->kshm_version;
1625        else if (hello->kshm_magic == __swab32(LNET_PROTO_MAGIC))
1626                version = __swab32(hello->kshm_version);
1627
1628        if (version != 0) {
1629#if SOCKNAL_VERSION_DEBUG
1630                if (*ksocknal_tunables.ksnd_protocol == 1)
1631                        return NULL;
1632
1633                if (*ksocknal_tunables.ksnd_protocol == 2 &&
1634                    version == KSOCK_PROTO_V3)
1635                        return NULL;
1636#endif
1637                if (version == KSOCK_PROTO_V2)
1638                        return &ksocknal_protocol_v2x;
1639
1640                if (version == KSOCK_PROTO_V3)
1641                        return &ksocknal_protocol_v3x;
1642
1643                return NULL;
1644        }
1645
1646        if (hello->kshm_magic == le32_to_cpu(LNET_PROTO_TCP_MAGIC)) {
1647                lnet_magicversion_t *hmv = (lnet_magicversion_t *)hello;
1648
1649                CLASSERT (sizeof (lnet_magicversion_t) ==
1650                          offsetof (ksock_hello_msg_t, kshm_src_nid));
1651
1652                if (hmv->version_major == cpu_to_le16 (KSOCK_PROTO_V1_MAJOR) &&
1653                    hmv->version_minor == cpu_to_le16 (KSOCK_PROTO_V1_MINOR))
1654                        return &ksocknal_protocol_v1x;
1655        }
1656
1657        return NULL;
1658}
1659
1660int
1661ksocknal_send_hello (lnet_ni_t *ni, ksock_conn_t *conn,
1662                     lnet_nid_t peer_nid, ksock_hello_msg_t *hello)
1663{
1664        /* CAVEAT EMPTOR: this byte flips 'ipaddrs' */
1665        ksock_net_t      *net = (ksock_net_t *)ni->ni_data;
1666
1667        LASSERT (hello->kshm_nips <= LNET_MAX_INTERFACES);
1668
1669        /* rely on the caller to hold a ref on the socket so it won't disappear */
1670        LASSERT (conn->ksnc_proto != NULL);
1671
1672        hello->kshm_src_nid      = ni->ni_nid;
1673        hello->kshm_dst_nid      = peer_nid;
1674        hello->kshm_src_pid      = the_lnet.ln_pid;
1675
1676        hello->kshm_src_incarnation = net->ksnn_incarnation;
1677        hello->kshm_ctype          = conn->ksnc_type;
1678
1679        return conn->ksnc_proto->pro_send_hello(conn, hello);
1680}
1681
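/*
 * Return the connection type expected at the peer's end of this
 * connection: BULK_IN pairs with BULK_OUT (and vice versa), ANY and
 * CONTROL are symmetric, and anything else is invalid (SOCKLND_CONN_NONE).
 */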
1682int
1683ksocknal_invert_type(int type)
1684{
1685        switch (type)
1686        {
1687        case SOCKLND_CONN_ANY:
1688        case SOCKLND_CONN_CONTROL:
1689                return (type);
1690        case SOCKLND_CONN_BULK_IN:
1691                return SOCKLND_CONN_BULK_OUT;
1692        case SOCKLND_CONN_BULK_OUT:
1693                return SOCKLND_CONN_BULK_IN;
1694        default:
1695                return (SOCKLND_CONN_NONE);
1696        }
1697}
1698
1699int
1700ksocknal_recv_hello (lnet_ni_t *ni, ksock_conn_t *conn,
1701                     ksock_hello_msg_t *hello, lnet_process_id_t *peerid,
1702                     __u64 *incarnation)
1703{
1704        /* Return < 0        fatal error
1705         *        0          success
1706         *        EALREADY   lost connection race
1707         *        EPROTO     protocol version mismatch
1708         */
1709        socket_t        *sock = conn->ksnc_sock;
1710        int               active = (conn->ksnc_proto != NULL);
1711        int               timeout;
1712        int               proto_match;
1713        int               rc;
1714        ksock_proto_t       *proto;
1715        lnet_process_id_t    recv_id;
1716
1717        /* socket type set on active connections - not set on passive */
1718        LASSERT (!active == !(conn->ksnc_type != SOCKLND_CONN_NONE));
1719
1720        timeout = active ? *ksocknal_tunables.ksnd_timeout :
1721                            lnet_acceptor_timeout();
1722
1723        rc = libcfs_sock_read(sock, &hello->kshm_magic, sizeof (hello->kshm_magic), timeout);
1724        if (rc != 0) {
1725                CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
1726                        rc, HIPQUAD(conn->ksnc_ipaddr));
1727                LASSERT (rc < 0);
1728                return rc;
1729        }
1730
1731        if (hello->kshm_magic != LNET_PROTO_MAGIC &&
1732            hello->kshm_magic != __swab32(LNET_PROTO_MAGIC) &&
1733            hello->kshm_magic != le32_to_cpu (LNET_PROTO_TCP_MAGIC)) {
1734                /* Unexpected magic! */
1735                CERROR ("Bad magic(1) %#08x (%#08x expected) from "
1736                        "%u.%u.%u.%u\n", __cpu_to_le32 (hello->kshm_magic),
1737                        LNET_PROTO_TCP_MAGIC,
1738                        HIPQUAD(conn->ksnc_ipaddr));
1739                return -EPROTO;
1740        }
1741
1742        rc = libcfs_sock_read(sock, &hello->kshm_version,
1743                              sizeof(hello->kshm_version), timeout);
1744        if (rc != 0) {
1745                CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
1746                        rc, HIPQUAD(conn->ksnc_ipaddr));
1747                LASSERT (rc < 0);
1748                return rc;
1749        }
1750
1751        proto = ksocknal_parse_proto_version(hello);
1752        if (proto == NULL) {
1753                if (!active) {
1754                        /* unknown protocol from peer, tell peer my protocol */
1755                        conn->ksnc_proto = &ksocknal_protocol_v3x;
1756#if SOCKNAL_VERSION_DEBUG
1757                        if (*ksocknal_tunables.ksnd_protocol == 2)
1758                                conn->ksnc_proto = &ksocknal_protocol_v2x;
1759                        else if (*ksocknal_tunables.ksnd_protocol == 1)
1760                                conn->ksnc_proto = &ksocknal_protocol_v1x;
1761#endif
1762                        hello->kshm_nips = 0;
1763                        ksocknal_send_hello(ni, conn, ni->ni_nid, hello);
1764                }
1765
1766                CERROR ("Unknown protocol version (%d.x expected)"
1767                        " from %u.%u.%u.%u\n",
1768                        conn->ksnc_proto->pro_version,
1769                        HIPQUAD(conn->ksnc_ipaddr));
1770
1771                return -EPROTO;
1772        }
1773
1774        proto_match = (conn->ksnc_proto == proto);
1775        conn->ksnc_proto = proto;
1776
1777        /* receive the rest of hello message anyway */
1778        rc = conn->ksnc_proto->pro_recv_hello(conn, hello, timeout);
1779        if (rc != 0) {
1780                CERROR("Error %d reading or checking hello from from %u.%u.%u.%u\n",
1781                       rc, HIPQUAD(conn->ksnc_ipaddr));
1782                LASSERT (rc < 0);
1783                return rc;
1784        }
1785
1786        *incarnation = hello->kshm_src_incarnation;
1787
1788        if (hello->kshm_src_nid == LNET_NID_ANY) {
1789                CERROR("Expecting a HELLO hdr with a NID, but got LNET_NID_ANY"
1790                       "from %u.%u.%u.%u\n", HIPQUAD(conn->ksnc_ipaddr));
1791                return -EPROTO;
1792        }
1793
1794        if (!active &&
1795            conn->ksnc_port > LNET_ACCEPTOR_MAX_RESERVED_PORT) {
1796                /* Userspace NAL assigns peer process ID from socket */
1797                recv_id.pid = conn->ksnc_port | LNET_PID_USERFLAG;
1798                recv_id.nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), conn->ksnc_ipaddr);
1799        } else {
1800                recv_id.nid = hello->kshm_src_nid;
1801                recv_id.pid = hello->kshm_src_pid;
1802        }
1803
1804        if (!active) {
1805                *peerid = recv_id;
1806
1807                /* peer determines type */
1808                conn->ksnc_type = ksocknal_invert_type(hello->kshm_ctype);
1809                if (conn->ksnc_type == SOCKLND_CONN_NONE) {
1810                        CERROR ("Unexpected type %d from %s ip %u.%u.%u.%u\n",
1811                                hello->kshm_ctype, libcfs_id2str(*peerid),
1812                                HIPQUAD(conn->ksnc_ipaddr));
1813                        return -EPROTO;
1814                }
1815
1816                return 0;
1817        }
1818
1819        if (peerid->pid != recv_id.pid ||
1820            peerid->nid != recv_id.nid) {
1821                LCONSOLE_ERROR_MSG(0x130, "Connected successfully to %s on host"
1822                                   " %u.%u.%u.%u, but they claimed they were "
1823                                   "%s; please check your Lustre "
1824                                   "configuration.\n",
1825                                   libcfs_id2str(*peerid),
1826                                   HIPQUAD(conn->ksnc_ipaddr),
1827                                   libcfs_id2str(recv_id));
1828                return -EPROTO;
1829        }
1830
1831        if (hello->kshm_ctype == SOCKLND_CONN_NONE) {
1832                /* Possible protocol mismatch or I lost the connection race */
1833                return proto_match ? EALREADY : EPROTO;
1834        }
1835
1836        if (ksocknal_invert_type(hello->kshm_ctype) != conn->ksnc_type) {
1837                CERROR ("Mismatched types: me %d, %s ip %u.%u.%u.%u %d\n",
1838                        conn->ksnc_type, libcfs_id2str(*peerid),
1839                        HIPQUAD(conn->ksnc_ipaddr),
1840                        hello->kshm_ctype);
1841                return -EPROTO;
1842        }
1843
1844        return 0;
1845}
1846
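/*
 * Establish every connection type this route still wants, one at a time,
 * all within a single ksnd_timeout budget.  Returns non-zero if the
 * route should be retried later (connection race lost, or the peer is
 * already connecting to me); on hard failure, backs off the route's
 * retry interval and completes any blocked txs with an error.
 */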
1847int
1848ksocknal_connect (ksock_route_t *route)
1849{
1850        LIST_HEAD    (zombies);
1851        ksock_peer_t     *peer = route->ksnr_peer;
1852        int            type;
1853        int            wanted;
1854        socket_t     *sock;
1855        cfs_time_t      deadline;
1856        int            retry_later = 0;
1857        int            rc = 0;
1858
1859        deadline = cfs_time_add(cfs_time_current(),
1860                                cfs_time_seconds(*ksocknal_tunables.ksnd_timeout));
1861
1862        write_lock_bh(&ksocknal_data.ksnd_global_lock);
1863
1864        LASSERT (route->ksnr_scheduled);
1865        LASSERT (!route->ksnr_connecting);
1866
1867        route->ksnr_connecting = 1;
1868
1869        for (;;) {
1870                wanted = ksocknal_route_mask() & ~route->ksnr_connected;
1871
1872                /* stop connecting if peer/route got closed under me, or
1873                 * route got connected while queued */
1874                if (peer->ksnp_closing || route->ksnr_deleted ||
1875                    wanted == 0) {
1876                        retry_later = 0;
1877                        break;
1878                }
1879
1880                /* reschedule if peer is connecting to me */
1881                if (peer->ksnp_accepting > 0) {
1882                        CDEBUG(D_NET,
1883                               "peer %s(%d) already connecting to me, retry later.\n",
1884                               libcfs_nid2str(peer->ksnp_id.nid), peer->ksnp_accepting);
1885                        retry_later = 1;
1886                }
1887
1888                if (retry_later) /* needs reschedule */
1889                        break;
1890
1891                if ((wanted & (1 << SOCKLND_CONN_ANY)) != 0) {
1892                        type = SOCKLND_CONN_ANY;
1893                } else if ((wanted & (1 << SOCKLND_CONN_CONTROL)) != 0) {
1894                        type = SOCKLND_CONN_CONTROL;
1895                } else if ((wanted & (1 << SOCKLND_CONN_BULK_IN)) != 0) {
1896                        type = SOCKLND_CONN_BULK_IN;
1897                } else {
1898                        LASSERT ((wanted & (1 << SOCKLND_CONN_BULK_OUT)) != 0);
1899                        type = SOCKLND_CONN_BULK_OUT;
1900                }
1901
1902                write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1903
1904                if (cfs_time_aftereq(cfs_time_current(), deadline)) {
1905                        rc = -ETIMEDOUT;
1906                        lnet_connect_console_error(rc, peer->ksnp_id.nid,
1907                                                   route->ksnr_ipaddr,
1908                                                   route->ksnr_port);
1909                        goto failed;
1910                }
1911
1912                rc = lnet_connect(&sock, peer->ksnp_id.nid,
1913                                  route->ksnr_myipaddr,
1914                                  route->ksnr_ipaddr, route->ksnr_port);
1915                if (rc != 0)
1916                        goto failed;
1917
1918                rc = ksocknal_create_conn(peer->ksnp_ni, route, sock, type);
1919                if (rc < 0) {
1920                        lnet_connect_console_error(rc, peer->ksnp_id.nid,
1921                                                   route->ksnr_ipaddr,
1922                                                   route->ksnr_port);
1923                        goto failed;
1924                }
1925
1926                /* A +ve RC means I have to retry because I lost the connection
1927                 * race or I have to renegotiate the protocol version */
1928                retry_later = (rc != 0);
1929                if (retry_later)
1930                        CDEBUG(D_NET, "peer %s: conn race, retry later.\n",
1931                               libcfs_nid2str(peer->ksnp_id.nid));
1932
1933                write_lock_bh(&ksocknal_data.ksnd_global_lock);
1934        }
1935
1936        route->ksnr_scheduled = 0;
1937        route->ksnr_connecting = 0;
1938
1939        if (retry_later) {
1940                /* re-queue for attention; this frees me up to handle
1941                 * the peer's incoming connection request */
1942
1943                if (rc == EALREADY ||
1944                    (rc == 0 && peer->ksnp_accepting > 0)) {
1945                        /* We want to introduce a delay before the next
1946                         * connect attempt if we lost the conn race,
1947                         * but the race is usually resolved quickly,
1948                         * so min_reconnectms should be a good heuristic */
1949                        route->ksnr_retry_interval =
1950                                cfs_time_seconds(*ksocknal_tunables.ksnd_min_reconnectms)/1000;
1951                        route->ksnr_timeout = cfs_time_add(cfs_time_current(),
1952                                                           route->ksnr_retry_interval);
1953                }
1954
1955                ksocknal_launch_connection_locked(route);
1956        }
1957
1958        write_unlock_bh(&ksocknal_data.ksnd_global_lock);
1959        return retry_later;
1960
1961 failed:
1962        write_lock_bh(&ksocknal_data.ksnd_global_lock);
1963
1964        route->ksnr_scheduled = 0;
1965        route->ksnr_connecting = 0;
1966
1967        /* This is a retry rather than a new connection */
1968        route->ksnr_retry_interval *= 2;
1969        route->ksnr_retry_interval =
1970                MAX(route->ksnr_retry_interval,
1971                    cfs_time_seconds(*ksocknal_tunables.ksnd_min_reconnectms)/1000);
1972        route->ksnr_retry_interval =
1973                MIN(route->ksnr_retry_interval,
1974                    cfs_time_seconds(*ksocknal_tunables.ksnd_max_reconnectms)/1000);
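        /* i.e. exponential backoff clamped to [min, max]; for example,
         * assuming min_reconnectms=1000 and max_reconnectms=64000, the
         * interval doubles 1s, 2s, 4s, ... and saturates at 64s */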
1975
1976        LASSERT (route->ksnr_retry_interval != 0);
1977        route->ksnr_timeout = cfs_time_add(cfs_time_current(),
1978                                           route->ksnr_retry_interval);
1979
1980        if (!list_empty(&peer->ksnp_tx_queue) &&
1981            peer->ksnp_accepting == 0 &&
1982            ksocknal_find_connecting_route_locked(peer) == NULL) {
1983                ksock_conn_t *conn;
1984
1985                /* ksnp_tx_queue is moved onto a conn on successful
1986                 * connection for V1.x and V2.x */
1987                if (!list_empty (&peer->ksnp_conns)) {
1988                        conn = list_entry(peer->ksnp_conns.next,
1989                                              ksock_conn_t, ksnc_list);
1990                        LASSERT (conn->ksnc_proto == &ksocknal_protocol_v3x);
1991                }
1992
1993                /* take all the blocked packets while I've got the lock and
1994                 * complete below... */
1995                list_splice_init(&peer->ksnp_tx_queue, &zombies);
1996        }
1997
1998#if 0      /* irrelevant with only eager routes */
1999        if (!route->ksnr_deleted) {
2000                /* make this route least-favourite for re-selection */
2001                list_del(&route->ksnr_list);
2002                list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2003        }
2004#endif
2005        write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2006
2007        ksocknal_peer_failed(peer);
2008        ksocknal_txlist_done(peer->ksnp_ni, &zombies, 1);
2009        return 0;
2010}
2011
2012/*
2013 * Check whether we need to create more connds.
2014 * It will try to create a new thread if necessary; @timeout can be
2015 * updated if thread creation fails, so the caller won't keep retrying
2016 * while short of resources.
2017 */
2018static int
2019ksocknal_connd_check_start(long sec, long *timeout)
2020{
2021        char name[16];
2022        int rc;
2023        int total = ksocknal_data.ksnd_connd_starting +
2024                    ksocknal_data.ksnd_connd_running;
2025
2026        if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
2027                /* still in initializing */
2028                return 0;
2029        }
2030
2031        if (total >= *ksocknal_tunables.ksnd_nconnds_max ||
2032            total > ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV) {
2033                /* can't create more connd, or still have enough
2034                 * threads to handle more connecting */
2035                return 0;
2036        }
2037
2038        if (list_empty(&ksocknal_data.ksnd_connd_routes)) {
2039                /* no pending connecting request */
2040                return 0;
2041        }
2042
2043        if (sec - ksocknal_data.ksnd_connd_failed_stamp <= 1) {
2044                /* may have run out of resources, retry later */
2045                *timeout = cfs_time_seconds(1);
2046                return 0;
2047        }
2048
2049        if (ksocknal_data.ksnd_connd_starting > 0) {
2050                /* serialize starting to avoid flood */
2051                return 0;
2052        }
2053
2054        ksocknal_data.ksnd_connd_starting_stamp = sec;
2055        ksocknal_data.ksnd_connd_starting++;
2056        spin_unlock_bh(&ksocknal_data.ksnd_connd_lock);
2057
2058        /* NB: total is the next id */
2059        snprintf(name, sizeof(name), "socknal_cd%02d", total);
2060        rc = ksocknal_thread_start(ksocknal_connd, NULL, name);
2061
2062        spin_lock_bh(&ksocknal_data.ksnd_connd_lock);
2063        if (rc == 0)
2064                return 1;
2065
2066        /* we tried ... */
2067        LASSERT(ksocknal_data.ksnd_connd_starting > 0);
2068        ksocknal_data.ksnd_connd_starting--;
2069        ksocknal_data.ksnd_connd_failed_stamp = cfs_time_current_sec();
2070
2071        return 1;
2072}
2073
2074/*
2075 * Check whether the current thread can exit; it returns 1 if there are
2076 * too many threads and none has been created in the past 120 seconds.
2077 * Also, this function may update @timeout to make the caller come back
2078 * again to recheck these conditions.
2079 */
2080static int
2081ksocknal_connd_check_stop(long sec, long *timeout)
2082{
2083        int val;
2084
2085        if (unlikely(ksocknal_data.ksnd_init < SOCKNAL_INIT_ALL)) {
2086                /* still in initializing */
2087                return 0;
2088        }
2089
2090        if (ksocknal_data.ksnd_connd_starting > 0) {
2091                /* in progress of starting new thread */
2092                return 0;
2093        }
2094
2095        if (ksocknal_data.ksnd_connd_running <=
2096            *ksocknal_tunables.ksnd_nconnds) { /* can't shrink */
2097                return 0;
2098        }
2099
2100        /* was a thread created in the past 120 seconds? */
2101        val = (int)(ksocknal_data.ksnd_connd_starting_stamp +
2102                    SOCKNAL_CONND_TIMEOUT - sec);
2103
2104        *timeout = (val > 0) ? cfs_time_seconds(val) :
2105                               cfs_time_seconds(SOCKNAL_CONND_TIMEOUT);
2106        if (val > 0)
2107                return 0;
2108
2109        /* no thread created in the past 120 seconds */
2110
2111        return ksocknal_data.ksnd_connd_running >
2112               ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV;
2113}
2114
2115/* Go through the connd_routes queue looking for a route that we can
2116 * process right now; @timeout_p can be updated if we need to come back later */
2117static ksock_route_t *
2118ksocknal_connd_get_route_locked(signed long *timeout_p)
2119{
2120        ksock_route_t *route;
2121        cfs_time_t     now;
2122
2123        now = cfs_time_current();
2124
2125        /* connd_routes can contain both pending and ordinary routes */
2126        list_for_each_entry (route, &ksocknal_data.ksnd_connd_routes,
2127                                 ksnr_connd_list) {
2128
2129                if (route->ksnr_retry_interval == 0 ||
2130                    cfs_time_aftereq(now, route->ksnr_timeout))
2131                        return route;
2132
2133                if (*timeout_p == MAX_SCHEDULE_TIMEOUT ||
2134                    (int)*timeout_p > (int)(route->ksnr_timeout - now))
2135                        *timeout_p = (int)(route->ksnr_timeout - now);
2136        }
2137
2138        return NULL;
2139}
2140
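/*
 * Connection daemon: handles connection requests accepted by the
 * listener, initiates scheduled routes, and grows or shrinks the connd
 * pool via the check_start/check_stop heuristics above.
 * SOCKNAL_CONND_RESV threads are held in reserve so incoming
 * connections can always be handled.
 */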
2141int
2142ksocknal_connd (void *arg)
2143{
2144        spinlock_t    *connd_lock = &ksocknal_data.ksnd_connd_lock;
2145        ksock_connreq_t   *cr;
2146        wait_queue_t     wait;
2147        int             nloops = 0;
2148        int             cons_retry = 0;
2149
2150        cfs_block_allsigs ();
2151
2152        init_waitqueue_entry_current (&wait);
2153
2154        spin_lock_bh(connd_lock);
2155
2156        LASSERT(ksocknal_data.ksnd_connd_starting > 0);
2157        ksocknal_data.ksnd_connd_starting--;
2158        ksocknal_data.ksnd_connd_running++;
2159
2160        while (!ksocknal_data.ksnd_shuttingdown) {
2161                ksock_route_t *route = NULL;
2162                long sec = cfs_time_current_sec();
2163                long timeout = MAX_SCHEDULE_TIMEOUT;
2164                int  dropped_lock = 0;
2165
2166                if (ksocknal_connd_check_stop(sec, &timeout)) {
2167                        /* wakeup another one to check stop */
2168                        wake_up(&ksocknal_data.ksnd_connd_waitq);
2169                        break;
2170                }
2171
2172                if (ksocknal_connd_check_start(sec, &timeout)) {
2173                        /* created new thread */
2174                        dropped_lock = 1;
2175                }
2176
2177                if (!list_empty(&ksocknal_data.ksnd_connd_connreqs)) {
2178                        /* Connection accepted by the listener */
2179                        cr = list_entry(ksocknal_data.ksnd_connd_connreqs. \
2180                                            next, ksock_connreq_t, ksncr_list);
2181
2182                        list_del(&cr->ksncr_list);
2183                        spin_unlock_bh(connd_lock);
2184                        dropped_lock = 1;
2185
2186                        ksocknal_create_conn(cr->ksncr_ni, NULL,
2187                                             cr->ksncr_sock, SOCKLND_CONN_NONE);
2188                        lnet_ni_decref(cr->ksncr_ni);
2189                        LIBCFS_FREE(cr, sizeof(*cr));
2190
2191                        spin_lock_bh(connd_lock);
2192                }
2193
2194                /* Only handle an outgoing connection request if there
2195                 * is a thread left to handle incoming connections and
2196                 * to create a new connd */
2197                if (ksocknal_data.ksnd_connd_connecting + SOCKNAL_CONND_RESV <
2198                    ksocknal_data.ksnd_connd_running) {
2199                        route = ksocknal_connd_get_route_locked(&timeout);
2200                }
2201                if (route != NULL) {
2202                        list_del (&route->ksnr_connd_list);
2203                        ksocknal_data.ksnd_connd_connecting++;
2204                        spin_unlock_bh(connd_lock);
2205                        dropped_lock = 1;
2206
2207                        if (ksocknal_connect(route)) {
2208                                /* consecutive retry */
2209                                if (cons_retry++ > SOCKNAL_INSANITY_RECONN) {
2210                                        CWARN("massive consecutive "
2211                                              "re-connecting to %u.%u.%u.%u\n",
2212                                              HIPQUAD(route->ksnr_ipaddr));
2213                                        cons_retry = 0;
2214                                }
2215                        } else {
2216                                cons_retry = 0;
2217                        }
2218
2219                        ksocknal_route_decref(route);
2220
2221                        spin_lock_bh(connd_lock);
2222                        ksocknal_data.ksnd_connd_connecting--;
2223                }
2224
2225                if (dropped_lock) {
2226                        if (++nloops < SOCKNAL_RESCHED)
2227                                continue;
2228                        spin_unlock_bh(connd_lock);
2229                        nloops = 0;
2230                        cond_resched();
2231                        spin_lock_bh(connd_lock);
2232                        continue;
2233                }
2234
2235                /* Nothing to do; sleep for up to 'timeout' */
2236                set_current_state(TASK_INTERRUPTIBLE);
2237                add_wait_queue_exclusive(&ksocknal_data.ksnd_connd_waitq, &wait);
2238                spin_unlock_bh(connd_lock);
2239
2240                nloops = 0;
2241                waitq_timedwait(&wait, TASK_INTERRUPTIBLE, timeout);
2242
2243                set_current_state(TASK_RUNNING);
2244                remove_wait_queue(&ksocknal_data.ksnd_connd_waitq, &wait);
2245                spin_lock_bh(connd_lock);
2246        }
2247        ksocknal_data.ksnd_connd_running--;
2248        spin_unlock_bh(connd_lock);
2249
2250        ksocknal_thread_fini();
2251        return 0;
2252}
2253
2254ksock_conn_t *
2255ksocknal_find_timed_out_conn (ksock_peer_t *peer)
2256{
2257        /* We're called with a shared lock on ksnd_global_lock */
2258        ksock_conn_t      *conn;
2259        struct list_head        *ctmp;
2260
2261        list_for_each (ctmp, &peer->ksnp_conns) {
2262                int     error;
2263                conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2264
2265                /* Don't need the {get,put}connsock dance to deref ksnc_sock */
2266                LASSERT (!conn->ksnc_closing);
2267
2268                /* SOCK_ERROR will reset the socket's error code on
2269                 * some platforms (like Darwin 8.x) */
2270                error = cfs_sock_error(conn->ksnc_sock);
2271                if (error != 0) {
2272                        ksocknal_conn_addref(conn);
2273
2274                        switch (error) {
2275                        case ECONNRESET:
2276                                CNETERR("A connection with %s "
2277                                        "(%u.%u.%u.%u:%d) was reset; "
2278                                        "it may have rebooted.\n",
2279                                        libcfs_id2str(peer->ksnp_id),
2280                                        HIPQUAD(conn->ksnc_ipaddr),
2281                                        conn->ksnc_port);
2282                                break;
2283                        case ETIMEDOUT:
2284                                CNETERR("A connection with %s "
2285                                        "(%u.%u.%u.%u:%d) timed out; the "
2286                                        "network or node may be down.\n",
2287                                        libcfs_id2str(peer->ksnp_id),
2288                                        HIPQUAD(conn->ksnc_ipaddr),
2289                                        conn->ksnc_port);
2290                                break;
2291                        default:
2292                                CNETERR("An unexpected network error %d "
2293                                        "occurred with %s "
2294                                        "(%u.%u.%u.%u:%d)\n", error,
2295                                        libcfs_id2str(peer->ksnp_id),
2296                                        HIPQUAD(conn->ksnc_ipaddr),
2297                                        conn->ksnc_port);
2298                                break;
2299                        }
2300
2301                        return (conn);
2302                }
2303
2304                if (conn->ksnc_rx_started &&
2305                    cfs_time_aftereq(cfs_time_current(),
2306                                     conn->ksnc_rx_deadline)) {
2307                        /* Timed out incomplete incoming message */
2308                        ksocknal_conn_addref(conn);
2309                        CNETERR("Timeout receiving from %s (%u.%u.%u.%u:%d), "
2310                                "state %d wanted %d left %d\n",
2311                                libcfs_id2str(peer->ksnp_id),
2312                                HIPQUAD(conn->ksnc_ipaddr),
2313                                conn->ksnc_port,
2314                                conn->ksnc_rx_state,
2315                                conn->ksnc_rx_nob_wanted,
2316                                conn->ksnc_rx_nob_left);
2317                        return (conn);
2318                }
2319
2320                if ((!list_empty(&conn->ksnc_tx_queue) ||
2321                     cfs_sock_wmem_queued(conn->ksnc_sock) != 0) &&
2322                    cfs_time_aftereq(cfs_time_current(),
2323                                     conn->ksnc_tx_deadline)) {
2324                        /* Timed out messages queued for sending or
2325                         * buffered in the socket's send buffer */
2326                        ksocknal_conn_addref(conn);
2327                        CNETERR("Timeout sending data to %s (%u.%u.%u.%u:%d) "
2328                                "the network or that node may be down.\n",
2329                                libcfs_id2str(peer->ksnp_id),
2330                                HIPQUAD(conn->ksnc_ipaddr),
2331                                conn->ksnc_port);
2332                        return (conn);
2333                }
2334        }
2335
2336        return (NULL);
2337}
2338
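/*
 * Move every expired tx from the peer's queue onto a private list under
 * the global write lock, then complete them with an error outside it.
 */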
2339static inline void
2340ksocknal_flush_stale_txs(ksock_peer_t *peer)
2341{
2342        ksock_tx_t      *tx;
2343        LIST_HEAD      (stale_txs);
2344
2345        write_lock_bh(&ksocknal_data.ksnd_global_lock);
2346
2347        while (!list_empty (&peer->ksnp_tx_queue)) {
2348                tx = list_entry (peer->ksnp_tx_queue.next,
2349                                     ksock_tx_t, tx_list);
2350
2351                if (!cfs_time_aftereq(cfs_time_current(),
2352                                      tx->tx_deadline))
2353                        break;
2354
2355                list_del (&tx->tx_list);
2356                list_add_tail (&tx->tx_list, &stale_txs);
2357        }
2358
2359        write_unlock_bh(&ksocknal_data.ksnd_global_lock);
2360
2361        ksocknal_txlist_done(peer->ksnp_ni, &stale_txs, 1);
2362}
2363
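/*
 * Called with ksnd_global_lock read-held; may drop and re-take it, so
 * any non-zero return tells the caller to restart its scan.  Sends a
 * noop PING (cookie 1) on V3 connections that have been idle past the
 * keepalive interval; throttled to one attempt per 10 seconds per peer.
 */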
2364int
2365ksocknal_send_keepalive_locked(ksock_peer_t *peer)
2366{
2367        ksock_sched_t  *sched;
2368        ksock_conn_t   *conn;
2369        ksock_tx_t     *tx;
2370
2371        if (list_empty(&peer->ksnp_conns)) /* last_alive will be updated by create_conn */
2372                return 0;
2373
2374        if (peer->ksnp_proto != &ksocknal_protocol_v3x)
2375                return 0;
2376
2377        if (*ksocknal_tunables.ksnd_keepalive <= 0 ||
2378            cfs_time_before(cfs_time_current(),
2379                            cfs_time_add(peer->ksnp_last_alive,
2380                                         cfs_time_seconds(*ksocknal_tunables.ksnd_keepalive))))
2381                return 0;
2382
2383        if (cfs_time_before(cfs_time_current(),
2384                            peer->ksnp_send_keepalive))
2385                return 0;
2386
2387        /* retry 10 secs later, so we don't put pressure
2388         * on this peer if we fail to send a keepalive this time */
2389        peer->ksnp_send_keepalive = cfs_time_shift(10);
2390
2391        conn = ksocknal_find_conn_locked(peer, NULL, 1);
2392        if (conn != NULL) {
2393                sched = conn->ksnc_scheduler;
2394
2395                spin_lock_bh(&sched->kss_lock);
2396                if (!list_empty(&conn->ksnc_tx_queue)) {
2397                        spin_unlock_bh(&sched->kss_lock);
2398                        /* there is a queued ACK, so no keepalive is needed */
2399                        return 0;
2400                }
2401
2402                spin_unlock_bh(&sched->kss_lock);
2403        }
2404
2405        read_unlock(&ksocknal_data.ksnd_global_lock);
2406
2407        /* cookie = 1 is reserved for keepalive PING */
2408        tx = ksocknal_alloc_tx_noop(1, 1);
2409        if (tx == NULL) {
2410                read_lock(&ksocknal_data.ksnd_global_lock);
2411                return -ENOMEM;
2412        }
2413
2414        if (ksocknal_launch_packet(peer->ksnp_ni, tx, peer->ksnp_id) == 0) {
2415                read_lock(&ksocknal_data.ksnd_global_lock);
2416                return 1;
2417        }
2418
2419        ksocknal_free_tx(tx);
2420        read_lock(&ksocknal_data.ksnd_global_lock);
2421
2422        return -EIO;
2423}
2424
2425
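/*
 * Scan one peer hash bucket for timed-out connections, stale txs and
 * stale ZC_REQs.  Whenever the shared lock must be dropped to act on
 * something, the whole bucket is rescanned from the top ("goto again"),
 * since list membership may have changed in the meantime.
 */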
2426void
2427ksocknal_check_peer_timeouts (int idx)
2428{
2429        struct list_head       *peers = &ksocknal_data.ksnd_peers[idx];
2430        ksock_peer_t     *peer;
2431        ksock_conn_t     *conn;
2432        ksock_tx_t       *tx;
2433
2434 again:
2435        /* NB. We expect to have a look at all the peers and not find any
2436         * connections to time out, so we just use a shared lock while we
2437         * take a look... */
2438        read_lock(&ksocknal_data.ksnd_global_lock);
2439
2440        list_for_each_entry(peer, peers, ksnp_list) {
2441                cfs_time_t  deadline = 0;
2442                int      resid = 0;
2443                int      n     = 0;
2444
2445                if (ksocknal_send_keepalive_locked(peer) != 0) {
2446                        read_unlock(&ksocknal_data.ksnd_global_lock);
2447                        goto again;
2448                }
2449
2450                conn = ksocknal_find_timed_out_conn (peer);
2451
2452                if (conn != NULL) {
2453                        read_unlock(&ksocknal_data.ksnd_global_lock);
2454
2455                        ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2456
2457                        /* NB we won't find this one again, but we can't
2458                         * just proceed with the next peer, since we dropped
2459                         * ksnd_global_lock and it might be dead already! */
2460                        ksocknal_conn_decref(conn);
2461                        goto again;
2462                }
2463
2464                /* we can't process stale txs right here because we're
2465                 * holding only a shared lock */
2466                if (!list_empty (&peer->ksnp_tx_queue)) {
2467                        ksock_tx_t *tx =
2468                                list_entry (peer->ksnp_tx_queue.next,
2469                                                ksock_tx_t, tx_list);
2470
2471                        if (cfs_time_aftereq(cfs_time_current(),
2472                                             tx->tx_deadline)) {
2473
2474                                ksocknal_peer_addref(peer);
2475                                read_unlock(&ksocknal_data.ksnd_global_lock);
2476
2477                                ksocknal_flush_stale_txs(peer);
2478
2479                                ksocknal_peer_decref(peer);
2480                                goto again;
2481                        }
2482                }
2483
2484                if (list_empty(&peer->ksnp_zc_req_list))
2485                        continue;
2486
2487                spin_lock(&peer->ksnp_lock);
2488                list_for_each_entry(tx, &peer->ksnp_zc_req_list, tx_zc_list) {
2489                        if (!cfs_time_aftereq(cfs_time_current(),
2490                                              tx->tx_deadline))
2491                                break;
2492                        /* ignore the TX if connection is being closed */
2493                        if (tx->tx_conn->ksnc_closing)
2494                                continue;
2495                        n++;
2496                }
2497
2498                if (n == 0) {
2499                        spin_unlock(&peer->ksnp_lock);
2500                        continue;
2501                }
2502
2503                tx = list_entry(peer->ksnp_zc_req_list.next,
2504                                    ksock_tx_t, tx_zc_list);
2505                deadline = tx->tx_deadline;
2506                resid    = tx->tx_resid;
2507                conn     = tx->tx_conn;
2508                ksocknal_conn_addref(conn);
2509
2510                spin_unlock(&peer->ksnp_lock);
2511                read_unlock(&ksocknal_data.ksnd_global_lock);
2512
2513                CERROR("Total %d stale ZC_REQs for peer %s detected; the "
2514                       "oldest(%p) timed out %ld secs ago, "
2515                       "resid: %d, wmem: %d\n",
2516                       n, libcfs_nid2str(peer->ksnp_id.nid), tx,
2517                       cfs_duration_sec(cfs_time_current() - deadline),
2518                       resid, cfs_sock_wmem_queued(conn->ksnc_sock));
2519
2520                ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2521                ksocknal_conn_decref(conn);
2522                goto again;
2523        }
2524
2525        read_unlock(&ksocknal_data.ksnd_global_lock);
2526}
2527
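/*
 * Reaper thread: terminates connections on the deathrow list, destroys
 * zombies, re-queues connections that stalled with ENOMEM, and drives
 * the periodic peer-timeout scan in chunks of the peer hash table.
 */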
2528int
2529ksocknal_reaper (void *arg)
2530{
2531        wait_queue_t     wait;
2532        ksock_conn_t      *conn;
2533        ksock_sched_t     *sched;
2534        struct list_head         enomem_conns;
2535        int             nenomem_conns;
2536        cfs_duration_t     timeout;
2537        int             i;
2538        int             peer_index = 0;
2539        cfs_time_t       deadline = cfs_time_current();
2540
2541        cfs_block_allsigs ();
2542
2543        INIT_LIST_HEAD(&enomem_conns);
2544        init_waitqueue_entry_current (&wait);
2545
2546        spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2547
2548        while (!ksocknal_data.ksnd_shuttingdown) {
2549
2550                if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2551                        conn = list_entry (ksocknal_data. \
2552                                               ksnd_deathrow_conns.next,
2553                                               ksock_conn_t, ksnc_list);
2554                        list_del (&conn->ksnc_list);
2555
2556                        spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2557
2558                        ksocknal_terminate_conn(conn);
2559                        ksocknal_conn_decref(conn);
2560
2561                        spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2562                        continue;
2563                }
2564
2565                if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2566                        conn = list_entry (ksocknal_data.ksnd_zombie_conns.\
2567                                               next, ksock_conn_t, ksnc_list);
2568                        list_del (&conn->ksnc_list);
2569
2570                        spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2571
2572                        ksocknal_destroy_conn(conn);
2573
2574                        spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2575                        continue;
2576                }
2577
2578                if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
2579                        list_add(&enomem_conns,
2580                                     &ksocknal_data.ksnd_enomem_conns);
2581                        list_del_init(&ksocknal_data.ksnd_enomem_conns);
2582                }
2583
2584                spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2585
2586                /* reschedule all the connections that stalled with ENOMEM... */
2587                nenomem_conns = 0;
2588                while (!list_empty (&enomem_conns)) {
2589                        conn = list_entry (enomem_conns.next,
2590                                               ksock_conn_t, ksnc_tx_list);
2591                        list_del (&conn->ksnc_tx_list);
2592
2593                        sched = conn->ksnc_scheduler;
2594
2595                        spin_lock_bh(&sched->kss_lock);
2596
2597                        LASSERT(conn->ksnc_tx_scheduled);
2598                        conn->ksnc_tx_ready = 1;
2599                        list_add_tail(&conn->ksnc_tx_list,
2600                                          &sched->kss_tx_conns);
2601                        wake_up(&sched->kss_waitq);
2602
2603                        spin_unlock_bh(&sched->kss_lock);
2604                        nenomem_conns++;
2605                }
2606
2607                /* careful with the jiffy wrap... */
2608                while ((timeout = cfs_time_sub(deadline,
2609                                               cfs_time_current())) <= 0) {
2610                        const int n = 4;
2611                        const int p = 1;
2612                        int       chunk = ksocknal_data.ksnd_peer_hash_size;
2613
2614                        /* Time to check for timeouts on a few more peers: I do
2615                         * checks every 'p' seconds on a proportion of the peer
2616                         * table and I need to check every connection 'n' times
2617                         * within a timeout interval, to ensure I detect a
2618                         * timeout on any connection within (n+1)/n times the
2619                         * timeout interval. */
2620
2621                        if (*ksocknal_tunables.ksnd_timeout > n * p)
2622                                chunk = (chunk * n * p) /
2623                                        *ksocknal_tunables.ksnd_timeout;
2624                        if (chunk == 0)
2625                                chunk = 1;
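                        /* e.g. assuming timeout=50s and a hash size of
                         * 101 buckets: chunk = 101*4*1/50 = 8 buckets
                         * per second, so the whole table is covered
                         * roughly n=4 times per timeout interval */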
2626
2627                        for (i = 0; i < chunk; i++) {
2628                                ksocknal_check_peer_timeouts (peer_index);
2629                                peer_index = (peer_index + 1) %
2630                                             ksocknal_data.ksnd_peer_hash_size;
2631                        }
2632
2633                        deadline = cfs_time_add(deadline, cfs_time_seconds(p));
2634                }
2635
2636                if (nenomem_conns != 0) {
2637                        /* Reduce my timeout if I rescheduled ENOMEM conns.
2638                         * This also prevents me from being woken immediately
2639                         * if any go back on my enomem list. */
2640                        timeout = SOCKNAL_ENOMEM_RETRY;
2641                }
2642                ksocknal_data.ksnd_reaper_waketime =
2643                        cfs_time_add(cfs_time_current(), timeout);
2644
2645                set_current_state (TASK_INTERRUPTIBLE);
2646                add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2647
2648                if (!ksocknal_data.ksnd_shuttingdown &&
2649                    list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2650                    list_empty (&ksocknal_data.ksnd_zombie_conns))
2651                        waitq_timedwait (&wait, TASK_INTERRUPTIBLE,
2652                                             timeout);
2653
2654                set_current_state (TASK_RUNNING);
2655                remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2656
2657                spin_lock_bh(&ksocknal_data.ksnd_reaper_lock);
2658        }
2659
2660        spin_unlock_bh(&ksocknal_data.ksnd_reaper_lock);
2661
2662        ksocknal_thread_fini();
2663        return 0;
2664}
2665