linux/net/sunrpc/xprtmultipath.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0
   2/*
   3 * Multipath support for RPC
   4 *
   5 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
   6 *
   7 * Trond Myklebust <trond.myklebust@primarydata.com>
   8 *
   9 */
  10#include <linux/types.h>
  11#include <linux/kref.h>
  12#include <linux/list.h>
  13#include <linux/rcupdate.h>
  14#include <linux/rculist.h>
  15#include <linux/slab.h>
  16#include <asm/cmpxchg.h>
  17#include <linux/spinlock.h>
  18#include <linux/sunrpc/xprt.h>
  19#include <linux/sunrpc/addr.h>
  20#include <linux/sunrpc/xprtmultipath.h>
  21
  22typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct rpc_xprt_switch *xps,
  23                const struct rpc_xprt *cur);
  24
  25static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular;
  26static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin;
  27static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall;
  28
  29static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps,
  30                struct rpc_xprt *xprt)
  31{
  32        if (unlikely(xprt_get(xprt) == NULL))
  33                return;
  34        list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list);
  35        smp_wmb();
  36        if (xps->xps_nxprts == 0)
  37                xps->xps_net = xprt->xprt_net;
  38        xps->xps_nxprts++;
  39        xps->xps_nactive++;
  40}
  41
  42/**
  43 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch
  44 * @xps: pointer to struct rpc_xprt_switch
  45 * @xprt: pointer to struct rpc_xprt
  46 *
  47 * Adds xprt to the end of the list of struct rpc_xprt in xps.
  48 */
  49void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
  50                struct rpc_xprt *xprt)
  51{
  52        if (xprt == NULL)
  53                return;
  54        spin_lock(&xps->xps_lock);
  55        if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL)
  56                xprt_switch_add_xprt_locked(xps, xprt);
  57        spin_unlock(&xps->xps_lock);
  58}
  59
  60static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps,
  61                struct rpc_xprt *xprt)
  62{
  63        if (unlikely(xprt == NULL))
  64                return;
  65        xps->xps_nactive--;
  66        xps->xps_nxprts--;
  67        if (xps->xps_nxprts == 0)
  68                xps->xps_net = NULL;
  69        smp_wmb();
  70        list_del_rcu(&xprt->xprt_switch);
  71}
  72
  73/**
  74 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch
  75 * @xps: pointer to struct rpc_xprt_switch
  76 * @xprt: pointer to struct rpc_xprt
  77 *
  78 * Removes xprt from the list of struct rpc_xprt in xps.
  79 */
  80void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
  81                struct rpc_xprt *xprt)
  82{
  83        spin_lock(&xps->xps_lock);
  84        xprt_switch_remove_xprt_locked(xps, xprt);
  85        spin_unlock(&xps->xps_lock);
  86        xprt_put(xprt);
  87}
  88
  89/**
  90 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch
  91 * @xprt: pointer to struct rpc_xprt
  92 * @gfp_flags: allocation flags
  93 *
  94 * On success, returns an initialised struct rpc_xprt_switch, containing
  95 * the entry xprt. Returns NULL on failure.
  96 */
  97struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
  98                gfp_t gfp_flags)
  99{
 100        struct rpc_xprt_switch *xps;
 101
 102        xps = kmalloc(sizeof(*xps), gfp_flags);
 103        if (xps != NULL) {
 104                spin_lock_init(&xps->xps_lock);
 105                kref_init(&xps->xps_kref);
 106                xps->xps_nxprts = xps->xps_nactive = 0;
 107                atomic_long_set(&xps->xps_queuelen, 0);
 108                xps->xps_net = NULL;
 109                INIT_LIST_HEAD(&xps->xps_xprt_list);
 110                xps->xps_iter_ops = &rpc_xprt_iter_singular;
 111                xprt_switch_add_xprt_locked(xps, xprt);
 112        }
 113
 114        return xps;
 115}
 116
 117static void xprt_switch_free_entries(struct rpc_xprt_switch *xps)
 118{
 119        spin_lock(&xps->xps_lock);
 120        while (!list_empty(&xps->xps_xprt_list)) {
 121                struct rpc_xprt *xprt;
 122
 123                xprt = list_first_entry(&xps->xps_xprt_list,
 124                                struct rpc_xprt, xprt_switch);
 125                xprt_switch_remove_xprt_locked(xps, xprt);
 126                spin_unlock(&xps->xps_lock);
 127                xprt_put(xprt);
 128                spin_lock(&xps->xps_lock);
 129        }
 130        spin_unlock(&xps->xps_lock);
 131}
 132
 133static void xprt_switch_free(struct kref *kref)
 134{
 135        struct rpc_xprt_switch *xps = container_of(kref,
 136                        struct rpc_xprt_switch, xps_kref);
 137
 138        xprt_switch_free_entries(xps);
 139        kfree_rcu(xps, xps_rcu);
 140}
 141
 142/**
 143 * xprt_switch_get - Return a reference to a rpc_xprt_switch
 144 * @xps: pointer to struct rpc_xprt_switch
 145 *
 146 * Returns a reference to xps unless the refcount is already zero.
 147 */
 148struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps)
 149{
 150        if (xps != NULL && kref_get_unless_zero(&xps->xps_kref))
 151                return xps;
 152        return NULL;
 153}
 154
 155/**
 156 * xprt_switch_put - Release a reference to a rpc_xprt_switch
 157 * @xps: pointer to struct rpc_xprt_switch
 158 *
 159 * Release the reference to xps, and free it once the refcount is zero.
 160 */
 161void xprt_switch_put(struct rpc_xprt_switch *xps)
 162{
 163        if (xps != NULL)
 164                kref_put(&xps->xps_kref, xprt_switch_free);
 165}
 166
 167/**
 168 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch
 169 * @xps: pointer to struct rpc_xprt_switch
 170 *
 171 * Sets a round-robin default policy for iterators acting on xps.
 172 */
 173void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps)
 174{
 175        if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin)
 176                WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin);
 177}
 178
 179static
 180const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi)
 181{
 182        if (xpi->xpi_ops != NULL)
 183                return xpi->xpi_ops;
 184        return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops;
 185}
 186
 187static
 188void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi)
 189{
 190}
 191
 192static
 193void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi)
 194{
 195        WRITE_ONCE(xpi->xpi_cursor, NULL);
 196}
 197
 198static
 199bool xprt_is_active(const struct rpc_xprt *xprt)
 200{
 201        return kref_read(&xprt->kref) != 0;
 202}
 203
 204static
 205struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head)
 206{
 207        struct rpc_xprt *pos;
 208
 209        list_for_each_entry_rcu(pos, head, xprt_switch) {
 210                if (xprt_is_active(pos))
 211                        return pos;
 212        }
 213        return NULL;
 214}
 215
 216static
 217struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi)
 218{
 219        struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
 220
 221        if (xps == NULL)
 222                return NULL;
 223        return xprt_switch_find_first_entry(&xps->xps_xprt_list);
 224}
 225
 226static
 227struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head,
 228                const struct rpc_xprt *cur)
 229{
 230        struct rpc_xprt *pos;
 231        bool found = false;
 232
 233        list_for_each_entry_rcu(pos, head, xprt_switch) {
 234                if (cur == pos)
 235                        found = true;
 236                if (found && xprt_is_active(pos))
 237                        return pos;
 238        }
 239        return NULL;
 240}
 241
 242static
 243struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi)
 244{
 245        struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
 246        struct list_head *head;
 247
 248        if (xps == NULL)
 249                return NULL;
 250        head = &xps->xps_xprt_list;
 251        if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2)
 252                return xprt_switch_find_first_entry(head);
 253        return xprt_switch_find_current_entry(head, xpi->xpi_cursor);
 254}
 255
 256bool rpc_xprt_switch_has_addr(struct rpc_xprt_switch *xps,
 257                              const struct sockaddr *sap)
 258{
 259        struct list_head *head;
 260        struct rpc_xprt *pos;
 261
 262        if (xps == NULL || sap == NULL)
 263                return false;
 264
 265        head = &xps->xps_xprt_list;
 266        list_for_each_entry_rcu(pos, head, xprt_switch) {
 267                if (rpc_cmp_addr_port(sap, (struct sockaddr *)&pos->addr)) {
 268                        pr_info("RPC:   addr %s already in xprt switch\n",
 269                                pos->address_strings[RPC_DISPLAY_ADDR]);
 270                        return true;
 271                }
 272        }
 273        return false;
 274}
 275
 276static
 277struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head,
 278                const struct rpc_xprt *cur)
 279{
 280        struct rpc_xprt *pos, *prev = NULL;
 281        bool found = false;
 282
 283        list_for_each_entry_rcu(pos, head, xprt_switch) {
 284                if (cur == prev)
 285                        found = true;
 286                if (found && xprt_is_active(pos))
 287                        return pos;
 288                prev = pos;
 289        }
 290        return NULL;
 291}
 292
 293static
 294struct rpc_xprt *xprt_switch_set_next_cursor(struct rpc_xprt_switch *xps,
 295                struct rpc_xprt **cursor,
 296                xprt_switch_find_xprt_t find_next)
 297{
 298        struct rpc_xprt *pos, *old;
 299
 300        old = smp_load_acquire(cursor);
 301        pos = find_next(xps, old);
 302        smp_store_release(cursor, pos);
 303        return pos;
 304}
 305
 306static
 307struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
 308                xprt_switch_find_xprt_t find_next)
 309{
 310        struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
 311
 312        if (xps == NULL)
 313                return NULL;
 314        return xprt_switch_set_next_cursor(xps, &xpi->xpi_cursor, find_next);
 315}
 316
 317static
 318struct rpc_xprt *__xprt_switch_find_next_entry_roundrobin(struct list_head *head,
 319                const struct rpc_xprt *cur)
 320{
 321        struct rpc_xprt *ret;
 322
 323        ret = xprt_switch_find_next_entry(head, cur);
 324        if (ret != NULL)
 325                return ret;
 326        return xprt_switch_find_first_entry(head);
 327}
 328
 329static
 330struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct rpc_xprt_switch *xps,
 331                const struct rpc_xprt *cur)
 332{
 333        struct list_head *head = &xps->xps_xprt_list;
 334        struct rpc_xprt *xprt;
 335        unsigned int nactive;
 336
 337        for (;;) {
 338                unsigned long xprt_queuelen, xps_queuelen;
 339
 340                xprt = __xprt_switch_find_next_entry_roundrobin(head, cur);
 341                if (!xprt)
 342                        break;
 343                xprt_queuelen = atomic_long_read(&xprt->queuelen);
 344                xps_queuelen = atomic_long_read(&xps->xps_queuelen);
 345                nactive = READ_ONCE(xps->xps_nactive);
 346                /* Exit loop if xprt_queuelen <= average queue length */
 347                if (xprt_queuelen * nactive <= xps_queuelen)
 348                        break;
 349                cur = xprt;
 350        }
 351        return xprt;
 352}
 353
 354static
 355struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi)
 356{
 357        return xprt_iter_next_entry_multiple(xpi,
 358                        xprt_switch_find_next_entry_roundrobin);
 359}
 360
 361static
 362struct rpc_xprt *xprt_switch_find_next_entry_all(struct rpc_xprt_switch *xps,
 363                const struct rpc_xprt *cur)
 364{
 365        return xprt_switch_find_next_entry(&xps->xps_xprt_list, cur);
 366}
 367
 368static
 369struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi)
 370{
 371        return xprt_iter_next_entry_multiple(xpi,
 372                        xprt_switch_find_next_entry_all);
 373}
 374
 375/*
 376 * xprt_iter_rewind - Resets the xprt iterator
 377 * @xpi: pointer to rpc_xprt_iter
 378 *
 379 * Resets xpi to ensure that it points to the first entry in the list
 380 * of transports.
 381 */
 382static
 383void xprt_iter_rewind(struct rpc_xprt_iter *xpi)
 384{
 385        rcu_read_lock();
 386        xprt_iter_ops(xpi)->xpi_rewind(xpi);
 387        rcu_read_unlock();
 388}
 389
 390static void __xprt_iter_init(struct rpc_xprt_iter *xpi,
 391                struct rpc_xprt_switch *xps,
 392                const struct rpc_xprt_iter_ops *ops)
 393{
 394        rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps));
 395        xpi->xpi_cursor = NULL;
 396        xpi->xpi_ops = ops;
 397}
 398
 399/**
 400 * xprt_iter_init - Initialise an xprt iterator
 401 * @xpi: pointer to rpc_xprt_iter
 402 * @xps: pointer to rpc_xprt_switch
 403 *
 404 * Initialises the iterator to use the default iterator ops
 405 * as set in xps. This function is mainly intended for internal
 406 * use in the rpc_client.
 407 */
 408void xprt_iter_init(struct rpc_xprt_iter *xpi,
 409                struct rpc_xprt_switch *xps)
 410{
 411        __xprt_iter_init(xpi, xps, NULL);
 412}
 413
 414/**
 415 * xprt_iter_init_listall - Initialise an xprt iterator
 416 * @xpi: pointer to rpc_xprt_iter
 417 * @xps: pointer to rpc_xprt_switch
 418 *
 419 * Initialises the iterator to iterate once through the entire list
 420 * of entries in xps.
 421 */
 422void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
 423                struct rpc_xprt_switch *xps)
 424{
 425        __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall);
 426}
 427
 428/**
 429 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch
 430 * @xpi: pointer to rpc_xprt_iter
 431 * @newswitch: pointer to a new rpc_xprt_switch or NULL
 432 *
 433 * Swaps out the existing xpi->xpi_xpswitch with a new value.
 434 */
 435struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi,
 436                struct rpc_xprt_switch *newswitch)
 437{
 438        struct rpc_xprt_switch __rcu *oldswitch;
 439
 440        /* Atomically swap out the old xpswitch */
 441        oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch));
 442        if (newswitch != NULL)
 443                xprt_iter_rewind(xpi);
 444        return rcu_dereference_protected(oldswitch, true);
 445}
 446
 447/**
 448 * xprt_iter_destroy - Destroys the xprt iterator
 449 * @xpi: pointer to rpc_xprt_iter
 450 */
 451void xprt_iter_destroy(struct rpc_xprt_iter *xpi)
 452{
 453        xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL));
 454}
 455
 456/**
 457 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor
 458 * @xpi: pointer to rpc_xprt_iter
 459 *
 460 * Returns a pointer to the struct rpc_xprt that is currently
 461 * pointed to by the cursor.
 462 * Caller must be holding rcu_read_lock().
 463 */
 464struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi)
 465{
 466        WARN_ON_ONCE(!rcu_read_lock_held());
 467        return xprt_iter_ops(xpi)->xpi_xprt(xpi);
 468}
 469
 470static
 471struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi,
 472                struct rpc_xprt *(*fn)(struct rpc_xprt_iter *))
 473{
 474        struct rpc_xprt *ret;
 475
 476        do {
 477                ret = fn(xpi);
 478                if (ret == NULL)
 479                        break;
 480                ret = xprt_get(ret);
 481        } while (ret == NULL);
 482        return ret;
 483}
 484
 485/**
 486 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor
 487 * @xpi: pointer to rpc_xprt_iter
 488 *
 489 * Returns a reference to the struct rpc_xprt that is currently
 490 * pointed to by the cursor.
 491 */
 492struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi)
 493{
 494        struct rpc_xprt *xprt;
 495
 496        rcu_read_lock();
 497        xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt);
 498        rcu_read_unlock();
 499        return xprt;
 500}
 501
 502/**
 503 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor
 504 * @xpi: pointer to rpc_xprt_iter
 505 *
 506 * Returns a reference to the struct rpc_xprt that immediately follows the
 507 * entry pointed to by the cursor.
 508 */
 509struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi)
 510{
 511        struct rpc_xprt *xprt;
 512
 513        rcu_read_lock();
 514        xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next);
 515        rcu_read_unlock();
 516        return xprt;
 517}
 518
 519/* Policy for always returning the first entry in the rpc_xprt_switch */
 520static
 521const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = {
 522        .xpi_rewind = xprt_iter_no_rewind,
 523        .xpi_xprt = xprt_iter_first_entry,
 524        .xpi_next = xprt_iter_first_entry,
 525};
 526
 527/* Policy for round-robin iteration of entries in the rpc_xprt_switch */
 528static
 529const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = {
 530        .xpi_rewind = xprt_iter_default_rewind,
 531        .xpi_xprt = xprt_iter_current_entry,
 532        .xpi_next = xprt_iter_next_entry_roundrobin,
 533};
 534
 535/* Policy for once-through iteration of entries in the rpc_xprt_switch */
 536static
 537const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = {
 538        .xpi_rewind = xprt_iter_default_rewind,
 539        .xpi_xprt = xprt_iter_current_entry,
 540        .xpi_next = xprt_iter_next_entry_all,
 541};
 542