linux/net/sunrpc/xprtmultipath.c
<<
>>
Prefs
   1/*
   2 * Multipath support for RPC
   3 *
   4 * Copyright (c) 2015, 2016, Primary Data, Inc. All rights reserved.
   5 *
   6 * Trond Myklebust <trond.myklebust@primarydata.com>
   7 *
   8 */
   9#include <linux/types.h>
  10#include <linux/kref.h>
  11#include <linux/list.h>
  12#include <linux/rcupdate.h>
  13#include <linux/rculist.h>
  14#include <linux/slab.h>
  15#include <asm/cmpxchg.h>
  16#include <linux/spinlock.h>
  17#include <linux/sunrpc/xprt.h>
  18#include <linux/sunrpc/xprtmultipath.h>
  19
  20typedef struct rpc_xprt *(*xprt_switch_find_xprt_t)(struct list_head *head,
  21                const struct rpc_xprt *cur);
  22
  23static const struct rpc_xprt_iter_ops rpc_xprt_iter_singular;
  24static const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin;
  25static const struct rpc_xprt_iter_ops rpc_xprt_iter_listall;
  26
  27static void xprt_switch_add_xprt_locked(struct rpc_xprt_switch *xps,
  28                struct rpc_xprt *xprt)
  29{
  30        if (unlikely(xprt_get(xprt) == NULL))
  31                return;
  32        list_add_tail_rcu(&xprt->xprt_switch, &xps->xps_xprt_list);
  33        smp_wmb();
  34        if (xps->xps_nxprts == 0)
  35                xps->xps_net = xprt->xprt_net;
  36        xps->xps_nxprts++;
  37}
  38
  39/**
  40 * rpc_xprt_switch_add_xprt - Add a new rpc_xprt to an rpc_xprt_switch
  41 * @xps: pointer to struct rpc_xprt_switch
  42 * @xprt: pointer to struct rpc_xprt
  43 *
  44 * Adds xprt to the end of the list of struct rpc_xprt in xps.
  45 */
  46void rpc_xprt_switch_add_xprt(struct rpc_xprt_switch *xps,
  47                struct rpc_xprt *xprt)
  48{
  49        if (xprt == NULL)
  50                return;
  51        spin_lock(&xps->xps_lock);
  52        if (xps->xps_net == xprt->xprt_net || xps->xps_net == NULL)
  53                xprt_switch_add_xprt_locked(xps, xprt);
  54        spin_unlock(&xps->xps_lock);
  55}
  56
  57static void xprt_switch_remove_xprt_locked(struct rpc_xprt_switch *xps,
  58                struct rpc_xprt *xprt)
  59{
  60        if (unlikely(xprt == NULL))
  61                return;
  62        xps->xps_nxprts--;
  63        if (xps->xps_nxprts == 0)
  64                xps->xps_net = NULL;
  65        smp_wmb();
  66        list_del_rcu(&xprt->xprt_switch);
  67}
  68
  69/**
  70 * rpc_xprt_switch_remove_xprt - Removes an rpc_xprt from a rpc_xprt_switch
  71 * @xps: pointer to struct rpc_xprt_switch
  72 * @xprt: pointer to struct rpc_xprt
  73 *
  74 * Removes xprt from the list of struct rpc_xprt in xps.
  75 */
  76void rpc_xprt_switch_remove_xprt(struct rpc_xprt_switch *xps,
  77                struct rpc_xprt *xprt)
  78{
  79        spin_lock(&xps->xps_lock);
  80        xprt_switch_remove_xprt_locked(xps, xprt);
  81        spin_unlock(&xps->xps_lock);
  82        xprt_put(xprt);
  83}
  84
  85/**
  86 * xprt_switch_alloc - Allocate a new struct rpc_xprt_switch
  87 * @xprt: pointer to struct rpc_xprt
  88 * @gfp_flags: allocation flags
  89 *
  90 * On success, returns an initialised struct rpc_xprt_switch, containing
  91 * the entry xprt. Returns NULL on failure.
  92 */
  93struct rpc_xprt_switch *xprt_switch_alloc(struct rpc_xprt *xprt,
  94                gfp_t gfp_flags)
  95{
  96        struct rpc_xprt_switch *xps;
  97
  98        xps = kmalloc(sizeof(*xps), gfp_flags);
  99        if (xps != NULL) {
 100                spin_lock_init(&xps->xps_lock);
 101                kref_init(&xps->xps_kref);
 102                xps->xps_nxprts = 0;
 103                INIT_LIST_HEAD(&xps->xps_xprt_list);
 104                xps->xps_iter_ops = &rpc_xprt_iter_singular;
 105                xprt_switch_add_xprt_locked(xps, xprt);
 106        }
 107
 108        return xps;
 109}
 110
 111static void xprt_switch_free_entries(struct rpc_xprt_switch *xps)
 112{
 113        spin_lock(&xps->xps_lock);
 114        while (!list_empty(&xps->xps_xprt_list)) {
 115                struct rpc_xprt *xprt;
 116
 117                xprt = list_first_entry(&xps->xps_xprt_list,
 118                                struct rpc_xprt, xprt_switch);
 119                xprt_switch_remove_xprt_locked(xps, xprt);
 120                spin_unlock(&xps->xps_lock);
 121                xprt_put(xprt);
 122                spin_lock(&xps->xps_lock);
 123        }
 124        spin_unlock(&xps->xps_lock);
 125}
 126
 127static void xprt_switch_free(struct kref *kref)
 128{
 129        struct rpc_xprt_switch *xps = container_of(kref,
 130                        struct rpc_xprt_switch, xps_kref);
 131
 132        xprt_switch_free_entries(xps);
 133        kfree_rcu(xps, xps_rcu);
 134}
 135
 136/**
 137 * xprt_switch_get - Return a reference to a rpc_xprt_switch
 138 * @xps: pointer to struct rpc_xprt_switch
 139 *
 140 * Returns a reference to xps unless the refcount is already zero.
 141 */
 142struct rpc_xprt_switch *xprt_switch_get(struct rpc_xprt_switch *xps)
 143{
 144        if (xps != NULL && kref_get_unless_zero(&xps->xps_kref))
 145                return xps;
 146        return NULL;
 147}
 148
 149/**
 150 * xprt_switch_put - Release a reference to a rpc_xprt_switch
 151 * @xps: pointer to struct rpc_xprt_switch
 152 *
 153 * Release the reference to xps, and free it once the refcount is zero.
 154 */
 155void xprt_switch_put(struct rpc_xprt_switch *xps)
 156{
 157        if (xps != NULL)
 158                kref_put(&xps->xps_kref, xprt_switch_free);
 159}
 160
 161/**
 162 * rpc_xprt_switch_set_roundrobin - Set a round-robin policy on rpc_xprt_switch
 163 * @xps: pointer to struct rpc_xprt_switch
 164 *
 165 * Sets a round-robin default policy for iterators acting on xps.
 166 */
 167void rpc_xprt_switch_set_roundrobin(struct rpc_xprt_switch *xps)
 168{
 169        if (READ_ONCE(xps->xps_iter_ops) != &rpc_xprt_iter_roundrobin)
 170                WRITE_ONCE(xps->xps_iter_ops, &rpc_xprt_iter_roundrobin);
 171}
 172
 173static
 174const struct rpc_xprt_iter_ops *xprt_iter_ops(const struct rpc_xprt_iter *xpi)
 175{
 176        if (xpi->xpi_ops != NULL)
 177                return xpi->xpi_ops;
 178        return rcu_dereference(xpi->xpi_xpswitch)->xps_iter_ops;
 179}
 180
 181static
 182void xprt_iter_no_rewind(struct rpc_xprt_iter *xpi)
 183{
 184}
 185
 186static
 187void xprt_iter_default_rewind(struct rpc_xprt_iter *xpi)
 188{
 189        WRITE_ONCE(xpi->xpi_cursor, NULL);
 190}
 191
 192static
 193struct rpc_xprt *xprt_switch_find_first_entry(struct list_head *head)
 194{
 195        return list_first_or_null_rcu(head, struct rpc_xprt, xprt_switch);
 196}
 197
 198static
 199struct rpc_xprt *xprt_iter_first_entry(struct rpc_xprt_iter *xpi)
 200{
 201        struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
 202
 203        if (xps == NULL)
 204                return NULL;
 205        return xprt_switch_find_first_entry(&xps->xps_xprt_list);
 206}
 207
 208static
 209struct rpc_xprt *xprt_switch_find_current_entry(struct list_head *head,
 210                const struct rpc_xprt *cur)
 211{
 212        struct rpc_xprt *pos;
 213
 214        list_for_each_entry_rcu(pos, head, xprt_switch) {
 215                if (cur == pos)
 216                        return pos;
 217        }
 218        return NULL;
 219}
 220
 221static
 222struct rpc_xprt *xprt_iter_current_entry(struct rpc_xprt_iter *xpi)
 223{
 224        struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
 225        struct list_head *head;
 226
 227        if (xps == NULL)
 228                return NULL;
 229        head = &xps->xps_xprt_list;
 230        if (xpi->xpi_cursor == NULL || xps->xps_nxprts < 2)
 231                return xprt_switch_find_first_entry(head);
 232        return xprt_switch_find_current_entry(head, xpi->xpi_cursor);
 233}
 234
 235static
 236struct rpc_xprt *xprt_switch_find_next_entry(struct list_head *head,
 237                const struct rpc_xprt *cur)
 238{
 239        struct rpc_xprt *pos, *prev = NULL;
 240
 241        list_for_each_entry_rcu(pos, head, xprt_switch) {
 242                if (cur == prev)
 243                        return pos;
 244                prev = pos;
 245        }
 246        return NULL;
 247}
 248
 249static
 250struct rpc_xprt *xprt_switch_set_next_cursor(struct list_head *head,
 251                struct rpc_xprt **cursor,
 252                xprt_switch_find_xprt_t find_next)
 253{
 254        struct rpc_xprt *cur, *pos, *old;
 255
 256        cur = READ_ONCE(*cursor);
 257        for (;;) {
 258                old = cur;
 259                pos = find_next(head, old);
 260                if (pos == NULL)
 261                        break;
 262                cur = cmpxchg_relaxed(cursor, old, pos);
 263                if (cur == old)
 264                        break;
 265        }
 266        return pos;
 267}
 268
 269static
 270struct rpc_xprt *xprt_iter_next_entry_multiple(struct rpc_xprt_iter *xpi,
 271                xprt_switch_find_xprt_t find_next)
 272{
 273        struct rpc_xprt_switch *xps = rcu_dereference(xpi->xpi_xpswitch);
 274        struct list_head *head;
 275
 276        if (xps == NULL)
 277                return NULL;
 278        head = &xps->xps_xprt_list;
 279        if (xps->xps_nxprts < 2)
 280                return xprt_switch_find_first_entry(head);
 281        return xprt_switch_set_next_cursor(head, &xpi->xpi_cursor, find_next);
 282}
 283
 284static
 285struct rpc_xprt *xprt_switch_find_next_entry_roundrobin(struct list_head *head,
 286                const struct rpc_xprt *cur)
 287{
 288        struct rpc_xprt *ret;
 289
 290        ret = xprt_switch_find_next_entry(head, cur);
 291        if (ret != NULL)
 292                return ret;
 293        return xprt_switch_find_first_entry(head);
 294}
 295
 296static
 297struct rpc_xprt *xprt_iter_next_entry_roundrobin(struct rpc_xprt_iter *xpi)
 298{
 299        return xprt_iter_next_entry_multiple(xpi,
 300                        xprt_switch_find_next_entry_roundrobin);
 301}
 302
 303static
 304struct rpc_xprt *xprt_iter_next_entry_all(struct rpc_xprt_iter *xpi)
 305{
 306        return xprt_iter_next_entry_multiple(xpi, xprt_switch_find_next_entry);
 307}
 308
 309/*
 310 * xprt_iter_rewind - Resets the xprt iterator
 311 * @xpi: pointer to rpc_xprt_iter
 312 *
 313 * Resets xpi to ensure that it points to the first entry in the list
 314 * of transports.
 315 */
 316static
 317void xprt_iter_rewind(struct rpc_xprt_iter *xpi)
 318{
 319        rcu_read_lock();
 320        xprt_iter_ops(xpi)->xpi_rewind(xpi);
 321        rcu_read_unlock();
 322}
 323
 324static void __xprt_iter_init(struct rpc_xprt_iter *xpi,
 325                struct rpc_xprt_switch *xps,
 326                const struct rpc_xprt_iter_ops *ops)
 327{
 328        rcu_assign_pointer(xpi->xpi_xpswitch, xprt_switch_get(xps));
 329        xpi->xpi_cursor = NULL;
 330        xpi->xpi_ops = ops;
 331}
 332
 333/**
 334 * xprt_iter_init - Initialise an xprt iterator
 335 * @xpi: pointer to rpc_xprt_iter
 336 * @xps: pointer to rpc_xprt_switch
 337 *
 338 * Initialises the iterator to use the default iterator ops
 339 * as set in xps. This function is mainly intended for internal
 340 * use in the rpc_client.
 341 */
 342void xprt_iter_init(struct rpc_xprt_iter *xpi,
 343                struct rpc_xprt_switch *xps)
 344{
 345        __xprt_iter_init(xpi, xps, NULL);
 346}
 347
 348/**
 349 * xprt_iter_init_listall - Initialise an xprt iterator
 350 * @xpi: pointer to rpc_xprt_iter
 351 * @xps: pointer to rpc_xprt_switch
 352 *
 353 * Initialises the iterator to iterate once through the entire list
 354 * of entries in xps.
 355 */
 356void xprt_iter_init_listall(struct rpc_xprt_iter *xpi,
 357                struct rpc_xprt_switch *xps)
 358{
 359        __xprt_iter_init(xpi, xps, &rpc_xprt_iter_listall);
 360}
 361
 362/**
 363 * xprt_iter_xchg_switch - Atomically swap out the rpc_xprt_switch
 364 * @xpi: pointer to rpc_xprt_iter
 365 * @xps: pointer to a new rpc_xprt_switch or NULL
 366 *
 367 * Swaps out the existing xpi->xpi_xpswitch with a new value.
 368 */
 369struct rpc_xprt_switch *xprt_iter_xchg_switch(struct rpc_xprt_iter *xpi,
 370                struct rpc_xprt_switch *newswitch)
 371{
 372        struct rpc_xprt_switch __rcu *oldswitch;
 373
 374        /* Atomically swap out the old xpswitch */
 375        oldswitch = xchg(&xpi->xpi_xpswitch, RCU_INITIALIZER(newswitch));
 376        if (newswitch != NULL)
 377                xprt_iter_rewind(xpi);
 378        return rcu_dereference_protected(oldswitch, true);
 379}
 380
 381/**
 382 * xprt_iter_destroy - Destroys the xprt iterator
 383 * @xpi pointer to rpc_xprt_iter
 384 */
 385void xprt_iter_destroy(struct rpc_xprt_iter *xpi)
 386{
 387        xprt_switch_put(xprt_iter_xchg_switch(xpi, NULL));
 388}
 389
 390/**
 391 * xprt_iter_xprt - Returns the rpc_xprt pointed to by the cursor
 392 * @xpi: pointer to rpc_xprt_iter
 393 *
 394 * Returns a pointer to the struct rpc_xprt that is currently
 395 * pointed to by the cursor.
 396 * Caller must be holding rcu_read_lock().
 397 */
 398struct rpc_xprt *xprt_iter_xprt(struct rpc_xprt_iter *xpi)
 399{
 400        WARN_ON_ONCE(!rcu_read_lock_held());
 401        return xprt_iter_ops(xpi)->xpi_xprt(xpi);
 402}
 403
 404static
 405struct rpc_xprt *xprt_iter_get_helper(struct rpc_xprt_iter *xpi,
 406                struct rpc_xprt *(*fn)(struct rpc_xprt_iter *))
 407{
 408        struct rpc_xprt *ret;
 409
 410        do {
 411                ret = fn(xpi);
 412                if (ret == NULL)
 413                        break;
 414                ret = xprt_get(ret);
 415        } while (ret == NULL);
 416        return ret;
 417}
 418
 419/**
 420 * xprt_iter_get_xprt - Returns the rpc_xprt pointed to by the cursor
 421 * @xpi: pointer to rpc_xprt_iter
 422 *
 423 * Returns a reference to the struct rpc_xprt that is currently
 424 * pointed to by the cursor.
 425 */
 426struct rpc_xprt *xprt_iter_get_xprt(struct rpc_xprt_iter *xpi)
 427{
 428        struct rpc_xprt *xprt;
 429
 430        rcu_read_lock();
 431        xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_xprt);
 432        rcu_read_unlock();
 433        return xprt;
 434}
 435
 436/**
 437 * xprt_iter_get_next - Returns the next rpc_xprt following the cursor
 438 * @xpi: pointer to rpc_xprt_iter
 439 *
 440 * Returns a reference to the struct rpc_xprt that immediately follows the
 441 * entry pointed to by the cursor.
 442 */
 443struct rpc_xprt *xprt_iter_get_next(struct rpc_xprt_iter *xpi)
 444{
 445        struct rpc_xprt *xprt;
 446
 447        rcu_read_lock();
 448        xprt = xprt_iter_get_helper(xpi, xprt_iter_ops(xpi)->xpi_next);
 449        rcu_read_unlock();
 450        return xprt;
 451}
 452
 453/* Policy for always returning the first entry in the rpc_xprt_switch */
 454static
 455const struct rpc_xprt_iter_ops rpc_xprt_iter_singular = {
 456        .xpi_rewind = xprt_iter_no_rewind,
 457        .xpi_xprt = xprt_iter_first_entry,
 458        .xpi_next = xprt_iter_first_entry,
 459};
 460
 461/* Policy for round-robin iteration of entries in the rpc_xprt_switch */
 462static
 463const struct rpc_xprt_iter_ops rpc_xprt_iter_roundrobin = {
 464        .xpi_rewind = xprt_iter_default_rewind,
 465        .xpi_xprt = xprt_iter_current_entry,
 466        .xpi_next = xprt_iter_next_entry_roundrobin,
 467};
 468
 469/* Policy for once-through iteration of entries in the rpc_xprt_switch */
 470static
 471const struct rpc_xprt_iter_ops rpc_xprt_iter_listall = {
 472        .xpi_rewind = xprt_iter_default_rewind,
 473        .xpi_xprt = xprt_iter_current_entry,
 474        .xpi_next = xprt_iter_next_entry_all,
 475};
 476