linux/net/rds/send.c
   1/*
   2 * Copyright (c) 2006 Oracle.  All rights reserved.
   3 *
   4 * This software is available to you under a choice of one of two
   5 * licenses.  You may choose to be licensed under the terms of the GNU
   6 * General Public License (GPL) Version 2, available from the file
   7 * COPYING in the main directory of this source tree, or the
   8 * OpenIB.org BSD license below:
   9 *
  10 *     Redistribution and use in source and binary forms, with or
  11 *     without modification, are permitted provided that the following
  12 *     conditions are met:
  13 *
  14 *      - Redistributions of source code must retain the above
  15 *        copyright notice, this list of conditions and the following
  16 *        disclaimer.
  17 *
  18 *      - Redistributions in binary form must reproduce the above
  19 *        copyright notice, this list of conditions and the following
  20 *        disclaimer in the documentation and/or other materials
  21 *        provided with the distribution.
  22 *
  23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
  27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
  29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
  30 * SOFTWARE.
  31 *
  32 */
  33#include <linux/kernel.h>
  34#include <linux/moduleparam.h>
  35#include <linux/gfp.h>
  36#include <net/sock.h>
  37#include <linux/in.h>
  38#include <linux/list.h>
  39#include <linux/ratelimit.h>
  40#include <linux/export.h>
  41#include <linux/sizes.h>
  42
  43#include "rds.h"
  44
  45/* When transmitting messages in rds_send_xmit, we need to emerge from
  46 * time to time and briefly release the CPU. Otherwise the softlockup watchdog
  47 * will kick our shin.
  48 * Also, it seems fairer to not let one busy connection stall all the
  49 * others.
  50 *
  51 * send_batch_count is the number of times we'll loop in send_xmit. Setting
  52 * it to 0 will restore the old behavior (where we looped until we had
  53 * drained the queue).
  54 */
  55static int send_batch_count = SZ_1K;
  56module_param(send_batch_count, int, 0444);
  57MODULE_PARM_DESC(send_batch_count, "batch factor when working the send queue");
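    /* With permissions 0444 the parameter is read-only at runtime (it shows up
     * under /sys/module/rds/parameters/), so it can only be changed at load
     * time, for example with "modprobe rds send_batch_count=256" or the
     * equivalent rds.send_batch_count=256 kernel command line entry.
     */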
  58
  59static void rds_send_remove_from_sock(struct list_head *messages, int status);
  60
  61/*
  62 * Reset the send state.  Callers must ensure that this doesn't race with
  63 * rds_send_xmit().
  64 */
  65void rds_send_path_reset(struct rds_conn_path *cp)
  66{
  67        struct rds_message *rm, *tmp;
  68        unsigned long flags;
  69
  70        if (cp->cp_xmit_rm) {
  71                rm = cp->cp_xmit_rm;
  72                cp->cp_xmit_rm = NULL;
  73                /* Tell the user the RDMA op is no longer mapped by the
  74                 * transport. This isn't entirely true (it's flushed out
  75                 * independently) but as the connection is down, there's
  76                 * no ongoing RDMA to/from that memory */
  77                rds_message_unmapped(rm);
  78                rds_message_put(rm);
  79        }
  80
  81        cp->cp_xmit_sg = 0;
  82        cp->cp_xmit_hdr_off = 0;
  83        cp->cp_xmit_data_off = 0;
  84        cp->cp_xmit_atomic_sent = 0;
  85        cp->cp_xmit_rdma_sent = 0;
  86        cp->cp_xmit_data_sent = 0;
  87
  88        cp->cp_conn->c_map_queued = 0;
  89
  90        cp->cp_unacked_packets = rds_sysctl_max_unacked_packets;
  91        cp->cp_unacked_bytes = rds_sysctl_max_unacked_bytes;
  92
  93        /* Mark messages as retransmissions, and move them to the send q */
  94        spin_lock_irqsave(&cp->cp_lock, flags);
  95        list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
  96                set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
  97                set_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags);
  98        }
  99        list_splice_init(&cp->cp_retrans, &cp->cp_send_queue);
 100        spin_unlock_irqrestore(&cp->cp_lock, flags);
 101}
 102EXPORT_SYMBOL_GPL(rds_send_path_reset);
 103
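    /* RDS_IN_XMIT acts as a per-path transmit lock: test_and_set_bit() returns
     * the old bit value, so a zero result means this caller now owns the path
     * and may run rds_send_xmit() on it.
     */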
 104static int acquire_in_xmit(struct rds_conn_path *cp)
 105{
 106        return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
 107}
 108
 109static void release_in_xmit(struct rds_conn_path *cp)
 110{
 111        clear_bit(RDS_IN_XMIT, &cp->cp_flags);
 112        smp_mb__after_atomic();
 113        /*
 114         * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
 115         * hot path and finding waiters is very rare.  We don't want to walk
 116         * the system-wide hashed waitqueue buckets in the fast path only to
 117         * almost never find waiters.
 118         */
 119        if (waitqueue_active(&cp->cp_waitq))
 120                wake_up_all(&cp->cp_waitq);
 121}
 122
 123/*
 124 * We're making the conscious trade-off here to only send one message
 125 * down the connection at a time.
 126 *   Pro:
 127 *      - tx queueing is a simple fifo list
 128 *      - reassembly is optional and easily done by transports per conn
 129 *      - no per flow rx lookup at all, straight to the socket
 130 *      - less per-frag memory and wire overhead
 131 *   Con:
 132 *      - queued acks can be delayed behind large messages
 133 *   Depends:
 134 *      - small message latency is higher behind queued large messages
 135 *      - large message latency isn't starved by intervening small sends
 136 */
 137int rds_send_xmit(struct rds_conn_path *cp)
 138{
 139        struct rds_connection *conn = cp->cp_conn;
 140        struct rds_message *rm;
 141        unsigned long flags;
 142        unsigned int tmp;
 143        struct scatterlist *sg;
 144        int ret = 0;
 145        LIST_HEAD(to_be_dropped);
 146        int batch_count;
 147        unsigned long send_gen = 0;
 148
 149restart:
 150        batch_count = 0;
 151
 152        /*
 153         * sendmsg calls here after having queued its message on the send
 154         * queue.  We only have one task feeding the connection at a time.  If
 155         * another thread is already feeding the queue then we back off.  This
 156         * avoids blocking the caller and trading per-connection data between
 157         * caches per message.
 158         */
 159        if (!acquire_in_xmit(cp)) {
 160                rds_stats_inc(s_send_lock_contention);
 161                ret = -ENOMEM;
 162                goto out;
 163        }
 164
 165        /*
 166         * We record the send generation after doing the xmit acquire.
 167         * If someone else manages to jump in and do some work, we'll use
 168         * this to avoid a goto restart farther down.
 169         *
 170         * The acquire_in_xmit() check above ensures that only one
 171         * caller can increment cp_send_gen at any time.
 172         */
 173        cp->cp_send_gen++;
 174        send_gen = cp->cp_send_gen;
 175
 176        /*
 177         * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT;
 178         * we do it in the opposite order to avoid races.
 179         */
 180        if (!rds_conn_path_up(cp)) {
 181                release_in_xmit(cp);
 182                ret = 0;
 183                goto out;
 184        }
 185
 186        if (conn->c_trans->xmit_path_prepare)
 187                conn->c_trans->xmit_path_prepare(cp);
 188
 189        /*
 190         * spin trying to push headers and data down the connection until
 191         * the connection doesn't make forward progress.
 192         */
 193        while (1) {
 194
 195                rm = cp->cp_xmit_rm;
 196
 197                /*
 198                 * If between sending messages, we can send a pending congestion
 199                 * map update.
 200                 */
 201                if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
 202                        rm = rds_cong_update_alloc(conn);
 203                        if (IS_ERR(rm)) {
 204                                ret = PTR_ERR(rm);
 205                                break;
 206                        }
 207                        rm->data.op_active = 1;
 208                        rm->m_inc.i_conn_path = cp;
 209                        rm->m_inc.i_conn = cp->cp_conn;
 210
 211                        cp->cp_xmit_rm = rm;
 212                }
 213
 214                /*
 215                 * If not already working on one, grab the next message.
 216                 *
 217                 * cp_xmit_rm holds a ref while we're sending this message down
 218                 * the connection.  We can use this ref while we hold RDS_IN_XMIT;
 219                 * rds_send_path_reset() is serialized with it.
 220                 */
 221                if (!rm) {
 222                        unsigned int len;
 223
 224                        batch_count++;
 225
 226                        /* we want to process as big a batch as we can, but
 227                         * we also want to avoid softlockups.  If we've been
 228                         * through a lot of messages, let's back off and see
 229                         * if anyone else jumps in
 230                         */
 231                        if (batch_count >= send_batch_count)
 232                                goto over_batch;
 233
 234                        spin_lock_irqsave(&cp->cp_lock, flags);
 235
 236                        if (!list_empty(&cp->cp_send_queue)) {
 237                                rm = list_entry(cp->cp_send_queue.next,
 238                                                struct rds_message,
 239                                                m_conn_item);
 240                                rds_message_addref(rm);
 241
 242                                /*
 243                                 * Move the message from the send queue to the retransmit
 244                                 * list right away.
 245                                 */
 246                                list_move_tail(&rm->m_conn_item,
 247                                               &cp->cp_retrans);
 248                        }
 249
 250                        spin_unlock_irqrestore(&cp->cp_lock, flags);
 251
 252                        if (!rm)
 253                                break;
 254
 255                        /* Unfortunately, the way InfiniBand deals with
 256                         * RDMA to a bad MR key is by moving the entire
 257                         * queue pair to error state. We could possibly
 258                         * recover from that, but right now we drop the
 259                         * connection.
 260                         * Therefore, we never retransmit messages with RDMA ops.
 261                         */
 262                        if (rm->rdma.op_active &&
 263                            test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
 264                                spin_lock_irqsave(&cp->cp_lock, flags);
 265                                if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 266                                        list_move(&rm->m_conn_item, &to_be_dropped);
 267                                spin_unlock_irqrestore(&cp->cp_lock, flags);
 268                                continue;
 269                        }
 270
 271                        /* Require an ACK every once in a while */
 272                        len = ntohl(rm->m_inc.i_hdr.h_len);
 273                        if (cp->cp_unacked_packets == 0 ||
 274                            cp->cp_unacked_bytes < len) {
 275                                __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 276
 277                                cp->cp_unacked_packets =
 278                                        rds_sysctl_max_unacked_packets;
 279                                cp->cp_unacked_bytes =
 280                                        rds_sysctl_max_unacked_bytes;
 281                                rds_stats_inc(s_send_ack_required);
 282                        } else {
 283                                cp->cp_unacked_bytes -= len;
 284                                cp->cp_unacked_packets--;
 285                        }
 286
 287                        cp->cp_xmit_rm = rm;
 288                }
 289
 290                /* The transport either sends the whole rdma or none of it */
 291                if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
 292                        rm->m_final_op = &rm->rdma;
 293                        /* The transport owns the mapped memory for now.
 294                         * You can't unmap it while it's on the send queue
 295                         */
 296                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 297                        ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
 298                        if (ret) {
 299                                clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
 300                                wake_up_interruptible(&rm->m_flush_wait);
 301                                break;
 302                        }
 303                        cp->cp_xmit_rdma_sent = 1;
 304
 305                }
 306
 307                if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
 308                        rm->m_final_op = &rm->atomic;
 309                        /* The transport owns the mapped memory for now.
 310                         * You can't unmap it while it's on the send queue
 311                         */
 312                        set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 313                        ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
 314                        if (ret) {
 315                                clear_bit(RDS_MSG_MAPPED, &rm->m_flags);
 316                                wake_up_interruptible(&rm->m_flush_wait);
 317                                break;
 318                        }
 319                        cp->cp_xmit_atomic_sent = 1;
 320
 321                }
 322
 323                /*
 324                 * A number of cases require an RDS header to be sent
 325                 * even if there is no data.
 326                 * We permit 0-byte sends; rds-ping depends on this.
 327                 * However, if there are exclusively attached silent ops,
 328                 * we skip the hdr/data send, to enable silent operation.
 329                 */
 330                if (rm->data.op_nents == 0) {
 331                        int ops_present;
 332                        int all_ops_are_silent = 1;
 333
 334                        ops_present = (rm->atomic.op_active || rm->rdma.op_active);
 335                        if (rm->atomic.op_active && !rm->atomic.op_silent)
 336                                all_ops_are_silent = 0;
 337                        if (rm->rdma.op_active && !rm->rdma.op_silent)
 338                                all_ops_are_silent = 0;
 339
 340                        if (ops_present && all_ops_are_silent &&
 341                            !rm->m_rdma_cookie)
 342                                rm->data.op_active = 0;
 343                }
 344
 345                if (rm->data.op_active && !cp->cp_xmit_data_sent) {
 346                        rm->m_final_op = &rm->data;
 347
 348                        ret = conn->c_trans->xmit(conn, rm,
 349                                                  cp->cp_xmit_hdr_off,
 350                                                  cp->cp_xmit_sg,
 351                                                  cp->cp_xmit_data_off);
 352                        if (ret <= 0)
 353                                break;
 354
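                            /* ret is the number of bytes the transport just
                             * consumed.  Charge it against the header first,
                             * then walk op_sg advancing cp_xmit_data_off so a
                             * partial send can be resumed where it left off.
                             */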
 355                        if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
 356                                tmp = min_t(int, ret,
 357                                            sizeof(struct rds_header) -
 358                                            cp->cp_xmit_hdr_off);
 359                                cp->cp_xmit_hdr_off += tmp;
 360                                ret -= tmp;
 361                        }
 362
 363                        sg = &rm->data.op_sg[cp->cp_xmit_sg];
 364                        while (ret) {
 365                                tmp = min_t(int, ret, sg->length -
 366                                                      cp->cp_xmit_data_off);
 367                                cp->cp_xmit_data_off += tmp;
 368                                ret -= tmp;
 369                                if (cp->cp_xmit_data_off == sg->length) {
 370                                        cp->cp_xmit_data_off = 0;
 371                                        sg++;
 372                                        cp->cp_xmit_sg++;
 373                                        BUG_ON(ret != 0 && cp->cp_xmit_sg ==
 374                                               rm->data.op_nents);
 375                                }
 376                        }
 377
 378                        if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
 379                            (cp->cp_xmit_sg == rm->data.op_nents))
 380                                cp->cp_xmit_data_sent = 1;
 381                }
 382
 383                /*
 384                 * A rm will only take multiple times through this loop
 385                 * if there is a data op. Thus, if the data is sent (or there was
 386                 * none), then we're done with the rm.
 387                 */
 388                if (!rm->data.op_active || cp->cp_xmit_data_sent) {
 389                        cp->cp_xmit_rm = NULL;
 390                        cp->cp_xmit_sg = 0;
 391                        cp->cp_xmit_hdr_off = 0;
 392                        cp->cp_xmit_data_off = 0;
 393                        cp->cp_xmit_rdma_sent = 0;
 394                        cp->cp_xmit_atomic_sent = 0;
 395                        cp->cp_xmit_data_sent = 0;
 396
 397                        rds_message_put(rm);
 398                }
 399        }
 400
 401over_batch:
 402        if (conn->c_trans->xmit_path_complete)
 403                conn->c_trans->xmit_path_complete(cp);
 404        release_in_xmit(cp);
 405
 406        /* Nuke any messages we decided not to retransmit. */
 407        if (!list_empty(&to_be_dropped)) {
 408                /* irqs on here, so we can put(), unlike above */
 409                list_for_each_entry(rm, &to_be_dropped, m_conn_item)
 410                        rds_message_put(rm);
 411                rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 412        }
 413
 414        /*
 415         * Other senders can queue a message after we last test the send queue
 416         * but before we clear RDS_IN_XMIT.  In that case they'd back off and
 417         * not try and send their newly queued message.  We need to check the
 418         * send queue after having cleared RDS_IN_XMIT so that their message
 419         * doesn't get stuck on the send queue.
 420         *
 421         * If the transport cannot continue (i.e ret != 0), then it must
 422         * call us when more room is available, such as from the tx
 423         * completion handler.
 424         *
 425         * We have an extra generation check here so that if someone manages
 426         * to jump in after our release_in_xmit, we'll see that they have done
 427         * some work and we will skip our goto restart.
 428         */
 429        if (ret == 0) {
 430                smp_mb();
 431                if ((test_bit(0, &conn->c_map_queued) ||
 432                     !list_empty(&cp->cp_send_queue)) &&
 433                    send_gen == cp->cp_send_gen) {
 434                        rds_stats_inc(s_send_lock_queue_raced);
 435                        if (batch_count < send_batch_count)
 436                                goto restart;
 437                        queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
 438                }
 439        }
 440out:
 441        return ret;
 442}
 443EXPORT_SYMBOL_GPL(rds_send_xmit);
 444
 445static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
 446{
 447        u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
 448
 449        assert_spin_locked(&rs->rs_lock);
 450
 451        BUG_ON(rs->rs_snd_bytes < len);
 452        rs->rs_snd_bytes -= len;
 453
 454        if (rs->rs_snd_bytes == 0)
 455                rds_stats_inc(s_send_queue_empty);
 456}
 457
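    /* A message counts as acked either via the transport-supplied is_acked
     * callback (e.g. tcp_is_acked, which looks at RDS_MSG_HAS_ACK_SEQ) or, by
     * default, when its header sequence number is at or below the acked one.
     */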
 458static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
 459                                    is_acked_func is_acked)
 460{
 461        if (is_acked)
 462                return is_acked(rm, ack);
 463        return be64_to_cpu(rm->m_inc.i_hdr.h_sequence) <= ack;
 464}
 465
 466/*
 467 * This is pretty similar to what happens below in the ACK
 468 * handling code - except that we call here as soon as we get
 469 * the IB send completion on the RDMA op and the accompanying
 470 * message.
 471 */
 472void rds_rdma_send_complete(struct rds_message *rm, int status)
 473{
 474        struct rds_sock *rs = NULL;
 475        struct rm_rdma_op *ro;
 476        struct rds_notifier *notifier;
 477        unsigned long flags;
 478
 479        spin_lock_irqsave(&rm->m_rs_lock, flags);
 480
 481        ro = &rm->rdma;
 482        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
 483            ro->op_active && ro->op_notify && ro->op_notifier) {
 484                notifier = ro->op_notifier;
 485                rs = rm->m_rs;
 486                sock_hold(rds_rs_to_sk(rs));
 487
 488                notifier->n_status = status;
 489                spin_lock(&rs->rs_lock);
 490                list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
 491                spin_unlock(&rs->rs_lock);
 492
 493                ro->op_notifier = NULL;
 494        }
 495
 496        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 497
 498        if (rs) {
 499                rds_wake_sk_sleep(rs);
 500                sock_put(rds_rs_to_sk(rs));
 501        }
 502}
 503EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
 504
 505/*
 506 * Just like above, except looks at atomic op
 507 */
 508void rds_atomic_send_complete(struct rds_message *rm, int status)
 509{
 510        struct rds_sock *rs = NULL;
 511        struct rm_atomic_op *ao;
 512        struct rds_notifier *notifier;
 513        unsigned long flags;
 514
 515        spin_lock_irqsave(&rm->m_rs_lock, flags);
 516
 517        ao = &rm->atomic;
 518        if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
 519            ao->op_active && ao->op_notify && ao->op_notifier) {
 520                notifier = ao->op_notifier;
 521                rs = rm->m_rs;
 522                sock_hold(rds_rs_to_sk(rs));
 523
 524                notifier->n_status = status;
 525                spin_lock(&rs->rs_lock);
 526                list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
 527                spin_unlock(&rs->rs_lock);
 528
 529                ao->op_notifier = NULL;
 530        }
 531
 532        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 533
 534        if (rs) {
 535                rds_wake_sk_sleep(rs);
 536                sock_put(rds_rs_to_sk(rs));
 537        }
 538}
 539EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
 540
 541/*
 542 * This is the same as rds_rdma_send_complete except we
 543 * don't do any locking - we have all the ingredients (message,
 544 * socket, socket lock) and can just move the notifier.
 545 */
 546static inline void
 547__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
 548{
 549        struct rm_rdma_op *ro;
 550        struct rm_atomic_op *ao;
 551
 552        ro = &rm->rdma;
 553        if (ro->op_active && ro->op_notify && ro->op_notifier) {
 554                ro->op_notifier->n_status = status;
 555                list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
 556                ro->op_notifier = NULL;
 557        }
 558
 559        ao = &rm->atomic;
 560        if (ao->op_active && ao->op_notify && ao->op_notifier) {
 561                ao->op_notifier->n_status = status;
 562                list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
 563                ao->op_notifier = NULL;
 564        }
 565
 566        /* No need to wake the app - caller does this */
 567}
 568
 569/*
 570 * This removes messages from the socket's list if they're on it.  The list
 571 * argument must be private to the caller, we must be able to modify it
 572 * without locks.  The messages must have a reference held for their
 573 * position on the list.  This function will drop that reference after
 574 * removing the messages from the 'messages' list, regardless of whether it
 575 * found the messages on the socket list or not.
 576 */
 577static void rds_send_remove_from_sock(struct list_head *messages, int status)
 578{
 579        unsigned long flags;
 580        struct rds_sock *rs = NULL;
 581        struct rds_message *rm;
 582
 583        while (!list_empty(messages)) {
 584                int was_on_sock = 0;
 585
 586                rm = list_entry(messages->next, struct rds_message,
 587                                m_conn_item);
 588                list_del_init(&rm->m_conn_item);
 589
 590                /*
 591                 * If we see this flag cleared then we're *sure* that someone
 592                 * else beat us to removing it from the sock.  If we race
 593                 * with their flag update we'll get the lock and then really
 594                 * see that the flag has been cleared.
 595                 *
 596                 * The message spinlock makes sure nobody clears rm->m_rs
 597                 * while we're messing with it. It does not prevent the
 598                 * message from being removed from the socket, though.
 599                 */
 600                spin_lock_irqsave(&rm->m_rs_lock, flags);
 601                if (!test_bit(RDS_MSG_ON_SOCK, &rm->m_flags))
 602                        goto unlock_and_drop;
 603
 604                if (rs != rm->m_rs) {
 605                        if (rs) {
 606                                rds_wake_sk_sleep(rs);
 607                                sock_put(rds_rs_to_sk(rs));
 608                        }
 609                        rs = rm->m_rs;
 610                        if (rs)
 611                                sock_hold(rds_rs_to_sk(rs));
 612                }
 613                if (!rs)
 614                        goto unlock_and_drop;
 615                spin_lock(&rs->rs_lock);
 616
 617                if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
 618                        struct rm_rdma_op *ro = &rm->rdma;
 619                        struct rds_notifier *notifier;
 620
 621                        list_del_init(&rm->m_sock_item);
 622                        rds_send_sndbuf_remove(rs, rm);
 623
 624                        if (ro->op_active && ro->op_notifier &&
 625                               (ro->op_notify || (ro->op_recverr && status))) {
 626                                notifier = ro->op_notifier;
 627                                list_add_tail(&notifier->n_list,
 628                                                &rs->rs_notify_queue);
 629                                if (!notifier->n_status)
 630                                        notifier->n_status = status;
 631                                rm->rdma.op_notifier = NULL;
 632                        }
 633                        was_on_sock = 1;
 634                        rm->m_rs = NULL;
 635                }
 636                spin_unlock(&rs->rs_lock);
 637
 638unlock_and_drop:
 639                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
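                    /* The first put drops the ref the caller held for rm's
                     * place on 'messages'; the second drops the ref that its
                     * RDS_MSG_ON_SOCK position on the socket list held.
                     */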
 640                rds_message_put(rm);
 641                if (was_on_sock)
 642                        rds_message_put(rm);
 643        }
 644
 645        if (rs) {
 646                rds_wake_sk_sleep(rs);
 647                sock_put(rds_rs_to_sk(rs));
 648        }
 649}
 650
 651/*
 652 * Transports call here when they've determined that the receiver queued
 653 * messages up to, and including, the given sequence number.  Messages are
 654 * moved to the retrans queue when rds_send_xmit picks them off the send
 655 * queue. This means that in the TCP case, the message may not have been
 656 * assigned the m_ack_seq yet - but that's fine as long as tcp_is_acked
 657 * checks the RDS_MSG_HAS_ACK_SEQ bit.
 658 */
 659void rds_send_path_drop_acked(struct rds_conn_path *cp, u64 ack,
 660                              is_acked_func is_acked)
 661{
 662        struct rds_message *rm, *tmp;
 663        unsigned long flags;
 664        LIST_HEAD(list);
 665
 666        spin_lock_irqsave(&cp->cp_lock, flags);
 667
 668        list_for_each_entry_safe(rm, tmp, &cp->cp_retrans, m_conn_item) {
 669                if (!rds_send_is_acked(rm, ack, is_acked))
 670                        break;
 671
 672                list_move(&rm->m_conn_item, &list);
 673                clear_bit(RDS_MSG_ON_CONN, &rm->m_flags);
 674        }
 675
 676        /* order flag updates with spin locks */
 677        if (!list_empty(&list))
 678                smp_mb__after_atomic();
 679
 680        spin_unlock_irqrestore(&cp->cp_lock, flags);
 681
 682        /* now remove the messages from the sock list as needed */
 683        rds_send_remove_from_sock(&list, RDS_RDMA_SUCCESS);
 684}
 685EXPORT_SYMBOL_GPL(rds_send_path_drop_acked);
 686
 687void rds_send_drop_acked(struct rds_connection *conn, u64 ack,
 688                         is_acked_func is_acked)
 689{
 690        WARN_ON(conn->c_trans->t_mp_capable);
 691        rds_send_path_drop_acked(&conn->c_path[0], ack, is_acked);
 692}
 693EXPORT_SYMBOL_GPL(rds_send_drop_acked);
 694
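    /*
     * Drop messages queued by this socket, either those headed for @dest
     * (matched on address and port) or, when @dest is NULL, all of them.
     * Used by RDS_CANCEL_SENT_TO and when the socket is being torn down.
     */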
 695void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
 696{
 697        struct rds_message *rm, *tmp;
 698        struct rds_connection *conn;
 699        struct rds_conn_path *cp;
 700        unsigned long flags;
 701        LIST_HEAD(list);
 702
 703        /* get all the messages we're dropping under the rs lock */
 704        spin_lock_irqsave(&rs->rs_lock, flags);
 705
 706        list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
 707                if (dest && (dest->sin_addr.s_addr != rm->m_daddr ||
 708                             dest->sin_port != rm->m_inc.i_hdr.h_dport))
 709                        continue;
 710
 711                list_move(&rm->m_sock_item, &list);
 712                rds_send_sndbuf_remove(rs, rm);
 713                clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 714        }
 715
 716        /* order flag updates with the rs lock */
 717        smp_mb__after_atomic();
 718
 719        spin_unlock_irqrestore(&rs->rs_lock, flags);
 720
 721        if (list_empty(&list))
 722                return;
 723
 724        /* Remove the messages from the conn */
 725        list_for_each_entry(rm, &list, m_sock_item) {
 726
 727                conn = rm->m_inc.i_conn;
 728                if (conn->c_trans->t_mp_capable)
 729                        cp = rm->m_inc.i_conn_path;
 730                else
 731                        cp = &conn->c_path[0];
 732
 733                spin_lock_irqsave(&cp->cp_lock, flags);
 734                /*
 735                 * Maybe someone else beat us to removing rm from the conn.
 736                 * If we race with their flag update we'll get the lock and
 737                 * then really see that the flag has been cleared.
 738                 */
 739                if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
 740                        spin_unlock_irqrestore(&cp->cp_lock, flags);
 741                        spin_lock_irqsave(&rm->m_rs_lock, flags);
 742                        rm->m_rs = NULL;
 743                        spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 744                        continue;
 745                }
 746                list_del_init(&rm->m_conn_item);
 747                spin_unlock_irqrestore(&cp->cp_lock, flags);
 748
 749                /*
 750                 * Couldn't grab m_rs_lock in top loop (lock ordering),
 751                 * but we can now.
 752                 */
 753                spin_lock_irqsave(&rm->m_rs_lock, flags);
 754
 755                spin_lock(&rs->rs_lock);
 756                __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
 757                spin_unlock(&rs->rs_lock);
 758
 759                rm->m_rs = NULL;
 760                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 761
 762                rds_message_put(rm);
 763        }
 764
 765        rds_wake_sk_sleep(rs);
 766
 767        while (!list_empty(&list)) {
 768                rm = list_entry(list.next, struct rds_message, m_sock_item);
 769                list_del_init(&rm->m_sock_item);
 770                rds_message_wait(rm);
 771
 772                /* Just in case the code above skipped this message
 773                 * because RDS_MSG_ON_CONN wasn't set, run it again here.
 774                 * Taking m_rs_lock is the only thing that keeps us
 775                 * from racing with ack processing.
 776                 */
 777                spin_lock_irqsave(&rm->m_rs_lock, flags);
 778
 779                spin_lock(&rs->rs_lock);
 780                __rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
 781                spin_unlock(&rs->rs_lock);
 782
 783                rm->m_rs = NULL;
 784                spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 785
 786                rds_message_put(rm);
 787        }
 788}
 789
 790/*
 791 * we only want this to fire once so we use the caller's 'queued'.  It's
 792 * possible that another thread can race with us and remove the
 793 * message from the flow with RDS_CANCEL_SENT_TO.
 794 */
 795static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
 796                             struct rds_conn_path *cp,
 797                             struct rds_message *rm, __be16 sport,
 798                             __be16 dport, int *queued)
 799{
 800        unsigned long flags;
 801        u32 len;
 802
 803        if (*queued)
 804                goto out;
 805
 806        len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
 807
 808        /* this is the only place which holds both the socket's rs_lock
 809         * and the connection path's cp_lock */
 810        spin_lock_irqsave(&rs->rs_lock, flags);
 811
 812        /*
 813         * If we refused to queue when only a little sndbuf space is left,
 814         * userspace would get -EAGAIN while poll() still indicates send
 815         * room, which can lead to bad behavior (spinning) if snd_bytes
 816         * isn't freed up by incoming acks. So we check the *old* value of
 817         * rs_snd_bytes here to allow the last msg to exceed the buffer;
 818         * after that poll() knows no more data can be sent.
 819         */
 820        if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
 821                rs->rs_snd_bytes += len;
 822
 823                /* let recv side know we are close to send space exhaustion.
 824                 * This is probably not the optimal way to do it, as this
 825                 * means we set the flag on *all* messages as soon as our
 826                 * throughput hits a certain threshold.
 827                 */
 828                if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
 829                        __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 830
 831                list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
 832                set_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 833                rds_message_addref(rm);
 834                rm->m_rs = rs;
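                    /* Two references are taken on rm in here: the addref above
                     * pins it for its place on the socket send queue, the one
                     * below pins it for the connection path's send queue.
                     */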
 835
 836                /* The code ordering is a little weird, but we're
 837                 * trying to minimize the time we hold cp_lock */
 838                rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport, 0);
 839                rm->m_inc.i_conn = conn;
 840                rm->m_inc.i_conn_path = cp;
 841                rds_message_addref(rm);
 842
 843                spin_lock(&cp->cp_lock);
 844                rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
 845                list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
 846                set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
 847                spin_unlock(&cp->cp_lock);
 848
 849                rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
 850                         rm, len, rs, rs->rs_snd_bytes,
 851                         (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));
 852
 853                *queued = 1;
 854        }
 855
 856        spin_unlock_irqrestore(&rs->rs_lock, flags);
 857out:
 858        return *queued;
 859}
 860
 861/*
 862 * rds_message is getting to be quite complicated, and we'd like to allocate
 863 * it all in one go. This figures out how big it needs to be up front.
 864 */
 865static int rds_rm_size(struct msghdr *msg, int data_len)
 866{
 867        struct cmsghdr *cmsg;
 868        int size = 0;
 869        int cmsg_groups = 0;
 870        int retval;
 871
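            /* cmsg_groups is a small bitmask: bit 0 marks cmsgs that add
             * transfer state to the message (RDMA args and atomics), bit 1
             * marks RDS_CMSG_RDMA_DEST/RDS_CMSG_RDMA_MAP.  The check below
             * rejects mixing the two groups in one sendmsg() call.
             */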
 872        for_each_cmsghdr(cmsg, msg) {
 873                if (!CMSG_OK(msg, cmsg))
 874                        return -EINVAL;
 875
 876                if (cmsg->cmsg_level != SOL_RDS)
 877                        continue;
 878
 879                switch (cmsg->cmsg_type) {
 880                case RDS_CMSG_RDMA_ARGS:
 881                        cmsg_groups |= 1;
 882                        retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
 883                        if (retval < 0)
 884                                return retval;
 885                        size += retval;
 886
 887                        break;
 888
 889                case RDS_CMSG_RDMA_DEST:
 890                case RDS_CMSG_RDMA_MAP:
 891                        cmsg_groups |= 2;
 892                        /* these are valid but do not add any size */
 893                        break;
 894
 895                case RDS_CMSG_ATOMIC_CSWP:
 896                case RDS_CMSG_ATOMIC_FADD:
 897                case RDS_CMSG_MASKED_ATOMIC_CSWP:
 898                case RDS_CMSG_MASKED_ATOMIC_FADD:
 899                        cmsg_groups |= 1;
 900                        size += sizeof(struct scatterlist);
 901                        break;
 902
 903                default:
 904                        return -EINVAL;
 905                }
 906
 907        }
 908
 909        size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
 910
 911        /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
 912        if (cmsg_groups == 3)
 913                return -EINVAL;
 914
 915        return size;
 916}
 917
 918static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 919                         struct msghdr *msg, int *allocated_mr)
 920{
 921        struct cmsghdr *cmsg;
 922        int ret = 0;
 923
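            /* Control messages arrive as ancillary data at level SOL_RDS.  For
             * example, userspace asks for an RDMA transfer by attaching a cmsg
             * of type RDS_CMSG_RDMA_ARGS that carries a struct rds_rdma_args
             * (see include/uapi/linux/rds.h) to its sendmsg() call.
             */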
 924        for_each_cmsghdr(cmsg, msg) {
 925                if (!CMSG_OK(msg, cmsg))
 926                        return -EINVAL;
 927
 928                if (cmsg->cmsg_level != SOL_RDS)
 929                        continue;
 930
 931                /* As a side effect, RDMA_DEST and RDMA_MAP will set
 932                 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
 933                 */
 934                switch (cmsg->cmsg_type) {
 935                case RDS_CMSG_RDMA_ARGS:
 936                        ret = rds_cmsg_rdma_args(rs, rm, cmsg);
 937                        break;
 938
 939                case RDS_CMSG_RDMA_DEST:
 940                        ret = rds_cmsg_rdma_dest(rs, rm, cmsg);
 941                        break;
 942
 943                case RDS_CMSG_RDMA_MAP:
 944                        ret = rds_cmsg_rdma_map(rs, rm, cmsg);
 945                        if (!ret)
 946                                *allocated_mr = 1;
 947                        break;
 948                case RDS_CMSG_ATOMIC_CSWP:
 949                case RDS_CMSG_ATOMIC_FADD:
 950                case RDS_CMSG_MASKED_ATOMIC_CSWP:
 951                case RDS_CMSG_MASKED_ATOMIC_FADD:
 952                        ret = rds_cmsg_atomic(rs, rm, cmsg);
 953                        break;
 954
 955                default:
 956                        return -EINVAL;
 957                }
 958
 959                if (ret)
 960                        break;
 961        }
 962
 963        return ret;
 964}
 965
 966static void rds_send_ping(struct rds_connection *conn);
 967
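    /* Pick a connection path for this socket.  Until the peer has answered the
     * handshake probe we don't know how many paths it supports (c_npaths == 0),
     * so a sender that hashes off path 0 triggers a ping and may sleep until
     * the reply sets c_npaths.
     */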
 968static int rds_send_mprds_hash(struct rds_sock *rs, struct rds_connection *conn)
 969{
 970        int hash;
 971
 972        if (conn->c_npaths == 0)
 973                hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
 974        else
 975                hash = RDS_MPATH_HASH(rs, conn->c_npaths);
 976        if (conn->c_npaths == 0 && hash != 0) {
 977                rds_send_ping(conn);
 978
 979                if (conn->c_npaths == 0) {
 980                        wait_event_interruptible(conn->c_hs_waitq,
 981                                                 (conn->c_npaths != 0));
 982                }
 983                if (conn->c_npaths == 1)
 984                        hash = 0;
 985        }
 986        return hash;
 987}
 988
 989int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 990{
 991        struct sock *sk = sock->sk;
 992        struct rds_sock *rs = rds_sk_to_rs(sk);
 993        DECLARE_SOCKADDR(struct sockaddr_in *, usin, msg->msg_name);
 994        __be32 daddr;
 995        __be16 dport;
 996        struct rds_message *rm = NULL;
 997        struct rds_connection *conn;
 998        int ret = 0;
 999        int queued = 0, allocated_mr = 0;
1000        int nonblock = msg->msg_flags & MSG_DONTWAIT;
1001        long timeo = sock_sndtimeo(sk, nonblock);
1002        struct rds_conn_path *cpath;
1003
1004        /* Mirror how Linux UDP mirrors BSD's error message compatibility */
1005        /* XXX: Perhaps MSG_MORE someday */
1006        if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) {
1007                ret = -EOPNOTSUPP;
1008                goto out;
1009        }
1010
1011        if (msg->msg_namelen) {
1012                /* XXX fail non-unicast destination IPs? */
1013                if (msg->msg_namelen < sizeof(*usin) || usin->sin_family != AF_INET) {
1014                        ret = -EINVAL;
1015                        goto out;
1016                }
1017                daddr = usin->sin_addr.s_addr;
1018                dport = usin->sin_port;
1019        } else {
1020                /* We only care about consistency with ->connect() */
1021                lock_sock(sk);
1022                daddr = rs->rs_conn_addr;
1023                dport = rs->rs_conn_port;
1024                release_sock(sk);
1025        }
1026
1027        lock_sock(sk);
1028        if (daddr == 0 || rs->rs_bound_addr == 0) {
1029                release_sock(sk);
1030                ret = -ENOTCONN; /* XXX not a great errno */
1031                goto out;
1032        }
1033        release_sock(sk);
1034
1035        if (payload_len > rds_sk_sndbuf(rs)) {
1036                ret = -EMSGSIZE;
1037                goto out;
1038        }
1039
1040        /* size of rm including all sgs */
1041        ret = rds_rm_size(msg, payload_len);
1042        if (ret < 0)
1043                goto out;
1044
1045        rm = rds_message_alloc(ret, GFP_KERNEL);
1046        if (!rm) {
1047                ret = -ENOMEM;
1048                goto out;
1049        }
1050
1051        /* Attach data to the rm */
1052        if (payload_len) {
1053                rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
1054                if (!rm->data.op_sg) {
1055                        ret = -ENOMEM;
1056                        goto out;
1057                }
1058                ret = rds_message_copy_from_user(rm, &msg->msg_iter);
1059                if (ret)
1060                        goto out;
1061        }
1062        rm->data.op_active = 1;
1063
1064        rm->m_daddr = daddr;
1065
1066        /* rds_conn_create has a spinlock that runs with IRQ off.
1067         * Caching the conn in the socket helps a lot. */
1068        if (rs->rs_conn && rs->rs_conn->c_faddr == daddr)
1069                conn = rs->rs_conn;
1070        else {
1071                conn = rds_conn_create_outgoing(sock_net(sock->sk),
1072                                                rs->rs_bound_addr, daddr,
1073                                        rs->rs_transport,
1074                                        sock->sk->sk_allocation);
1075                if (IS_ERR(conn)) {
1076                        ret = PTR_ERR(conn);
1077                        goto out;
1078                }
1079                rs->rs_conn = conn;
1080        }
1081
1082        /* Parse any control messages the user may have included. */
1083        ret = rds_cmsg_send(rs, rm, msg, &allocated_mr);
1084        if (ret)
1085                goto out;
1086
1087        if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
1088                printk_ratelimited(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
1089                               &rm->rdma, conn->c_trans->xmit_rdma);
1090                ret = -EOPNOTSUPP;
1091                goto out;
1092        }
1093
1094        if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
1095                printk_ratelimited(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
1096                               &rm->atomic, conn->c_trans->xmit_atomic);
1097                ret = -EOPNOTSUPP;
1098                goto out;
1099        }
1100
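            /* Multipath-capable transports spread traffic over the c_path[]
             * array based on the socket hash; everyone else uses path 0.
             */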
1101        if (conn->c_trans->t_mp_capable)
1102                cpath = &conn->c_path[rds_send_mprds_hash(rs, conn)];
1103        else
1104                cpath = &conn->c_path[0];
1105
1106        rds_conn_path_connect_if_down(cpath);
1107
1108        ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
1109        if (ret) {
1110                rs->rs_seen_congestion = 1;
1111                goto out;
1112        }
1113        while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
1114                                  dport, &queued)) {
1115                rds_stats_inc(s_send_queue_full);
1116
1117                if (nonblock) {
1118                        ret = -EAGAIN;
1119                        goto out;
1120                }
1121
1122                timeo = wait_event_interruptible_timeout(*sk_sleep(sk),
1123                                        rds_send_queue_rm(rs, conn, cpath, rm,
1124                                                          rs->rs_bound_port,
1125                                                          dport,
1126                                                          &queued),
1127                                        timeo);
1128                rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
1129                if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
1130                        continue;
1131
1132                ret = timeo;
1133                if (ret == 0)
1134                        ret = -ETIMEDOUT;
1135                goto out;
1136        }
1137
1138        /*
1139         * By now we've committed to the send.  We reuse rds_send_worker()
1140         * to retry sends in the rds thread if the transport asks us to.
1141         */
1142        rds_stats_inc(s_send_queued);
1143
1144        ret = rds_send_xmit(cpath);
1145        if (ret == -ENOMEM || ret == -EAGAIN)
1146                queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
1147
1148        rds_message_put(rm);
1149        return payload_len;
1150
1151out:
1152        /* If the user included an RDMA_MAP cmsg, we allocated an MR on the fly.
1153         * If the sendmsg goes through, we keep the MR. If it fails with EAGAIN
1154         * or in any other way, we need to destroy the MR again */
1155        if (allocated_mr)
1156                rds_rdma_unuse(rs, rds_rdma_cookie_key(rm->m_rdma_cookie), 1);
1157
1158        if (rm)
1159                rds_message_put(rm);
1160        return ret;
1161}
1162
1163/*
1164 * send out a probe. Can be shared by rds_send_ping,
1165 * rds_send_pong, rds_send_hb.
1166 * rds_send_hb should use h_flags
1167 *   RDS_FLAG_HB_PING|RDS_FLAG_ACK_REQUIRED
1168 * or
1169 *   RDS_FLAG_HB_PONG|RDS_FLAG_ACK_REQUIRED
1170 */
1171int
1172rds_send_probe(struct rds_conn_path *cp, __be16 sport,
1173               __be16 dport, u8 h_flags)
1174{
1175        struct rds_message *rm;
1176        unsigned long flags;
1177        int ret = 0;
1178
1179        rm = rds_message_alloc(0, GFP_ATOMIC);
1180        if (!rm) {
1181                ret = -ENOMEM;
1182                goto out;
1183        }
1184
1185        rm->m_daddr = cp->cp_conn->c_faddr;
1186        rm->data.op_active = 1;
1187
1188        rds_conn_path_connect_if_down(cp);
1189
1190        ret = rds_cong_wait(cp->cp_conn->c_fcong, dport, 1, NULL);
1191        if (ret)
1192                goto out;
1193
1194        spin_lock_irqsave(&cp->cp_lock, flags);
1195        list_add_tail(&rm->m_conn_item, &cp->cp_send_queue);
1196        set_bit(RDS_MSG_ON_CONN, &rm->m_flags);
1197        rds_message_addref(rm);
1198        rm->m_inc.i_conn = cp->cp_conn;
1199        rm->m_inc.i_conn_path = cp;
1200
1201        rds_message_populate_header(&rm->m_inc.i_hdr, sport, dport,
1202                                    cp->cp_next_tx_seq);
1203        rm->m_inc.i_hdr.h_flags |= h_flags;
1204        cp->cp_next_tx_seq++;
1205
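            /* A handshake probe also advertises how many paths we support via
             * the NPATHS extension header; the peer's reply is what eventually
             * lets rds_send_mprds_hash() see a nonzero c_npaths.
             */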
1206        if (RDS_HS_PROBE(sport, dport) && cp->cp_conn->c_trans->t_mp_capable) {
1207                u16 npaths = RDS_MPATH_WORKERS;
1208
1209                rds_message_add_extension(&rm->m_inc.i_hdr,
1210                                          RDS_EXTHDR_NPATHS, &npaths,
1211                                          sizeof(npaths));
1212        }
1213        spin_unlock_irqrestore(&cp->cp_lock, flags);
1214
1215        rds_stats_inc(s_send_queued);
1216        rds_stats_inc(s_send_pong);
1217
1218        /* schedule the send work on rds_wq */
1219        queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
1220
1221        rds_message_put(rm);
1222        return 0;
1223
1224out:
1225        if (rm)
1226                rds_message_put(rm);
1227        return ret;
1228}
1229
1230int
1231rds_send_pong(struct rds_conn_path *cp, __be16 dport)
1232{
1233        return rds_send_probe(cp, 0, dport, 0);
1234}
1235
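    /* Send the multipath handshake probe at most once per connection;
     * c_ping_triggered, tested and set under cp_lock, keeps later callers
     * from re-sending it.
     */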
1236void
1237rds_send_ping(struct rds_connection *conn)
1238{
1239        unsigned long flags;
1240        struct rds_conn_path *cp = &conn->c_path[0];
1241
1242        spin_lock_irqsave(&cp->cp_lock, flags);
1243        if (conn->c_ping_triggered) {
1244                spin_unlock_irqrestore(&cp->cp_lock, flags);
1245                return;
1246        }
1247        conn->c_ping_triggered = 1;
1248        spin_unlock_irqrestore(&cp->cp_lock, flags);
1249        rds_send_probe(&conn->c_path[0], RDS_FLAG_PROBE_PORT, 0, 0);
1250}
1251