linux/drivers/md/md-cluster.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 * Copyright (C) 2015, SUSE
   4 */
   5
   6
   7#include <linux/module.h>
   8#include <linux/kthread.h>
   9#include <linux/dlm.h>
  10#include <linux/sched.h>
  11#include <linux/raid/md_p.h>
  12#include "md.h"
  13#include "md-bitmap.h"
  14#include "md-cluster.h"
  15
  16#define LVB_SIZE        64
  17#define NEW_DEV_TIMEOUT 5000
  18
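/*
 * DLM lock modes used in this file follow the standard DLM semantics:
 * NL (null), CR (concurrent read), CW (concurrent write), PR (protected
 * read), PW (protected write) and EX (exclusive). CR/PR are shared read
 * modes; CW is compatible only with NL/CR/CW; PW is compatible only with
 * NL/CR; EX is compatible with NL alone.
 */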
  19struct dlm_lock_resource {
  20        dlm_lockspace_t *ls;
  21        struct dlm_lksb lksb;
  22        char *name; /* lock name. */
  23        uint32_t flags; /* flags to pass to dlm_lock() */
  24        wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
  25        bool sync_locking_done;
  26        void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
  27        struct mddev *mddev; /* pointing back to mddev. */
  28        int mode;
  29};
  30
  31struct resync_info {
  32        __le64 lo;
  33        __le64 hi;
  34};
  35
  36/* md_cluster_info flags */
  37#define         MD_CLUSTER_WAITING_FOR_NEWDISK          1
  38#define         MD_CLUSTER_SUSPEND_READ_BALANCING       2
  39#define         MD_CLUSTER_BEGIN_JOIN_CLUSTER           3
  40
  41/* Lock the send communication. This is done through
  42 * bit manipulation as opposed to a mutex in order to
   43 * accommodate lock and hold. See next comment.
  44 */
  45#define         MD_CLUSTER_SEND_LOCK                    4
   46/* Cluster operations (such as adding a disk) may need to lock the
   47 * communication channel in order to perform extra operations
   48 * (update metadata) while no other operation is allowed on the
   49 * MD. The token needs to be locked and held until the operation
   50 * completes with a md_update_sb(), which would eventually release
   51 * the lock.
   52 */
  53#define         MD_CLUSTER_SEND_LOCKED_ALREADY          5
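/*
 * Illustrative "lock and hold" call sequence (a sketch of the flow
 * implemented later in this file, not additional driver code):
 *
 *   add_new_disk()
 *     lock_comm()                 <- SEND_LOCK bit + EX on Token
 *     __sendmsg(NEWDISK)
 *     set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, ...)  <- keep holding Token
 *   ... md_update_sb() ...
 *   metadata_update_finish()
 *     clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, ...)
 *     unlock_comm()               <- release Token, wake up waiters
 */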
   54/* We should only receive messages after this node has joined the
   55 * cluster and set up all the related state such as bitmap and personality */
  56#define         MD_CLUSTER_ALREADY_IN_CLUSTER           6
  57#define         MD_CLUSTER_PENDING_RECV_EVENT           7
  58#define         MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD              8
  59
  60struct md_cluster_info {
  61        struct mddev *mddev; /* the md device which md_cluster_info belongs to */
  62        /* dlm lock space and resources for clustered raid. */
  63        dlm_lockspace_t *lockspace;
  64        int slot_number;
  65        struct completion completion;
  66        struct mutex recv_mutex;
  67        struct dlm_lock_resource *bitmap_lockres;
  68        struct dlm_lock_resource **other_bitmap_lockres;
  69        struct dlm_lock_resource *resync_lockres;
  70        struct list_head suspend_list;
  71
  72        spinlock_t suspend_lock;
  73        /* record the region which write should be suspended */
  74        sector_t suspend_lo;
  75        sector_t suspend_hi;
  76        int suspend_from; /* the slot which broadcast suspend_lo/hi */
  77
  78        struct md_thread *recovery_thread;
  79        unsigned long recovery_map;
   80        /* communication lock resources */
  81        struct dlm_lock_resource *ack_lockres;
  82        struct dlm_lock_resource *message_lockres;
  83        struct dlm_lock_resource *token_lockres;
  84        struct dlm_lock_resource *no_new_dev_lockres;
  85        struct md_thread *recv_thread;
  86        struct completion newdisk_completion;
  87        wait_queue_head_t wait;
  88        unsigned long state;
  89        /* record the region in RESYNCING message */
  90        sector_t sync_low;
  91        sector_t sync_hi;
  92};
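/*
 * Roles of the communication lock resources above (as used in this file):
 *   token      - serializes senders cluster-wide; held in EX while sending.
 *   message    - its LVB carries the struct cluster_msg payload.
 *   ack        - receivers hold CR; a sender's EX request triggers their
 *                BAST (ack_bast) so the recv thread picks up the message.
 *   no-new-dev - every node holds CR; a node adding a disk must get EX,
 *                which succeeds only after all other nodes have acked.
 */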
  93
  94enum msg_type {
  95        METADATA_UPDATED = 0,
  96        RESYNCING,
  97        NEWDISK,
  98        REMOVE,
  99        RE_ADD,
 100        BITMAP_NEEDS_SYNC,
 101        CHANGE_CAPACITY,
 102        BITMAP_RESIZE,
 103};
 104
 105struct cluster_msg {
 106        __le32 type;
 107        __le32 slot;
 108        /* TODO: Unionize this for smaller footprint */
 109        __le64 low;
 110        __le64 high;
 111        char uuid[16];
 112        __le32 raid_slot;
 113};
 114
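/*
 * All multi-byte fields of cluster_msg travel in little-endian byte order,
 * so senders use cpu_to_le{32,64}() and receivers le{32,64}_to_cpu(). A
 * minimal construction, mirroring remove_disk() below:
 *
 *   struct cluster_msg cmsg = {0};
 *
 *   cmsg.type      = cpu_to_le32(REMOVE);
 *   cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
 *   err = sendmsg(cinfo, &cmsg, mddev_locked);
 */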
 115static void sync_ast(void *arg)
 116{
 117        struct dlm_lock_resource *res;
 118
 119        res = arg;
 120        res->sync_locking_done = true;
 121        wake_up(&res->sync_locking);
 122}
 123
 124static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
 125{
 126        int ret = 0;
 127
 128        ret = dlm_lock(res->ls, mode, &res->lksb,
 129                        res->flags, res->name, strlen(res->name),
 130                        0, sync_ast, res, res->bast);
 131        if (ret)
 132                return ret;
 133        wait_event(res->sync_locking, res->sync_locking_done);
 134        res->sync_locking_done = false;
 135        if (res->lksb.sb_status == 0)
 136                res->mode = mode;
 137        return res->lksb.sb_status;
 138}
 139
 140static int dlm_unlock_sync(struct dlm_lock_resource *res)
 141{
 142        return dlm_lock_sync(res, DLM_LOCK_NL);
 143}
 144
  145/*
  146 * A variation of dlm_lock_sync that allows the lock request to be
  147 * interrupted
  148 */
 149static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
 150                                       struct mddev *mddev)
 151{
 152        int ret = 0;
 153
 154        ret = dlm_lock(res->ls, mode, &res->lksb,
 155                        res->flags, res->name, strlen(res->name),
 156                        0, sync_ast, res, res->bast);
 157        if (ret)
 158                return ret;
 159
 160        wait_event(res->sync_locking, res->sync_locking_done
 161                                      || kthread_should_stop()
 162                                      || test_bit(MD_CLOSING, &mddev->flags));
 163        if (!res->sync_locking_done) {
 164                /*
  165                 * the convert queue still contains the lock request when the
  166                 * request is interrupted, and sync_ast could still run, so we
  167                 * need to cancel the request and reset the completion
 168                 */
 169                ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
 170                        &res->lksb, res);
 171                res->sync_locking_done = false;
 172                if (unlikely(ret != 0))
 173                        pr_info("failed to cancel previous lock request "
 174                                 "%s return %d\n", res->name, ret);
 175                return -EPERM;
 176        } else
 177                res->sync_locking_done = false;
 178        if (res->lksb.sb_status == 0)
 179                res->mode = mode;
 180        return res->lksb.sb_status;
 181}
 182
 183static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
 184                char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
 185{
 186        struct dlm_lock_resource *res = NULL;
 187        int ret, namelen;
 188        struct md_cluster_info *cinfo = mddev->cluster_info;
 189
 190        res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
 191        if (!res)
 192                return NULL;
 193        init_waitqueue_head(&res->sync_locking);
 194        res->sync_locking_done = false;
 195        res->ls = cinfo->lockspace;
 196        res->mddev = mddev;
 197        res->mode = DLM_LOCK_IV;
 198        namelen = strlen(name);
 199        res->name = kzalloc(namelen + 1, GFP_KERNEL);
 200        if (!res->name) {
 201                pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
 202                goto out_err;
 203        }
 204        strlcpy(res->name, name, namelen + 1);
 205        if (with_lvb) {
 206                res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
 207                if (!res->lksb.sb_lvbptr) {
 208                        pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
 209                        goto out_err;
 210                }
 211                res->flags = DLM_LKF_VALBLK;
 212        }
 213
 214        if (bastfn)
 215                res->bast = bastfn;
 216
 217        res->flags |= DLM_LKF_EXPEDITE;
 218
 219        ret = dlm_lock_sync(res, DLM_LOCK_NL);
 220        if (ret) {
 221                pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
 222                goto out_err;
 223        }
 224        res->flags &= ~DLM_LKF_EXPEDITE;
 225        res->flags |= DLM_LKF_CONVERT;
 226
 227        return res;
 228out_err:
 229        kfree(res->lksb.sb_lvbptr);
 230        kfree(res->name);
 231        kfree(res);
 232        return NULL;
 233}
 234
 235static void lockres_free(struct dlm_lock_resource *res)
 236{
 237        int ret = 0;
 238
 239        if (!res)
 240                return;
 241
 242        /*
  243         * use the FORCEUNLOCK flag, so we can unlock even if the lock is on
  244         * the waiting or convert queue
 245         */
 246        ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
 247                &res->lksb, res);
 248        if (unlikely(ret != 0))
 249                pr_err("failed to unlock %s return %d\n", res->name, ret);
 250        else
 251                wait_event(res->sync_locking, res->sync_locking_done);
 252
 253        kfree(res->name);
 254        kfree(res->lksb.sb_lvbptr);
 255        kfree(res);
 256}
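/*
 * Typical lock resource lifecycle built from the helpers above (a sketch;
 * "example" is a hypothetical resource name):
 *
 *   struct dlm_lock_resource *res;
 *
 *   res = lockres_init(mddev, "example", NULL, 1); // NL lock taken, LVB alloc'd
 *   if (!res)
 *           return -ENOMEM;
 *   ret = dlm_lock_sync(res, DLM_LOCK_EX);         // convert NL -> EX
 *   ...
 *   lockres_free(res);                             // force-unlock and free
 */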
 257
 258static void add_resync_info(struct dlm_lock_resource *lockres,
 259                            sector_t lo, sector_t hi)
 260{
 261        struct resync_info *ri;
 262
 263        ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
 264        ri->lo = cpu_to_le64(lo);
 265        ri->hi = cpu_to_le64(hi);
 266}
 267
 268static int read_resync_info(struct mddev *mddev,
 269                            struct dlm_lock_resource *lockres)
 270{
 271        struct resync_info ri;
 272        struct md_cluster_info *cinfo = mddev->cluster_info;
 273        int ret = 0;
 274
 275        dlm_lock_sync(lockres, DLM_LOCK_CR);
 276        memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
 277        if (le64_to_cpu(ri.hi) > 0) {
 278                cinfo->suspend_hi = le64_to_cpu(ri.hi);
 279                cinfo->suspend_lo = le64_to_cpu(ri.lo);
 280                ret = 1;
 281        }
 282        dlm_unlock_sync(lockres);
 283        return ret;
 284}
 285
 286static void recover_bitmaps(struct md_thread *thread)
 287{
 288        struct mddev *mddev = thread->mddev;
 289        struct md_cluster_info *cinfo = mddev->cluster_info;
 290        struct dlm_lock_resource *bm_lockres;
 291        char str[64];
 292        int slot, ret;
 293        sector_t lo, hi;
 294
 295        while (cinfo->recovery_map) {
 296                slot = fls64((u64)cinfo->recovery_map) - 1;
 297
 298                snprintf(str, 64, "bitmap%04d", slot);
 299                bm_lockres = lockres_init(mddev, str, NULL, 1);
 300                if (!bm_lockres) {
 301                        pr_err("md-cluster: Cannot initialize bitmaps\n");
 302                        goto clear_bit;
 303                }
 304
 305                ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
 306                if (ret) {
 307                        pr_err("md-cluster: Could not DLM lock %s: %d\n",
 308                                        str, ret);
 309                        goto clear_bit;
 310                }
 311                ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
 312                if (ret) {
 313                        pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
 314                        goto clear_bit;
 315                }
 316
 317                /* Clear suspend_area associated with the bitmap */
 318                spin_lock_irq(&cinfo->suspend_lock);
 319                cinfo->suspend_hi = 0;
 320                cinfo->suspend_lo = 0;
 321                cinfo->suspend_from = -1;
 322                spin_unlock_irq(&cinfo->suspend_lock);
 323
 324                /* Kick off a reshape if needed */
 325                if (test_bit(MD_RESYNCING_REMOTE, &mddev->recovery) &&
 326                    test_bit(MD_RECOVERY_RESHAPE, &mddev->recovery) &&
 327                    mddev->reshape_position != MaxSector)
 328                        md_wakeup_thread(mddev->sync_thread);
 329
 330                if (hi > 0) {
 331                        if (lo < mddev->recovery_cp)
 332                                mddev->recovery_cp = lo;
 333                        /* wake up thread to continue resync in case resync
 334                         * is not finished */
 335                        if (mddev->recovery_cp != MaxSector) {
 336                                /*
 337                                 * clear the REMOTE flag since we will launch
 338                                 * resync thread in current node.
 339                                 */
 340                                clear_bit(MD_RESYNCING_REMOTE,
 341                                          &mddev->recovery);
 342                                set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
 343                                md_wakeup_thread(mddev->thread);
 344                        }
 345                }
 346clear_bit:
 347                lockres_free(bm_lockres);
 348                clear_bit(slot, &cinfo->recovery_map);
 349        }
 350}
 351
 352static void recover_prep(void *arg)
 353{
 354        struct mddev *mddev = arg;
 355        struct md_cluster_info *cinfo = mddev->cluster_info;
 356        set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
 357}
 358
 359static void __recover_slot(struct mddev *mddev, int slot)
 360{
 361        struct md_cluster_info *cinfo = mddev->cluster_info;
 362
 363        set_bit(slot, &cinfo->recovery_map);
 364        if (!cinfo->recovery_thread) {
 365                cinfo->recovery_thread = md_register_thread(recover_bitmaps,
 366                                mddev, "recover");
 367                if (!cinfo->recovery_thread) {
 368                        pr_warn("md-cluster: Could not create recovery thread\n");
 369                        return;
 370                }
 371        }
 372        md_wakeup_thread(cinfo->recovery_thread);
 373}
 374
 375static void recover_slot(void *arg, struct dlm_slot *slot)
 376{
 377        struct mddev *mddev = arg;
 378        struct md_cluster_info *cinfo = mddev->cluster_info;
 379
 380        pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
 381                        mddev->bitmap_info.cluster_name,
 382                        slot->nodeid, slot->slot,
 383                        cinfo->slot_number);
  384        /* subtract one since dlm slots start from one while the numbering
  385         * of cluster-md begins with 0 */
 386        __recover_slot(mddev, slot->slot - 1);
 387}
 388
 389static void recover_done(void *arg, struct dlm_slot *slots,
 390                int num_slots, int our_slot,
 391                uint32_t generation)
 392{
 393        struct mddev *mddev = arg;
 394        struct md_cluster_info *cinfo = mddev->cluster_info;
 395
 396        cinfo->slot_number = our_slot;
  397        /* the completion only needs to be completed when a node joins the
  398         * cluster; it doesn't need to run during another node's failure */
 399        if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
 400                complete(&cinfo->completion);
 401                clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
 402        }
 403        clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
 404}
 405
  406/* these ops are called when a node joins the cluster, and they do lock
  407 * recovery if a node failure occurs */
 408static const struct dlm_lockspace_ops md_ls_ops = {
 409        .recover_prep = recover_prep,
 410        .recover_slot = recover_slot,
 411        .recover_done = recover_done,
 412};
 413
 414/*
 415 * The BAST function for the ack lock resource
 416 * This function wakes up the receive thread in
 417 * order to receive and process the message.
 418 */
 419static void ack_bast(void *arg, int mode)
 420{
 421        struct dlm_lock_resource *res = arg;
 422        struct md_cluster_info *cinfo = res->mddev->cluster_info;
 423
 424        if (mode == DLM_LOCK_EX) {
 425                if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
 426                        md_wakeup_thread(cinfo->recv_thread);
 427                else
 428                        set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
 429        }
 430}
 431
 432static void remove_suspend_info(struct mddev *mddev, int slot)
 433{
 434        struct md_cluster_info *cinfo = mddev->cluster_info;
 435        mddev->pers->quiesce(mddev, 1);
 436        spin_lock_irq(&cinfo->suspend_lock);
 437        cinfo->suspend_hi = 0;
 438        cinfo->suspend_lo = 0;
 439        spin_unlock_irq(&cinfo->suspend_lock);
 440        mddev->pers->quiesce(mddev, 0);
 441}
 442
 443static void process_suspend_info(struct mddev *mddev,
 444                int slot, sector_t lo, sector_t hi)
 445{
 446        struct md_cluster_info *cinfo = mddev->cluster_info;
 447        struct mdp_superblock_1 *sb = NULL;
 448        struct md_rdev *rdev;
 449
 450        if (!hi) {
 451                /*
 452                 * clear the REMOTE flag since resync or recovery is finished
 453                 * in remote node.
 454                 */
 455                clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
 456                remove_suspend_info(mddev, slot);
 457                set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
 458                md_wakeup_thread(mddev->thread);
 459                return;
 460        }
 461
 462        rdev_for_each(rdev, mddev)
 463                if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
 464                        sb = page_address(rdev->sb_page);
 465                        break;
 466                }
 467
  468        /*
  469         * The bitmaps are not the same on different nodes:
  470         * if RESYNCING is happening on one node, then
  471         * the node which received the RESYNCING message
  472         * will probably perform a resync of the region
  473         * [lo, hi] again, so we can reduce the resync time
  474         * a lot if we can ensure that the bitmaps among
  475         * the different nodes match up well.
  476         *
  477         * sync_low/hi is used to record the region which
  478         * arrived in the previous RESYNCING message.
  479         *
  480         * Call md_bitmap_sync_with_cluster to clear NEEDED_MASK
  481         * and set RESYNC_MASK since the resync thread is running
  482         * on another node, so we don't need to do the resync
  483         * again for the same section.
  484         *
  485         * Skip md_bitmap_sync_with_cluster in case a reshape is
  486         * happening, because the reshaping region is small and
  487         * we don't want to trigger lots of WARNs.
  488         */
 489        if (sb && !(le32_to_cpu(sb->feature_map) & MD_FEATURE_RESHAPE_ACTIVE))
 490                md_bitmap_sync_with_cluster(mddev, cinfo->sync_low,
 491                                            cinfo->sync_hi, lo, hi);
 492        cinfo->sync_low = lo;
 493        cinfo->sync_hi = hi;
 494
 495        mddev->pers->quiesce(mddev, 1);
 496        spin_lock_irq(&cinfo->suspend_lock);
 497        cinfo->suspend_from = slot;
 498        cinfo->suspend_lo = lo;
 499        cinfo->suspend_hi = hi;
 500        spin_unlock_irq(&cinfo->suspend_lock);
 501        mddev->pers->quiesce(mddev, 0);
 502}
 503
 504static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
 505{
 506        char disk_uuid[64];
 507        struct md_cluster_info *cinfo = mddev->cluster_info;
 508        char event_name[] = "EVENT=ADD_DEVICE";
 509        char raid_slot[16];
 510        char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
 511        int len;
 512
 513        len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
 514        sprintf(disk_uuid + len, "%pU", cmsg->uuid);
 515        snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
 516        pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
 517        init_completion(&cinfo->newdisk_completion);
 518        set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
 519        kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
 520        wait_for_completion_timeout(&cinfo->newdisk_completion,
 521                        NEW_DEV_TIMEOUT);
 522        clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
 523}
 524
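/*
 * The NEWDISK handshake end to end (a sketch of the flow around the
 * function above; the userspace side is typically mdadm):
 *
 *   initiator: add_new_disk()
 *     lock_comm(); __sendmsg(NEWDISK);
 *     request EX on no-new-dev with NOQUEUE  <- blocked while CR is held
 *   other nodes: process_add_new_disk()
 *     emit ADD_DEVICE uevent, wait on newdisk_completion;
 *     userspace confirms -> new_disk_ack() drops CR on no-new-dev
 *   initiator: EX granted once every node acked, else -EAGAIN -> -ENOENT
 */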
 525
 526static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
 527{
 528        int got_lock = 0;
 529        struct md_cluster_info *cinfo = mddev->cluster_info;
 530        mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
 531
 532        dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
 533        wait_event(mddev->thread->wqueue,
 534                   (got_lock = mddev_trylock(mddev)) ||
 535                    test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
 536        md_reload_sb(mddev, mddev->good_device_nr);
 537        if (got_lock)
 538                mddev_unlock(mddev);
 539}
 540
 541static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
 542{
 543        struct md_rdev *rdev;
 544
 545        rcu_read_lock();
 546        rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
 547        if (rdev) {
 548                set_bit(ClusterRemove, &rdev->flags);
 549                set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
 550                md_wakeup_thread(mddev->thread);
 551        }
 552        else
 553                pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
 554                        __func__, __LINE__, le32_to_cpu(msg->raid_slot));
 555        rcu_read_unlock();
 556}
 557
 558static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
 559{
 560        struct md_rdev *rdev;
 561
 562        rcu_read_lock();
 563        rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
 564        if (rdev && test_bit(Faulty, &rdev->flags))
 565                clear_bit(Faulty, &rdev->flags);
 566        else
  567                pr_warn("%s: %d Could not find disk(%d) which is faulty\n",
 568                        __func__, __LINE__, le32_to_cpu(msg->raid_slot));
 569        rcu_read_unlock();
 570}
 571
 572static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
 573{
 574        int ret = 0;
 575
  576        if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
  577                "node %d received its own msg\n", le32_to_cpu(msg->slot)))
 578                return -1;
 579        switch (le32_to_cpu(msg->type)) {
 580        case METADATA_UPDATED:
 581                process_metadata_update(mddev, msg);
 582                break;
 583        case CHANGE_CAPACITY:
 584                set_capacity(mddev->gendisk, mddev->array_sectors);
 585                revalidate_disk(mddev->gendisk);
 586                break;
 587        case RESYNCING:
 588                set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
 589                process_suspend_info(mddev, le32_to_cpu(msg->slot),
 590                                     le64_to_cpu(msg->low),
 591                                     le64_to_cpu(msg->high));
 592                break;
 593        case NEWDISK:
 594                process_add_new_disk(mddev, msg);
 595                break;
 596        case REMOVE:
 597                process_remove_disk(mddev, msg);
 598                break;
 599        case RE_ADD:
 600                process_readd_disk(mddev, msg);
 601                break;
 602        case BITMAP_NEEDS_SYNC:
 603                __recover_slot(mddev, le32_to_cpu(msg->slot));
 604                break;
 605        case BITMAP_RESIZE:
 606                if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
 607                        ret = md_bitmap_resize(mddev->bitmap,
 608                                            le64_to_cpu(msg->high), 0, 0);
 609                break;
 610        default:
 611                ret = -1;
  612                pr_warn("%s:%d Received unknown message from %d\n",
  613                        __func__, __LINE__, le32_to_cpu(msg->slot));
 614        }
 615        return ret;
 616}
 617
 618/*
 619 * thread for receiving message
 620 */
 621static void recv_daemon(struct md_thread *thread)
 622{
 623        struct md_cluster_info *cinfo = thread->mddev->cluster_info;
 624        struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
 625        struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
 626        struct cluster_msg msg;
 627        int ret;
 628
 629        mutex_lock(&cinfo->recv_mutex);
 630        /*get CR on Message*/
 631        if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
  632                pr_err("md-cluster: failed to get CR on MESSAGE\n");
 633                mutex_unlock(&cinfo->recv_mutex);
 634                return;
 635        }
 636
 637        /* read lvb and wake up thread to process this message_lockres */
 638        memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
 639        ret = process_recvd_msg(thread->mddev, &msg);
 640        if (ret)
 641                goto out;
 642
 643        /*release CR on ack_lockres*/
 644        ret = dlm_unlock_sync(ack_lockres);
 645        if (unlikely(ret != 0))
 646                pr_info("unlock ack failed return %d\n", ret);
 647        /*up-convert to PR on message_lockres*/
 648        ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
 649        if (unlikely(ret != 0))
 650                pr_info("lock PR on msg failed return %d\n", ret);
 651        /*get CR on ack_lockres again*/
 652        ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
 653        if (unlikely(ret != 0))
 654                pr_info("lock CR on ack failed return %d\n", ret);
 655out:
 656        /*release CR on message_lockres*/
 657        ret = dlm_unlock_sync(message_lockres);
 658        if (unlikely(ret != 0))
 659                pr_info("unlock msg failed return %d\n", ret);
 660        mutex_unlock(&cinfo->recv_mutex);
 661}
 662
 663/* lock_token()
 664 * Takes the lock on the TOKEN lock resource so no other
 665 * node can communicate while the operation is underway.
 666 */
 667static int lock_token(struct md_cluster_info *cinfo, bool mddev_locked)
 668{
 669        int error, set_bit = 0;
 670        struct mddev *mddev = cinfo->mddev;
 671
 672        /*
  673         * If the resync thread runs after the raid1d thread, then process_metadata_update
  674         * could not continue if raid1d held reconfig_mutex (and raid1d is blocked
  675         * since another node already got EX on Token and is waiting for the EX of Ack),
  676         * so let resync wake up the thread in case the flag is set.
 677         */
 678        if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
 679                                      &cinfo->state)) {
 680                error = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
 681                                              &cinfo->state);
 682                WARN_ON_ONCE(error);
 683                md_wakeup_thread(mddev->thread);
 684                set_bit = 1;
 685        }
 686        error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
 687        if (set_bit)
 688                clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
 689
 690        if (error)
 691                pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
 692                                __func__, __LINE__, error);
 693
 694        /* Lock the receive sequence */
 695        mutex_lock(&cinfo->recv_mutex);
 696        return error;
 697}
 698
 699/* lock_comm()
 700 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
 701 */
 702static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
 703{
 704        wait_event(cinfo->wait,
 705                   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
 706
 707        return lock_token(cinfo, mddev_locked);
 708}
 709
 710static void unlock_comm(struct md_cluster_info *cinfo)
 711{
 712        WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
 713        mutex_unlock(&cinfo->recv_mutex);
 714        dlm_unlock_sync(cinfo->token_lockres);
 715        clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
 716        wake_up(&cinfo->wait);
 717}
 718
 719/* __sendmsg()
 720 * This function performs the actual sending of the message. This function is
 721 * usually called after performing the encompassing operation
 722 * The function:
 723 * 1. Grabs the message lockresource in EX mode
 724 * 2. Copies the message to the message LVB
 725 * 3. Downconverts message lockresource to CW
 726 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
 727 *    and the other nodes read the message. The thread will wait here until all other
 728 *    nodes have released ack lock resource.
 729 * 5. Downconvert ack lockresource to CR
 730 */
 731static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
 732{
 733        int error;
 734        int slot = cinfo->slot_number - 1;
 735
 736        cmsg->slot = cpu_to_le32(slot);
 737        /*get EX on Message*/
 738        error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
 739        if (error) {
 740                pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
 741                goto failed_message;
 742        }
 743
 744        memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
 745                        sizeof(struct cluster_msg));
 746        /*down-convert EX to CW on Message*/
 747        error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
 748        if (error) {
 749                pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
 750                                error);
 751                goto failed_ack;
 752        }
 753
 754        /*up-convert CR to EX on Ack*/
 755        error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
 756        if (error) {
 757                pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
 758                                error);
 759                goto failed_ack;
 760        }
 761
 762        /*down-convert EX to CR on Ack*/
 763        error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
 764        if (error) {
 765                pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
 766                                error);
 767                goto failed_ack;
 768        }
 769
 770failed_ack:
 771        error = dlm_unlock_sync(cinfo->message_lockres);
 772        if (unlikely(error != 0)) {
 773                pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
 774                        error);
 775                /* in case the message can't be released due to some reason */
 776                goto failed_ack;
 777        }
 778failed_message:
 779        return error;
 780}
 781
 782static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
 783                   bool mddev_locked)
 784{
 785        int ret;
 786
 787        lock_comm(cinfo, mddev_locked);
 788        ret = __sendmsg(cinfo, cmsg);
 789        unlock_comm(cinfo);
 790        return ret;
 791}
 792
 793static int gather_all_resync_info(struct mddev *mddev, int total_slots)
 794{
 795        struct md_cluster_info *cinfo = mddev->cluster_info;
 796        int i, ret = 0;
 797        struct dlm_lock_resource *bm_lockres;
 798        char str[64];
 799        sector_t lo, hi;
 800
 801
 802        for (i = 0; i < total_slots; i++) {
 803                memset(str, '\0', 64);
 804                snprintf(str, 64, "bitmap%04d", i);
 805                bm_lockres = lockres_init(mddev, str, NULL, 1);
 806                if (!bm_lockres)
 807                        return -ENOMEM;
 808                if (i == (cinfo->slot_number - 1)) {
 809                        lockres_free(bm_lockres);
 810                        continue;
 811                }
 812
 813                bm_lockres->flags |= DLM_LKF_NOQUEUE;
 814                ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
 815                if (ret == -EAGAIN) {
 816                        if (read_resync_info(mddev, bm_lockres)) {
 817                                pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
 818                                                __func__, __LINE__,
 819                                        (unsigned long long) cinfo->suspend_lo,
 820                                        (unsigned long long) cinfo->suspend_hi,
 821                                        i);
 822                                cinfo->suspend_from = i;
 823                        }
 824                        ret = 0;
 825                        lockres_free(bm_lockres);
 826                        continue;
 827                }
 828                if (ret) {
 829                        lockres_free(bm_lockres);
 830                        goto out;
 831                }
 832
 833                /* Read the disk bitmap sb and check if it needs recovery */
 834                ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
 835                if (ret) {
  836                        pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", i);
 837                        lockres_free(bm_lockres);
 838                        continue;
 839                }
 840                if ((hi > 0) && (lo < mddev->recovery_cp)) {
 841                        set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
 842                        mddev->recovery_cp = lo;
 843                        md_check_recovery(mddev);
 844                }
 845
 846                lockres_free(bm_lockres);
 847        }
 848out:
 849        return ret;
 850}
 851
 852static int join(struct mddev *mddev, int nodes)
 853{
 854        struct md_cluster_info *cinfo;
 855        int ret, ops_rv;
 856        char str[64];
 857
 858        cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
 859        if (!cinfo)
 860                return -ENOMEM;
 861
 862        INIT_LIST_HEAD(&cinfo->suspend_list);
 863        spin_lock_init(&cinfo->suspend_lock);
 864        init_completion(&cinfo->completion);
 865        set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
 866        init_waitqueue_head(&cinfo->wait);
 867        mutex_init(&cinfo->recv_mutex);
 868
 869        mddev->cluster_info = cinfo;
 870        cinfo->mddev = mddev;
 871
 872        memset(str, 0, 64);
 873        sprintf(str, "%pU", mddev->uuid);
 874        ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
 875                                DLM_LSFL_FS, LVB_SIZE,
 876                                &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
 877        if (ret)
 878                goto err;
 879        wait_for_completion(&cinfo->completion);
 880        if (nodes < cinfo->slot_number) {
  881                pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).\n",
 882                        cinfo->slot_number, nodes);
 883                ret = -ERANGE;
 884                goto err;
 885        }
 886        /* Initiate the communication resources */
 887        ret = -ENOMEM;
 888        cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
 889        if (!cinfo->recv_thread) {
 890                pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
 891                goto err;
 892        }
 893        cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
 894        if (!cinfo->message_lockres)
 895                goto err;
 896        cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
 897        if (!cinfo->token_lockres)
 898                goto err;
 899        cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
 900        if (!cinfo->no_new_dev_lockres)
 901                goto err;
 902
 903        ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
 904        if (ret) {
 905                ret = -EAGAIN;
 906                pr_err("md-cluster: can't join cluster to avoid lock issue\n");
 907                goto err;
 908        }
 909        cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
 910        if (!cinfo->ack_lockres) {
 911                ret = -ENOMEM;
 912                goto err;
 913        }
  914        /* get sync CR lock on ACK. */
  915        ret = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
  916        if (ret)
  917                pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n", ret);
  918        dlm_unlock_sync(cinfo->token_lockres);
  919        /* get sync CR lock on no-new-dev. */
  920        ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
  921        if (ret)
  922                pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
  923
 924        pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
 925        snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
 926        cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
 927        if (!cinfo->bitmap_lockres) {
 928                ret = -ENOMEM;
 929                goto err;
 930        }
 931        if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
 932                pr_err("Failed to get bitmap lock\n");
 933                ret = -EINVAL;
 934                goto err;
 935        }
 936
 937        cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
 938        if (!cinfo->resync_lockres) {
 939                ret = -ENOMEM;
 940                goto err;
 941        }
 942
 943        return 0;
 944err:
 945        set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
 946        md_unregister_thread(&cinfo->recovery_thread);
 947        md_unregister_thread(&cinfo->recv_thread);
 948        lockres_free(cinfo->message_lockres);
 949        lockres_free(cinfo->token_lockres);
 950        lockres_free(cinfo->ack_lockres);
 951        lockres_free(cinfo->no_new_dev_lockres);
 952        lockres_free(cinfo->resync_lockres);
 953        lockres_free(cinfo->bitmap_lockres);
 954        if (cinfo->lockspace)
 955                dlm_release_lockspace(cinfo->lockspace, 2);
 956        mddev->cluster_info = NULL;
 957        kfree(cinfo);
 958        return ret;
 959}
 960
 961static void load_bitmaps(struct mddev *mddev, int total_slots)
 962{
 963        struct md_cluster_info *cinfo = mddev->cluster_info;
 964
  965        /* load all the nodes' bitmap info for resync */
  966        if (gather_all_resync_info(mddev, total_slots))
  967                pr_err("md-cluster: failed to gather all resync info\n");
  968        set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
  969        /* wake up recv thread in case something needs to be handled */
 970        if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
 971                md_wakeup_thread(cinfo->recv_thread);
 972}
 973
 974static void resync_bitmap(struct mddev *mddev)
 975{
 976        struct md_cluster_info *cinfo = mddev->cluster_info;
 977        struct cluster_msg cmsg = {0};
 978        int err;
 979
 980        cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
 981        err = sendmsg(cinfo, &cmsg, 1);
 982        if (err)
 983                pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
 984                        __func__, __LINE__, err);
 985}
 986
 987static void unlock_all_bitmaps(struct mddev *mddev);
 988static int leave(struct mddev *mddev)
 989{
 990        struct md_cluster_info *cinfo = mddev->cluster_info;
 991
 992        if (!cinfo)
 993                return 0;
 994
  995        /*
  996         * A BITMAP_NEEDS_SYNC message should be sent when a node
  997         * is leaving the cluster with a dirty bitmap; note we can
  998         * only deliver it while the dlm connection is available.
  999         *
 1000         * We should also send a BITMAP_NEEDS_SYNC message in
 1001         * case reshaping is interrupted.
 1002         */
1003        if ((cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) ||
1004            (mddev->reshape_position != MaxSector &&
1005             test_bit(MD_CLOSING, &mddev->flags)))
1006                resync_bitmap(mddev);
1007
1008        set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1009        md_unregister_thread(&cinfo->recovery_thread);
1010        md_unregister_thread(&cinfo->recv_thread);
1011        lockres_free(cinfo->message_lockres);
1012        lockres_free(cinfo->token_lockres);
1013        lockres_free(cinfo->ack_lockres);
1014        lockres_free(cinfo->no_new_dev_lockres);
1015        lockres_free(cinfo->resync_lockres);
1016        lockres_free(cinfo->bitmap_lockres);
1017        unlock_all_bitmaps(mddev);
1018        dlm_release_lockspace(cinfo->lockspace, 2);
1019        kfree(cinfo);
1020        return 0;
1021}
1022
 1023/* slot_number(): Returns the MD slot number to use
 1024 * DLM starts the slot numbers from 1, whereas cluster-md
 1025 * wants the numbers to start from zero, so we subtract one
 1026 */
1027static int slot_number(struct mddev *mddev)
1028{
1029        struct md_cluster_info *cinfo = mddev->cluster_info;
1030
1031        return cinfo->slot_number - 1;
1032}
1033
1034/*
1035 * Check if the communication is already locked, else lock the communication
1036 * channel.
1037 * If it is already locked, token is in EX mode, and hence lock_token()
1038 * should not be called.
1039 */
1040static int metadata_update_start(struct mddev *mddev)
1041{
1042        struct md_cluster_info *cinfo = mddev->cluster_info;
1043        int ret;
1044
1045        /*
1046         * metadata_update_start is always called with the protection of
 1047         * reconfig_mutex, so set MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD here.
1048         */
1049        ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
1050                                    &cinfo->state);
1051        WARN_ON_ONCE(ret);
1052        md_wakeup_thread(mddev->thread);
1053
1054        wait_event(cinfo->wait,
1055                   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
1056                   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
1057
1058        /* If token is already locked, return 0 */
1059        if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
1060                clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1061                return 0;
1062        }
1063
1064        ret = lock_token(cinfo, 1);
1065        clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1066        return ret;
1067}
1068
1069static int metadata_update_finish(struct mddev *mddev)
1070{
1071        struct md_cluster_info *cinfo = mddev->cluster_info;
1072        struct cluster_msg cmsg;
1073        struct md_rdev *rdev;
1074        int ret = 0;
1075        int raid_slot = -1;
1076
1077        memset(&cmsg, 0, sizeof(cmsg));
1078        cmsg.type = cpu_to_le32(METADATA_UPDATED);
1079        /* Pick up a good active device number to send.
1080         */
1081        rdev_for_each(rdev, mddev)
1082                if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
1083                        raid_slot = rdev->desc_nr;
1084                        break;
1085                }
1086        if (raid_slot >= 0) {
1087                cmsg.raid_slot = cpu_to_le32(raid_slot);
1088                ret = __sendmsg(cinfo, &cmsg);
1089        } else
1090                pr_warn("md-cluster: No good device id found to send\n");
1091        clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1092        unlock_comm(cinfo);
1093        return ret;
1094}
1095
1096static void metadata_update_cancel(struct mddev *mddev)
1097{
1098        struct md_cluster_info *cinfo = mddev->cluster_info;
1099        clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1100        unlock_comm(cinfo);
1101}
1102
1103static int update_bitmap_size(struct mddev *mddev, sector_t size)
1104{
1105        struct md_cluster_info *cinfo = mddev->cluster_info;
1106        struct cluster_msg cmsg = {0};
1107        int ret;
1108
1109        cmsg.type = cpu_to_le32(BITMAP_RESIZE);
1110        cmsg.high = cpu_to_le64(size);
1111        ret = sendmsg(cinfo, &cmsg, 0);
1112        if (ret)
1113                pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
1114                        __func__, __LINE__, ret);
1115        return ret;
1116}
1117
1118static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
1119{
1120        struct bitmap_counts *counts;
1121        char str[64];
1122        struct dlm_lock_resource *bm_lockres;
1123        struct bitmap *bitmap = mddev->bitmap;
1124        unsigned long my_pages = bitmap->counts.pages;
1125        int i, rv;
1126
1127        /*
1128         * We need to ensure all the nodes can grow to a larger
 1129         * bitmap size before making the reshape.
1130         */
1131        rv = update_bitmap_size(mddev, newsize);
1132        if (rv)
1133                return rv;
1134
1135        for (i = 0; i < mddev->bitmap_info.nodes; i++) {
1136                if (i == md_cluster_ops->slot_number(mddev))
1137                        continue;
1138
 1139                bitmap = get_bitmap_from_slot(mddev, i);
 1140                if (IS_ERR(bitmap)) {
 1141                        pr_err("can't get bitmap from slot %d\n", i);
 1142                        bitmap = NULL;
 1143                        goto out;
 1144                }
 1145                counts = &bitmap->counts;
1146                /*
 1147                 * If we can take the bitmap lock of one node then
 1148                 * that slot is not occupied; update the pages.
1149                 */
1150                snprintf(str, 64, "bitmap%04d", i);
1151                bm_lockres = lockres_init(mddev, str, NULL, 1);
1152                if (!bm_lockres) {
1153                        pr_err("Cannot initialize %s lock\n", str);
1154                        goto out;
1155                }
1156                bm_lockres->flags |= DLM_LKF_NOQUEUE;
1157                rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1158                if (!rv)
1159                        counts->pages = my_pages;
1160                lockres_free(bm_lockres);
1161
1162                if (my_pages != counts->pages)
1163                        /*
1164                         * Let's revert the bitmap size if one node
1165                         * can't resize bitmap
1166                         */
1167                        goto out;
1168        }
1169
1170        return 0;
1171out:
1172        md_bitmap_free(bitmap);
1173        update_bitmap_size(mddev, oldsize);
1174        return -1;
1175}
1176
1177/*
1178 * return 0 if all the bitmaps have the same sync_size
1179 */
1180static int cluster_check_sync_size(struct mddev *mddev)
1181{
1182        int i, rv;
1183        bitmap_super_t *sb;
1184        unsigned long my_sync_size, sync_size = 0;
1185        int node_num = mddev->bitmap_info.nodes;
1186        int current_slot = md_cluster_ops->slot_number(mddev);
1187        struct bitmap *bitmap = mddev->bitmap;
1188        char str[64];
1189        struct dlm_lock_resource *bm_lockres;
1190
1191        sb = kmap_atomic(bitmap->storage.sb_page);
1192        my_sync_size = sb->sync_size;
1193        kunmap_atomic(sb);
1194
1195        for (i = 0; i < node_num; i++) {
1196                if (i == current_slot)
1197                        continue;
1198
1199                bitmap = get_bitmap_from_slot(mddev, i);
1200                if (IS_ERR(bitmap)) {
1201                        pr_err("can't get bitmap from slot %d\n", i);
1202                        return -1;
1203                }
1204
1205                /*
 1206                 * If we can take the bitmap lock of one node then
 1207                 * that slot is not occupied; update the sb.
1208                 */
1209                snprintf(str, 64, "bitmap%04d", i);
1210                bm_lockres = lockres_init(mddev, str, NULL, 1);
1211                if (!bm_lockres) {
1212                        pr_err("md-cluster: Cannot initialize %s\n", str);
1213                        md_bitmap_free(bitmap);
1214                        return -1;
1215                }
1216                bm_lockres->flags |= DLM_LKF_NOQUEUE;
1217                rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1218                if (!rv)
1219                        md_bitmap_update_sb(bitmap);
1220                lockres_free(bm_lockres);
1221
1222                sb = kmap_atomic(bitmap->storage.sb_page);
1223                if (sync_size == 0)
1224                        sync_size = sb->sync_size;
1225                else if (sync_size != sb->sync_size) {
1226                        kunmap_atomic(sb);
1227                        md_bitmap_free(bitmap);
1228                        return -1;
1229                }
1230                kunmap_atomic(sb);
1231                md_bitmap_free(bitmap);
1232        }
1233
1234        return (my_sync_size == sync_size) ? 0 : -1;
1235}
1236
1237/*
 1238 * Updating the size of a cluster raid is a little more complex; we perform
 1239 * it in the following steps:
 1240 * 1. hold token lock and update superblock in initiator node.
 1241 * 2. send METADATA_UPDATED msg to other nodes.
 1242 * 3. The initiator node continues to check each bitmap's sync_size; if all
 1243 *    bitmaps have the same value of sync_size, then we can set capacity and
 1244 *    let other nodes perform it. If one node can't update sync_size
 1245 *    accordingly, we need to revert to the previous value.
1246 */
1247static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
1248{
1249        struct md_cluster_info *cinfo = mddev->cluster_info;
1250        struct cluster_msg cmsg;
1251        struct md_rdev *rdev;
1252        int ret = 0;
1253        int raid_slot = -1;
1254
1255        md_update_sb(mddev, 1);
1256        lock_comm(cinfo, 1);
1257
1258        memset(&cmsg, 0, sizeof(cmsg));
1259        cmsg.type = cpu_to_le32(METADATA_UPDATED);
1260        rdev_for_each(rdev, mddev)
1261                if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
1262                        raid_slot = rdev->desc_nr;
1263                        break;
1264                }
1265        if (raid_slot >= 0) {
1266                cmsg.raid_slot = cpu_to_le32(raid_slot);
 1267                /*
 1268                 * We can only change the capacity after all the nodes can do
 1269                 * it, so we need to wait until the other nodes have received
 1270                 * the msg and handled the change
 1271                 */
1272                ret = __sendmsg(cinfo, &cmsg);
1273                if (ret) {
1274                        pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1275                               __func__, __LINE__);
1276                        unlock_comm(cinfo);
1277                        return;
1278                }
1279        } else {
1280                pr_err("md-cluster: No good device id found to send\n");
1281                unlock_comm(cinfo);
1282                return;
1283        }
1284
1285        /*
 1286         * check the sync_size from the other nodes' bitmaps; if sync_size
 1287         * has already been updated in the other nodes as expected, send an
1288         * empty metadata msg to permit the change of capacity
1289         */
1290        if (cluster_check_sync_size(mddev) == 0) {
1291                memset(&cmsg, 0, sizeof(cmsg));
1292                cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
1293                ret = __sendmsg(cinfo, &cmsg);
1294                if (ret)
1295                        pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
1296                               __func__, __LINE__);
1297                set_capacity(mddev->gendisk, mddev->array_sectors);
1298                revalidate_disk(mddev->gendisk);
1299        } else {
1300                /* revert to previous sectors */
1301                ret = mddev->pers->resize(mddev, old_dev_sectors);
1302                if (!ret)
1303                        revalidate_disk(mddev->gendisk);
1304                ret = __sendmsg(cinfo, &cmsg);
1305                if (ret)
1306                        pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1307                               __func__, __LINE__);
1308        }
1309        unlock_comm(cinfo);
1310}
1311
1312static int resync_start(struct mddev *mddev)
1313{
1314        struct md_cluster_info *cinfo = mddev->cluster_info;
1315        return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
1316}
1317
1318static void resync_info_get(struct mddev *mddev, sector_t *lo, sector_t *hi)
1319{
1320        struct md_cluster_info *cinfo = mddev->cluster_info;
1321
1322        spin_lock_irq(&cinfo->suspend_lock);
1323        *lo = cinfo->suspend_lo;
1324        *hi = cinfo->suspend_hi;
1325        spin_unlock_irq(&cinfo->suspend_lock);
1326}
1327
1328static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
1329{
1330        struct md_cluster_info *cinfo = mddev->cluster_info;
1331        struct resync_info ri;
1332        struct cluster_msg cmsg = {0};
1333
 1334        /* do not send zero again if we have sent it before */
1335        if (hi == 0) {
1336                memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
1337                if (le64_to_cpu(ri.hi) == 0)
1338                        return 0;
1339        }
1340
1341        add_resync_info(cinfo->bitmap_lockres, lo, hi);
1342        /* Re-acquire the lock to refresh LVB */
1343        dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
1344        cmsg.type = cpu_to_le32(RESYNCING);
1345        cmsg.low = cpu_to_le64(lo);
1346        cmsg.high = cpu_to_le64(hi);
1347
1348        /*
1349         * mddev_lock is held if resync_info_update is called from
1350         * resync_finish (md_reap_sync_thread -> resync_finish)
1351         */
1352        if (lo == 0 && hi == 0)
1353                return sendmsg(cinfo, &cmsg, 1);
1354        else
1355                return sendmsg(cinfo, &cmsg, 0);
1356}
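/*
 * Illustrative caller (lives in md.c, not here): the resync loop is
 * expected to broadcast progress so other nodes suspend I/O to the window
 * being resynced, roughly as md_do_sync() does:
 *
 *   if (mddev_is_clustered(mddev))
 *           md_cluster_ops->resync_info_update(mddev, lo, hi);
 */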
1357
1358static int resync_finish(struct mddev *mddev)
1359{
1360        struct md_cluster_info *cinfo = mddev->cluster_info;
1361        int ret = 0;
1362
1363        clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
1364
1365        /*
 1366         * If the resync thread is interrupted so we can't say resync is finished,
 1367         * another node will launch a resync thread to continue.
1368         */
1369        if (!test_bit(MD_CLOSING, &mddev->flags))
1370                ret = resync_info_update(mddev, 0, 0);
1371        dlm_unlock_sync(cinfo->resync_lockres);
1372        return ret;
1373}
1374
1375static int area_resyncing(struct mddev *mddev, int direction,
1376                sector_t lo, sector_t hi)
1377{
1378        struct md_cluster_info *cinfo = mddev->cluster_info;
1379        int ret = 0;
1380
1381        if ((direction == READ) &&
1382                test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
1383                return 1;
1384
1385        spin_lock_irq(&cinfo->suspend_lock);
1386        if (hi > cinfo->suspend_lo && lo < cinfo->suspend_hi)
1387                ret = 1;
1388        spin_unlock_irq(&cinfo->suspend_lock);
1389        return ret;
1390}
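/*
 * Illustrative caller (lives in the personality, not here): raid1 checks
 * this before servicing requests that could overlap a remote resync
 * window, roughly:
 *
 *   if (mddev_is_clustered(mddev) &&
 *       md_cluster_ops->area_resyncing(mddev, WRITE,
 *               bio->bi_iter.bi_sector, bio_end_sector(bio)))
 *           wait for the window to close (see raid1.c);
 */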
1391
 1392/* add_new_disk() - initiates a disk add.
 1393 * However, if this fails before writing md_update_sb(),
 1394 * add_new_disk_cancel() must be called to release the token lock
 1395 */
1396static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
1397{
1398        struct md_cluster_info *cinfo = mddev->cluster_info;
1399        struct cluster_msg cmsg;
1400        int ret = 0;
1401        struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
1402        char *uuid = sb->device_uuid;
1403
1404        memset(&cmsg, 0, sizeof(cmsg));
1405        cmsg.type = cpu_to_le32(NEWDISK);
1406        memcpy(cmsg.uuid, uuid, 16);
1407        cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1408        lock_comm(cinfo, 1);
1409        ret = __sendmsg(cinfo, &cmsg);
1410        if (ret) {
1411                unlock_comm(cinfo);
1412                return ret;
1413        }
1414        cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
1415        ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
1416        cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
1417        /* Some node does not "see" the device */
1418        if (ret == -EAGAIN)
1419                ret = -ENOENT;
1420        if (ret)
1421                unlock_comm(cinfo);
1422        else {
1423                dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
1424                /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
1425                 * will run soon after add_new_disk, the below path will be
1426                 * invoked:
1427                 *   md_wakeup_thread(mddev->thread)
1428                 *      -> conf->thread (raid1d)
1429                 *      -> md_check_recovery -> md_update_sb
1430                 *      -> metadata_update_start/finish
1431                 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
1432                 *
1433                 * For other failure cases, metadata_update_cancel and
 1434                 * add_new_disk_cancel also clear the bit below.
 1435                 */
1436                set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1437                wake_up(&cinfo->wait);
1438        }
1439        return ret;
1440}
1441
1442static void add_new_disk_cancel(struct mddev *mddev)
1443{
1444        struct md_cluster_info *cinfo = mddev->cluster_info;
1445        clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1446        unlock_comm(cinfo);
1447}
1448
1449static int new_disk_ack(struct mddev *mddev, bool ack)
1450{
1451        struct md_cluster_info *cinfo = mddev->cluster_info;
1452
1453        if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
1454                pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
1455                return -EINVAL;
1456        }
1457
1458        if (ack)
1459                dlm_unlock_sync(cinfo->no_new_dev_lockres);
1460        complete(&cinfo->newdisk_completion);
1461        return 0;
1462}
1463
1464static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
1465{
1466        struct cluster_msg cmsg = {0};
1467        struct md_cluster_info *cinfo = mddev->cluster_info;
1468        cmsg.type = cpu_to_le32(REMOVE);
1469        cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1470        return sendmsg(cinfo, &cmsg, 1);
1471}
1472
1473static int lock_all_bitmaps(struct mddev *mddev)
1474{
1475        int slot, my_slot, ret, held = 1, i = 0;
1476        char str[64];
1477        struct md_cluster_info *cinfo = mddev->cluster_info;
1478
1479        cinfo->other_bitmap_lockres =
1480                kcalloc(mddev->bitmap_info.nodes - 1,
1481                        sizeof(struct dlm_lock_resource *), GFP_KERNEL);
1482        if (!cinfo->other_bitmap_lockres) {
1483                pr_err("md: can't alloc mem for other bitmap locks\n");
1484                return 0;
1485        }
1486
1487        my_slot = slot_number(mddev);
1488        for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
1489                if (slot == my_slot)
1490                        continue;
1491
1492                memset(str, '\0', 64);
1493                snprintf(str, 64, "bitmap%04d", slot);
1494                cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
1495                if (!cinfo->other_bitmap_lockres[i])
1496                        return -ENOMEM;
1497
1498                cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
1499                ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
1500                if (ret)
1501                        held = -1;
1502                i++;
1503        }
1504
1505        return held;
1506}
1507
1508static void unlock_all_bitmaps(struct mddev *mddev)
1509{
1510        struct md_cluster_info *cinfo = mddev->cluster_info;
1511        int i;
1512
 1513        /* release the other nodes' bitmap locks if they exist */
1514        if (cinfo->other_bitmap_lockres) {
1515                for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
1516                        if (cinfo->other_bitmap_lockres[i]) {
1517                                lockres_free(cinfo->other_bitmap_lockres[i]);
1518                        }
1519                }
1520                kfree(cinfo->other_bitmap_lockres);
1521        }
1522}
1523
1524static int gather_bitmaps(struct md_rdev *rdev)
1525{
1526        int sn, err;
1527        sector_t lo, hi;
1528        struct cluster_msg cmsg = {0};
1529        struct mddev *mddev = rdev->mddev;
1530        struct md_cluster_info *cinfo = mddev->cluster_info;
1531
1532        cmsg.type = cpu_to_le32(RE_ADD);
1533        cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1534        err = sendmsg(cinfo, &cmsg, 1);
1535        if (err)
1536                goto out;
1537
1538        for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
1539                if (sn == (cinfo->slot_number - 1))
1540                        continue;
1541                err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
1542                if (err) {
 1543                        pr_warn("md-cluster: Could not gather bitmaps from slot %d\n", sn);
1544                        goto out;
1545                }
1546                if ((hi > 0) && (lo < mddev->recovery_cp))
1547                        mddev->recovery_cp = lo;
1548        }
1549out:
1550        return err;
1551}
1552
1553static struct md_cluster_operations cluster_ops = {
1554        .join   = join,
1555        .leave  = leave,
1556        .slot_number = slot_number,
1557        .resync_start = resync_start,
1558        .resync_finish = resync_finish,
1559        .resync_info_update = resync_info_update,
1560        .resync_info_get = resync_info_get,
1561        .metadata_update_start = metadata_update_start,
1562        .metadata_update_finish = metadata_update_finish,
1563        .metadata_update_cancel = metadata_update_cancel,
1564        .area_resyncing = area_resyncing,
1565        .add_new_disk = add_new_disk,
1566        .add_new_disk_cancel = add_new_disk_cancel,
1567        .new_disk_ack = new_disk_ack,
1568        .remove_disk = remove_disk,
1569        .load_bitmaps = load_bitmaps,
1570        .gather_bitmaps = gather_bitmaps,
1571        .resize_bitmaps = resize_bitmaps,
1572        .lock_all_bitmaps = lock_all_bitmaps,
1573        .unlock_all_bitmaps = unlock_all_bitmaps,
1574        .update_size = update_size,
1575};
1576
1577static int __init cluster_init(void)
1578{
1579        pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
1580        pr_info("Registering Cluster MD functions\n");
1581        register_md_cluster_operations(&cluster_ops, THIS_MODULE);
1582        return 0;
1583}
1584
1585static void cluster_exit(void)
1586{
1587        unregister_md_cluster_operations();
1588}
1589
1590module_init(cluster_init);
1591module_exit(cluster_exit);
1592MODULE_AUTHOR("SUSE");
1593MODULE_LICENSE("GPL");
1594MODULE_DESCRIPTION("Clustering support for MD");
1595