linux/drivers/md/md-cluster.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3 * Copyright (C) 2015, SUSE
   4 */
   5
   6
   7#include <linux/module.h>
   8#include <linux/kthread.h>
   9#include <linux/dlm.h>
  10#include <linux/sched.h>
  11#include <linux/raid/md_p.h>
  12#include "md.h"
  13#include "md-bitmap.h"
  14#include "md-cluster.h"
  15
  16#define LVB_SIZE        64
  17#define NEW_DEV_TIMEOUT 5000
  18
  19struct dlm_lock_resource {
  20        dlm_lockspace_t *ls;
  21        struct dlm_lksb lksb;
  22        char *name; /* lock name. */
  23        uint32_t flags; /* flags to pass to dlm_lock() */
  24        wait_queue_head_t sync_locking; /* wait queue for synchronized locking */
  25        bool sync_locking_done;
  26        void (*bast)(void *arg, int mode); /* blocking AST function pointer*/
  27        struct mddev *mddev; /* pointing back to mddev. */
  28        int mode;
  29};
  30
  31struct resync_info {
  32        __le64 lo;
  33        __le64 hi;
  34};
  35
  36/* md_cluster_info flags */
  37#define         MD_CLUSTER_WAITING_FOR_NEWDISK          1
  38#define         MD_CLUSTER_SUSPEND_READ_BALANCING       2
  39#define         MD_CLUSTER_BEGIN_JOIN_CLUSTER           3
  40
  41/* Lock the send communication. This is done through
  42 * bit manipulation as opposed to a mutex in order to
  43 * accomodate lock and hold. See next comment.
  44 */
  45#define         MD_CLUSTER_SEND_LOCK                    4
  46/* If cluster operations (such as adding a disk) must lock the
  47 * communication channel, so as to perform extra operations
  48 * (update metadata) and no other operation is allowed on the
  49 * MD. Token needs to be locked and held until the operation
  50 * completes witha md_update_sb(), which would eventually release
  51 * the lock.
  52 */
  53#define         MD_CLUSTER_SEND_LOCKED_ALREADY          5
  54/* We should receive message after node joined cluster and
  55 * set up all the related infos such as bitmap and personality */
  56#define         MD_CLUSTER_ALREADY_IN_CLUSTER           6
  57#define         MD_CLUSTER_PENDING_RECV_EVENT           7
  58#define         MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD              8
  59
  60struct md_cluster_info {
  61        struct mddev *mddev; /* the md device which md_cluster_info belongs to */
  62        /* dlm lock space and resources for clustered raid. */
  63        dlm_lockspace_t *lockspace;
  64        int slot_number;
  65        struct completion completion;
  66        struct mutex recv_mutex;
  67        struct dlm_lock_resource *bitmap_lockres;
  68        struct dlm_lock_resource **other_bitmap_lockres;
  69        struct dlm_lock_resource *resync_lockres;
  70        struct list_head suspend_list;
  71
  72        spinlock_t suspend_lock;
  73        /* record the region which write should be suspended */
  74        sector_t suspend_lo;
  75        sector_t suspend_hi;
  76        int suspend_from; /* the slot which broadcast suspend_lo/hi */
  77
  78        struct md_thread *recovery_thread;
  79        unsigned long recovery_map;
  80        /* communication loc resources */
  81        struct dlm_lock_resource *ack_lockres;
  82        struct dlm_lock_resource *message_lockres;
  83        struct dlm_lock_resource *token_lockres;
  84        struct dlm_lock_resource *no_new_dev_lockres;
  85        struct md_thread *recv_thread;
  86        struct completion newdisk_completion;
  87        wait_queue_head_t wait;
  88        unsigned long state;
  89        /* record the region in RESYNCING message */
  90        sector_t sync_low;
  91        sector_t sync_hi;
  92};
  93
  94enum msg_type {
  95        METADATA_UPDATED = 0,
  96        RESYNCING,
  97        NEWDISK,
  98        REMOVE,
  99        RE_ADD,
 100        BITMAP_NEEDS_SYNC,
 101        CHANGE_CAPACITY,
 102        BITMAP_RESIZE,
 103};
 104
 105struct cluster_msg {
 106        __le32 type;
 107        __le32 slot;
 108        /* TODO: Unionize this for smaller footprint */
 109        __le64 low;
 110        __le64 high;
 111        char uuid[16];
 112        __le32 raid_slot;
 113};
 114
 115static void sync_ast(void *arg)
 116{
 117        struct dlm_lock_resource *res;
 118
 119        res = arg;
 120        res->sync_locking_done = true;
 121        wake_up(&res->sync_locking);
 122}
 123
 124static int dlm_lock_sync(struct dlm_lock_resource *res, int mode)
 125{
 126        int ret = 0;
 127
 128        ret = dlm_lock(res->ls, mode, &res->lksb,
 129                        res->flags, res->name, strlen(res->name),
 130                        0, sync_ast, res, res->bast);
 131        if (ret)
 132                return ret;
 133        wait_event(res->sync_locking, res->sync_locking_done);
 134        res->sync_locking_done = false;
 135        if (res->lksb.sb_status == 0)
 136                res->mode = mode;
 137        return res->lksb.sb_status;
 138}
 139
 140static int dlm_unlock_sync(struct dlm_lock_resource *res)
 141{
 142        return dlm_lock_sync(res, DLM_LOCK_NL);
 143}
 144
 145/*
 146 * An variation of dlm_lock_sync, which make lock request could
 147 * be interrupted
 148 */
 149static int dlm_lock_sync_interruptible(struct dlm_lock_resource *res, int mode,
 150                                       struct mddev *mddev)
 151{
 152        int ret = 0;
 153
 154        ret = dlm_lock(res->ls, mode, &res->lksb,
 155                        res->flags, res->name, strlen(res->name),
 156                        0, sync_ast, res, res->bast);
 157        if (ret)
 158                return ret;
 159
 160        wait_event(res->sync_locking, res->sync_locking_done
 161                                      || kthread_should_stop()
 162                                      || test_bit(MD_CLOSING, &mddev->flags));
 163        if (!res->sync_locking_done) {
 164                /*
 165                 * the convert queue contains the lock request when request is
 166                 * interrupted, and sync_ast could still be run, so need to
 167                 * cancel the request and reset completion
 168                 */
 169                ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_CANCEL,
 170                        &res->lksb, res);
 171                res->sync_locking_done = false;
 172                if (unlikely(ret != 0))
 173                        pr_info("failed to cancel previous lock request "
 174                                 "%s return %d\n", res->name, ret);
 175                return -EPERM;
 176        } else
 177                res->sync_locking_done = false;
 178        if (res->lksb.sb_status == 0)
 179                res->mode = mode;
 180        return res->lksb.sb_status;
 181}
 182
 183static struct dlm_lock_resource *lockres_init(struct mddev *mddev,
 184                char *name, void (*bastfn)(void *arg, int mode), int with_lvb)
 185{
 186        struct dlm_lock_resource *res = NULL;
 187        int ret, namelen;
 188        struct md_cluster_info *cinfo = mddev->cluster_info;
 189
 190        res = kzalloc(sizeof(struct dlm_lock_resource), GFP_KERNEL);
 191        if (!res)
 192                return NULL;
 193        init_waitqueue_head(&res->sync_locking);
 194        res->sync_locking_done = false;
 195        res->ls = cinfo->lockspace;
 196        res->mddev = mddev;
 197        res->mode = DLM_LOCK_IV;
 198        namelen = strlen(name);
 199        res->name = kzalloc(namelen + 1, GFP_KERNEL);
 200        if (!res->name) {
 201                pr_err("md-cluster: Unable to allocate resource name for resource %s\n", name);
 202                goto out_err;
 203        }
 204        strlcpy(res->name, name, namelen + 1);
 205        if (with_lvb) {
 206                res->lksb.sb_lvbptr = kzalloc(LVB_SIZE, GFP_KERNEL);
 207                if (!res->lksb.sb_lvbptr) {
 208                        pr_err("md-cluster: Unable to allocate LVB for resource %s\n", name);
 209                        goto out_err;
 210                }
 211                res->flags = DLM_LKF_VALBLK;
 212        }
 213
 214        if (bastfn)
 215                res->bast = bastfn;
 216
 217        res->flags |= DLM_LKF_EXPEDITE;
 218
 219        ret = dlm_lock_sync(res, DLM_LOCK_NL);
 220        if (ret) {
 221                pr_err("md-cluster: Unable to lock NL on new lock resource %s\n", name);
 222                goto out_err;
 223        }
 224        res->flags &= ~DLM_LKF_EXPEDITE;
 225        res->flags |= DLM_LKF_CONVERT;
 226
 227        return res;
 228out_err:
 229        kfree(res->lksb.sb_lvbptr);
 230        kfree(res->name);
 231        kfree(res);
 232        return NULL;
 233}
 234
 235static void lockres_free(struct dlm_lock_resource *res)
 236{
 237        int ret = 0;
 238
 239        if (!res)
 240                return;
 241
 242        /*
 243         * use FORCEUNLOCK flag, so we can unlock even the lock is on the
 244         * waiting or convert queue
 245         */
 246        ret = dlm_unlock(res->ls, res->lksb.sb_lkid, DLM_LKF_FORCEUNLOCK,
 247                &res->lksb, res);
 248        if (unlikely(ret != 0))
 249                pr_err("failed to unlock %s return %d\n", res->name, ret);
 250        else
 251                wait_event(res->sync_locking, res->sync_locking_done);
 252
 253        kfree(res->name);
 254        kfree(res->lksb.sb_lvbptr);
 255        kfree(res);
 256}
 257
 258static void add_resync_info(struct dlm_lock_resource *lockres,
 259                            sector_t lo, sector_t hi)
 260{
 261        struct resync_info *ri;
 262
 263        ri = (struct resync_info *)lockres->lksb.sb_lvbptr;
 264        ri->lo = cpu_to_le64(lo);
 265        ri->hi = cpu_to_le64(hi);
 266}
 267
 268static int read_resync_info(struct mddev *mddev,
 269                            struct dlm_lock_resource *lockres)
 270{
 271        struct resync_info ri;
 272        struct md_cluster_info *cinfo = mddev->cluster_info;
 273        int ret = 0;
 274
 275        dlm_lock_sync(lockres, DLM_LOCK_CR);
 276        memcpy(&ri, lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
 277        if (le64_to_cpu(ri.hi) > 0) {
 278                cinfo->suspend_hi = le64_to_cpu(ri.hi);
 279                cinfo->suspend_lo = le64_to_cpu(ri.lo);
 280                ret = 1;
 281        }
 282        dlm_unlock_sync(lockres);
 283        return ret;
 284}
 285
 286static void recover_bitmaps(struct md_thread *thread)
 287{
 288        struct mddev *mddev = thread->mddev;
 289        struct md_cluster_info *cinfo = mddev->cluster_info;
 290        struct dlm_lock_resource *bm_lockres;
 291        char str[64];
 292        int slot, ret;
 293        sector_t lo, hi;
 294
 295        while (cinfo->recovery_map) {
 296                slot = fls64((u64)cinfo->recovery_map) - 1;
 297
 298                snprintf(str, 64, "bitmap%04d", slot);
 299                bm_lockres = lockres_init(mddev, str, NULL, 1);
 300                if (!bm_lockres) {
 301                        pr_err("md-cluster: Cannot initialize bitmaps\n");
 302                        goto clear_bit;
 303                }
 304
 305                ret = dlm_lock_sync_interruptible(bm_lockres, DLM_LOCK_PW, mddev);
 306                if (ret) {
 307                        pr_err("md-cluster: Could not DLM lock %s: %d\n",
 308                                        str, ret);
 309                        goto clear_bit;
 310                }
 311                ret = md_bitmap_copy_from_slot(mddev, slot, &lo, &hi, true);
 312                if (ret) {
 313                        pr_err("md-cluster: Could not copy data from bitmap %d\n", slot);
 314                        goto clear_bit;
 315                }
 316
 317                /* Clear suspend_area associated with the bitmap */
 318                spin_lock_irq(&cinfo->suspend_lock);
 319                cinfo->suspend_hi = 0;
 320                cinfo->suspend_lo = 0;
 321                cinfo->suspend_from = -1;
 322                spin_unlock_irq(&cinfo->suspend_lock);
 323
 324                /* Kick off a reshape if needed */
 325                if (test_bit(MD_RESYNCING_REMOTE, &mddev->recovery) &&
 326                    test_bit(MD_RECOVERY_RESHAPE, &mddev->recovery) &&
 327                    mddev->reshape_position != MaxSector)
 328                        md_wakeup_thread(mddev->sync_thread);
 329
 330                if (hi > 0) {
 331                        if (lo < mddev->recovery_cp)
 332                                mddev->recovery_cp = lo;
 333                        /* wake up thread to continue resync in case resync
 334                         * is not finished */
 335                        if (mddev->recovery_cp != MaxSector) {
 336                                /*
 337                                 * clear the REMOTE flag since we will launch
 338                                 * resync thread in current node.
 339                                 */
 340                                clear_bit(MD_RESYNCING_REMOTE,
 341                                          &mddev->recovery);
 342                                set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
 343                                md_wakeup_thread(mddev->thread);
 344                        }
 345                }
 346clear_bit:
 347                lockres_free(bm_lockres);
 348                clear_bit(slot, &cinfo->recovery_map);
 349        }
 350}
 351
 352static void recover_prep(void *arg)
 353{
 354        struct mddev *mddev = arg;
 355        struct md_cluster_info *cinfo = mddev->cluster_info;
 356        set_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
 357}
 358
 359static void __recover_slot(struct mddev *mddev, int slot)
 360{
 361        struct md_cluster_info *cinfo = mddev->cluster_info;
 362
 363        set_bit(slot, &cinfo->recovery_map);
 364        if (!cinfo->recovery_thread) {
 365                cinfo->recovery_thread = md_register_thread(recover_bitmaps,
 366                                mddev, "recover");
 367                if (!cinfo->recovery_thread) {
 368                        pr_warn("md-cluster: Could not create recovery thread\n");
 369                        return;
 370                }
 371        }
 372        md_wakeup_thread(cinfo->recovery_thread);
 373}
 374
 375static void recover_slot(void *arg, struct dlm_slot *slot)
 376{
 377        struct mddev *mddev = arg;
 378        struct md_cluster_info *cinfo = mddev->cluster_info;
 379
 380        pr_info("md-cluster: %s Node %d/%d down. My slot: %d. Initiating recovery.\n",
 381                        mddev->bitmap_info.cluster_name,
 382                        slot->nodeid, slot->slot,
 383                        cinfo->slot_number);
 384        /* deduct one since dlm slot starts from one while the num of
 385         * cluster-md begins with 0 */
 386        __recover_slot(mddev, slot->slot - 1);
 387}
 388
 389static void recover_done(void *arg, struct dlm_slot *slots,
 390                int num_slots, int our_slot,
 391                uint32_t generation)
 392{
 393        struct mddev *mddev = arg;
 394        struct md_cluster_info *cinfo = mddev->cluster_info;
 395
 396        cinfo->slot_number = our_slot;
 397        /* completion is only need to be complete when node join cluster,
 398         * it doesn't need to run during another node's failure */
 399        if (test_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state)) {
 400                complete(&cinfo->completion);
 401                clear_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
 402        }
 403        clear_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state);
 404}
 405
 406/* the ops is called when node join the cluster, and do lock recovery
 407 * if node failure occurs */
 408static const struct dlm_lockspace_ops md_ls_ops = {
 409        .recover_prep = recover_prep,
 410        .recover_slot = recover_slot,
 411        .recover_done = recover_done,
 412};
 413
 414/*
 415 * The BAST function for the ack lock resource
 416 * This function wakes up the receive thread in
 417 * order to receive and process the message.
 418 */
 419static void ack_bast(void *arg, int mode)
 420{
 421        struct dlm_lock_resource *res = arg;
 422        struct md_cluster_info *cinfo = res->mddev->cluster_info;
 423
 424        if (mode == DLM_LOCK_EX) {
 425                if (test_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state))
 426                        md_wakeup_thread(cinfo->recv_thread);
 427                else
 428                        set_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state);
 429        }
 430}
 431
 432static void remove_suspend_info(struct mddev *mddev, int slot)
 433{
 434        struct md_cluster_info *cinfo = mddev->cluster_info;
 435        mddev->pers->quiesce(mddev, 1);
 436        spin_lock_irq(&cinfo->suspend_lock);
 437        cinfo->suspend_hi = 0;
 438        cinfo->suspend_lo = 0;
 439        spin_unlock_irq(&cinfo->suspend_lock);
 440        mddev->pers->quiesce(mddev, 0);
 441}
 442
 443static void process_suspend_info(struct mddev *mddev,
 444                int slot, sector_t lo, sector_t hi)
 445{
 446        struct md_cluster_info *cinfo = mddev->cluster_info;
 447        struct mdp_superblock_1 *sb = NULL;
 448        struct md_rdev *rdev;
 449
 450        if (!hi) {
 451                /*
 452                 * clear the REMOTE flag since resync or recovery is finished
 453                 * in remote node.
 454                 */
 455                clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
 456                remove_suspend_info(mddev, slot);
 457                set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
 458                md_wakeup_thread(mddev->thread);
 459                return;
 460        }
 461
 462        rdev_for_each(rdev, mddev)
 463                if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
 464                        sb = page_address(rdev->sb_page);
 465                        break;
 466                }
 467
 468        /*
 469         * The bitmaps are not same for different nodes
 470         * if RESYNCING is happening in one node, then
 471         * the node which received the RESYNCING message
 472         * probably will perform resync with the region
 473         * [lo, hi] again, so we could reduce resync time
 474         * a lot if we can ensure that the bitmaps among
 475         * different nodes are match up well.
 476         *
 477         * sync_low/hi is used to record the region which
 478         * arrived in the previous RESYNCING message,
 479         *
 480         * Call md_bitmap_sync_with_cluster to clear NEEDED_MASK
 481         * and set RESYNC_MASK since  resync thread is running
 482         * in another node, so we don't need to do the resync
 483         * again with the same section.
 484         *
 485         * Skip md_bitmap_sync_with_cluster in case reshape
 486         * happening, because reshaping region is small and
 487         * we don't want to trigger lots of WARN.
 488         */
 489        if (sb && !(le32_to_cpu(sb->feature_map) & MD_FEATURE_RESHAPE_ACTIVE))
 490                md_bitmap_sync_with_cluster(mddev, cinfo->sync_low,
 491                                            cinfo->sync_hi, lo, hi);
 492        cinfo->sync_low = lo;
 493        cinfo->sync_hi = hi;
 494
 495        mddev->pers->quiesce(mddev, 1);
 496        spin_lock_irq(&cinfo->suspend_lock);
 497        cinfo->suspend_from = slot;
 498        cinfo->suspend_lo = lo;
 499        cinfo->suspend_hi = hi;
 500        spin_unlock_irq(&cinfo->suspend_lock);
 501        mddev->pers->quiesce(mddev, 0);
 502}
 503
 504static void process_add_new_disk(struct mddev *mddev, struct cluster_msg *cmsg)
 505{
 506        char disk_uuid[64];
 507        struct md_cluster_info *cinfo = mddev->cluster_info;
 508        char event_name[] = "EVENT=ADD_DEVICE";
 509        char raid_slot[16];
 510        char *envp[] = {event_name, disk_uuid, raid_slot, NULL};
 511        int len;
 512
 513        len = snprintf(disk_uuid, 64, "DEVICE_UUID=");
 514        sprintf(disk_uuid + len, "%pU", cmsg->uuid);
 515        snprintf(raid_slot, 16, "RAID_DISK=%d", le32_to_cpu(cmsg->raid_slot));
 516        pr_info("%s:%d Sending kobject change with %s and %s\n", __func__, __LINE__, disk_uuid, raid_slot);
 517        init_completion(&cinfo->newdisk_completion);
 518        set_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
 519        kobject_uevent_env(&disk_to_dev(mddev->gendisk)->kobj, KOBJ_CHANGE, envp);
 520        wait_for_completion_timeout(&cinfo->newdisk_completion,
 521                        NEW_DEV_TIMEOUT);
 522        clear_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state);
 523}
 524
 525
 526static void process_metadata_update(struct mddev *mddev, struct cluster_msg *msg)
 527{
 528        int got_lock = 0;
 529        struct md_cluster_info *cinfo = mddev->cluster_info;
 530        mddev->good_device_nr = le32_to_cpu(msg->raid_slot);
 531
 532        dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
 533        wait_event(mddev->thread->wqueue,
 534                   (got_lock = mddev_trylock(mddev)) ||
 535                    test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state));
 536        md_reload_sb(mddev, mddev->good_device_nr);
 537        if (got_lock)
 538                mddev_unlock(mddev);
 539}
 540
 541static void process_remove_disk(struct mddev *mddev, struct cluster_msg *msg)
 542{
 543        struct md_rdev *rdev;
 544
 545        rcu_read_lock();
 546        rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
 547        if (rdev) {
 548                set_bit(ClusterRemove, &rdev->flags);
 549                set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
 550                md_wakeup_thread(mddev->thread);
 551        }
 552        else
 553                pr_warn("%s: %d Could not find disk(%d) to REMOVE\n",
 554                        __func__, __LINE__, le32_to_cpu(msg->raid_slot));
 555        rcu_read_unlock();
 556}
 557
 558static void process_readd_disk(struct mddev *mddev, struct cluster_msg *msg)
 559{
 560        struct md_rdev *rdev;
 561
 562        rcu_read_lock();
 563        rdev = md_find_rdev_nr_rcu(mddev, le32_to_cpu(msg->raid_slot));
 564        if (rdev && test_bit(Faulty, &rdev->flags))
 565                clear_bit(Faulty, &rdev->flags);
 566        else
 567                pr_warn("%s: %d Could not find disk(%d) which is faulty",
 568                        __func__, __LINE__, le32_to_cpu(msg->raid_slot));
 569        rcu_read_unlock();
 570}
 571
 572static int process_recvd_msg(struct mddev *mddev, struct cluster_msg *msg)
 573{
 574        int ret = 0;
 575
 576        if (WARN(mddev->cluster_info->slot_number - 1 == le32_to_cpu(msg->slot),
 577                "node %d received it's own msg\n", le32_to_cpu(msg->slot)))
 578                return -1;
 579        switch (le32_to_cpu(msg->type)) {
 580        case METADATA_UPDATED:
 581                process_metadata_update(mddev, msg);
 582                break;
 583        case CHANGE_CAPACITY:
 584                set_capacity_and_notify(mddev->gendisk, mddev->array_sectors);
 585                break;
 586        case RESYNCING:
 587                set_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
 588                process_suspend_info(mddev, le32_to_cpu(msg->slot),
 589                                     le64_to_cpu(msg->low),
 590                                     le64_to_cpu(msg->high));
 591                break;
 592        case NEWDISK:
 593                process_add_new_disk(mddev, msg);
 594                break;
 595        case REMOVE:
 596                process_remove_disk(mddev, msg);
 597                break;
 598        case RE_ADD:
 599                process_readd_disk(mddev, msg);
 600                break;
 601        case BITMAP_NEEDS_SYNC:
 602                __recover_slot(mddev, le32_to_cpu(msg->slot));
 603                break;
 604        case BITMAP_RESIZE:
 605                if (le64_to_cpu(msg->high) != mddev->pers->size(mddev, 0, 0))
 606                        ret = md_bitmap_resize(mddev->bitmap,
 607                                            le64_to_cpu(msg->high), 0, 0);
 608                break;
 609        default:
 610                ret = -1;
 611                pr_warn("%s:%d Received unknown message from %d\n",
 612                        __func__, __LINE__, msg->slot);
 613        }
 614        return ret;
 615}
 616
 617/*
 618 * thread for receiving message
 619 */
 620static void recv_daemon(struct md_thread *thread)
 621{
 622        struct md_cluster_info *cinfo = thread->mddev->cluster_info;
 623        struct dlm_lock_resource *ack_lockres = cinfo->ack_lockres;
 624        struct dlm_lock_resource *message_lockres = cinfo->message_lockres;
 625        struct cluster_msg msg;
 626        int ret;
 627
 628        mutex_lock(&cinfo->recv_mutex);
 629        /*get CR on Message*/
 630        if (dlm_lock_sync(message_lockres, DLM_LOCK_CR)) {
 631                pr_err("md/raid1:failed to get CR on MESSAGE\n");
 632                mutex_unlock(&cinfo->recv_mutex);
 633                return;
 634        }
 635
 636        /* read lvb and wake up thread to process this message_lockres */
 637        memcpy(&msg, message_lockres->lksb.sb_lvbptr, sizeof(struct cluster_msg));
 638        ret = process_recvd_msg(thread->mddev, &msg);
 639        if (ret)
 640                goto out;
 641
 642        /*release CR on ack_lockres*/
 643        ret = dlm_unlock_sync(ack_lockres);
 644        if (unlikely(ret != 0))
 645                pr_info("unlock ack failed return %d\n", ret);
 646        /*up-convert to PR on message_lockres*/
 647        ret = dlm_lock_sync(message_lockres, DLM_LOCK_PR);
 648        if (unlikely(ret != 0))
 649                pr_info("lock PR on msg failed return %d\n", ret);
 650        /*get CR on ack_lockres again*/
 651        ret = dlm_lock_sync(ack_lockres, DLM_LOCK_CR);
 652        if (unlikely(ret != 0))
 653                pr_info("lock CR on ack failed return %d\n", ret);
 654out:
 655        /*release CR on message_lockres*/
 656        ret = dlm_unlock_sync(message_lockres);
 657        if (unlikely(ret != 0))
 658                pr_info("unlock msg failed return %d\n", ret);
 659        mutex_unlock(&cinfo->recv_mutex);
 660}
 661
 662/* lock_token()
 663 * Takes the lock on the TOKEN lock resource so no other
 664 * node can communicate while the operation is underway.
 665 */
 666static int lock_token(struct md_cluster_info *cinfo)
 667{
 668        int error;
 669
 670        error = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
 671        if (error) {
 672                pr_err("md-cluster(%s:%d): failed to get EX on TOKEN (%d)\n",
 673                                __func__, __LINE__, error);
 674        } else {
 675                /* Lock the receive sequence */
 676                mutex_lock(&cinfo->recv_mutex);
 677        }
 678        return error;
 679}
 680
 681/* lock_comm()
 682 * Sets the MD_CLUSTER_SEND_LOCK bit to lock the send channel.
 683 */
 684static int lock_comm(struct md_cluster_info *cinfo, bool mddev_locked)
 685{
 686        int rv, set_bit = 0;
 687        struct mddev *mddev = cinfo->mddev;
 688
 689        /*
 690         * If resync thread run after raid1d thread, then process_metadata_update
 691         * could not continue if raid1d held reconfig_mutex (and raid1d is blocked
 692         * since another node already got EX on Token and waitting the EX of Ack),
 693         * so let resync wake up thread in case flag is set.
 694         */
 695        if (mddev_locked && !test_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
 696                                      &cinfo->state)) {
 697                rv = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
 698                                              &cinfo->state);
 699                WARN_ON_ONCE(rv);
 700                md_wakeup_thread(mddev->thread);
 701                set_bit = 1;
 702        }
 703
 704        wait_event(cinfo->wait,
 705                   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state));
 706        rv = lock_token(cinfo);
 707        if (set_bit)
 708                clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
 709        return rv;
 710}
 711
 712static void unlock_comm(struct md_cluster_info *cinfo)
 713{
 714        WARN_ON(cinfo->token_lockres->mode != DLM_LOCK_EX);
 715        mutex_unlock(&cinfo->recv_mutex);
 716        dlm_unlock_sync(cinfo->token_lockres);
 717        clear_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state);
 718        wake_up(&cinfo->wait);
 719}
 720
 721/* __sendmsg()
 722 * This function performs the actual sending of the message. This function is
 723 * usually called after performing the encompassing operation
 724 * The function:
 725 * 1. Grabs the message lockresource in EX mode
 726 * 2. Copies the message to the message LVB
 727 * 3. Downconverts message lockresource to CW
 728 * 4. Upconverts ack lock resource from CR to EX. This forces the BAST on other nodes
 729 *    and the other nodes read the message. The thread will wait here until all other
 730 *    nodes have released ack lock resource.
 731 * 5. Downconvert ack lockresource to CR
 732 */
 733static int __sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg)
 734{
 735        int error;
 736        int slot = cinfo->slot_number - 1;
 737
 738        cmsg->slot = cpu_to_le32(slot);
 739        /*get EX on Message*/
 740        error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_EX);
 741        if (error) {
 742                pr_err("md-cluster: failed to get EX on MESSAGE (%d)\n", error);
 743                goto failed_message;
 744        }
 745
 746        memcpy(cinfo->message_lockres->lksb.sb_lvbptr, (void *)cmsg,
 747                        sizeof(struct cluster_msg));
 748        /*down-convert EX to CW on Message*/
 749        error = dlm_lock_sync(cinfo->message_lockres, DLM_LOCK_CW);
 750        if (error) {
 751                pr_err("md-cluster: failed to convert EX to CW on MESSAGE(%d)\n",
 752                                error);
 753                goto failed_ack;
 754        }
 755
 756        /*up-convert CR to EX on Ack*/
 757        error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_EX);
 758        if (error) {
 759                pr_err("md-cluster: failed to convert CR to EX on ACK(%d)\n",
 760                                error);
 761                goto failed_ack;
 762        }
 763
 764        /*down-convert EX to CR on Ack*/
 765        error = dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR);
 766        if (error) {
 767                pr_err("md-cluster: failed to convert EX to CR on ACK(%d)\n",
 768                                error);
 769                goto failed_ack;
 770        }
 771
 772failed_ack:
 773        error = dlm_unlock_sync(cinfo->message_lockres);
 774        if (unlikely(error != 0)) {
 775                pr_err("md-cluster: failed convert to NL on MESSAGE(%d)\n",
 776                        error);
 777                /* in case the message can't be released due to some reason */
 778                goto failed_ack;
 779        }
 780failed_message:
 781        return error;
 782}
 783
 784static int sendmsg(struct md_cluster_info *cinfo, struct cluster_msg *cmsg,
 785                   bool mddev_locked)
 786{
 787        int ret;
 788
 789        ret = lock_comm(cinfo, mddev_locked);
 790        if (!ret) {
 791                ret = __sendmsg(cinfo, cmsg);
 792                unlock_comm(cinfo);
 793        }
 794        return ret;
 795}
 796
 797static int gather_all_resync_info(struct mddev *mddev, int total_slots)
 798{
 799        struct md_cluster_info *cinfo = mddev->cluster_info;
 800        int i, ret = 0;
 801        struct dlm_lock_resource *bm_lockres;
 802        char str[64];
 803        sector_t lo, hi;
 804
 805
 806        for (i = 0; i < total_slots; i++) {
 807                memset(str, '\0', 64);
 808                snprintf(str, 64, "bitmap%04d", i);
 809                bm_lockres = lockres_init(mddev, str, NULL, 1);
 810                if (!bm_lockres)
 811                        return -ENOMEM;
 812                if (i == (cinfo->slot_number - 1)) {
 813                        lockres_free(bm_lockres);
 814                        continue;
 815                }
 816
 817                bm_lockres->flags |= DLM_LKF_NOQUEUE;
 818                ret = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
 819                if (ret == -EAGAIN) {
 820                        if (read_resync_info(mddev, bm_lockres)) {
 821                                pr_info("%s:%d Resync[%llu..%llu] in progress on %d\n",
 822                                                __func__, __LINE__,
 823                                        (unsigned long long) cinfo->suspend_lo,
 824                                        (unsigned long long) cinfo->suspend_hi,
 825                                        i);
 826                                cinfo->suspend_from = i;
 827                        }
 828                        ret = 0;
 829                        lockres_free(bm_lockres);
 830                        continue;
 831                }
 832                if (ret) {
 833                        lockres_free(bm_lockres);
 834                        goto out;
 835                }
 836
 837                /* Read the disk bitmap sb and check if it needs recovery */
 838                ret = md_bitmap_copy_from_slot(mddev, i, &lo, &hi, false);
 839                if (ret) {
 840                        pr_warn("md-cluster: Could not gather bitmaps from slot %d", i);
 841                        lockres_free(bm_lockres);
 842                        continue;
 843                }
 844                if ((hi > 0) && (lo < mddev->recovery_cp)) {
 845                        set_bit(MD_RECOVERY_NEEDED, &mddev->recovery);
 846                        mddev->recovery_cp = lo;
 847                        md_check_recovery(mddev);
 848                }
 849
 850                lockres_free(bm_lockres);
 851        }
 852out:
 853        return ret;
 854}
 855
 856static int join(struct mddev *mddev, int nodes)
 857{
 858        struct md_cluster_info *cinfo;
 859        int ret, ops_rv;
 860        char str[64];
 861
 862        cinfo = kzalloc(sizeof(struct md_cluster_info), GFP_KERNEL);
 863        if (!cinfo)
 864                return -ENOMEM;
 865
 866        INIT_LIST_HEAD(&cinfo->suspend_list);
 867        spin_lock_init(&cinfo->suspend_lock);
 868        init_completion(&cinfo->completion);
 869        set_bit(MD_CLUSTER_BEGIN_JOIN_CLUSTER, &cinfo->state);
 870        init_waitqueue_head(&cinfo->wait);
 871        mutex_init(&cinfo->recv_mutex);
 872
 873        mddev->cluster_info = cinfo;
 874        cinfo->mddev = mddev;
 875
 876        memset(str, 0, 64);
 877        sprintf(str, "%pU", mddev->uuid);
 878        ret = dlm_new_lockspace(str, mddev->bitmap_info.cluster_name,
 879                                DLM_LSFL_FS, LVB_SIZE,
 880                                &md_ls_ops, mddev, &ops_rv, &cinfo->lockspace);
 881        if (ret)
 882                goto err;
 883        wait_for_completion(&cinfo->completion);
 884        if (nodes < cinfo->slot_number) {
 885                pr_err("md-cluster: Slot allotted(%d) is greater than available slots(%d).",
 886                        cinfo->slot_number, nodes);
 887                ret = -ERANGE;
 888                goto err;
 889        }
 890        /* Initiate the communication resources */
 891        ret = -ENOMEM;
 892        cinfo->recv_thread = md_register_thread(recv_daemon, mddev, "cluster_recv");
 893        if (!cinfo->recv_thread) {
 894                pr_err("md-cluster: cannot allocate memory for recv_thread!\n");
 895                goto err;
 896        }
 897        cinfo->message_lockres = lockres_init(mddev, "message", NULL, 1);
 898        if (!cinfo->message_lockres)
 899                goto err;
 900        cinfo->token_lockres = lockres_init(mddev, "token", NULL, 0);
 901        if (!cinfo->token_lockres)
 902                goto err;
 903        cinfo->no_new_dev_lockres = lockres_init(mddev, "no-new-dev", NULL, 0);
 904        if (!cinfo->no_new_dev_lockres)
 905                goto err;
 906
 907        ret = dlm_lock_sync(cinfo->token_lockres, DLM_LOCK_EX);
 908        if (ret) {
 909                ret = -EAGAIN;
 910                pr_err("md-cluster: can't join cluster to avoid lock issue\n");
 911                goto err;
 912        }
 913        cinfo->ack_lockres = lockres_init(mddev, "ack", ack_bast, 0);
 914        if (!cinfo->ack_lockres) {
 915                ret = -ENOMEM;
 916                goto err;
 917        }
 918        /* get sync CR lock on ACK. */
 919        if (dlm_lock_sync(cinfo->ack_lockres, DLM_LOCK_CR))
 920                pr_err("md-cluster: failed to get a sync CR lock on ACK!(%d)\n",
 921                                ret);
 922        dlm_unlock_sync(cinfo->token_lockres);
 923        /* get sync CR lock on no-new-dev. */
 924        if (dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR))
 925                pr_err("md-cluster: failed to get a sync CR lock on no-new-dev!(%d)\n", ret);
 926
 927
 928        pr_info("md-cluster: Joined cluster %s slot %d\n", str, cinfo->slot_number);
 929        snprintf(str, 64, "bitmap%04d", cinfo->slot_number - 1);
 930        cinfo->bitmap_lockres = lockres_init(mddev, str, NULL, 1);
 931        if (!cinfo->bitmap_lockres) {
 932                ret = -ENOMEM;
 933                goto err;
 934        }
 935        if (dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW)) {
 936                pr_err("Failed to get bitmap lock\n");
 937                ret = -EINVAL;
 938                goto err;
 939        }
 940
 941        cinfo->resync_lockres = lockres_init(mddev, "resync", NULL, 0);
 942        if (!cinfo->resync_lockres) {
 943                ret = -ENOMEM;
 944                goto err;
 945        }
 946
 947        return 0;
 948err:
 949        set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
 950        md_unregister_thread(&cinfo->recovery_thread);
 951        md_unregister_thread(&cinfo->recv_thread);
 952        lockres_free(cinfo->message_lockres);
 953        lockres_free(cinfo->token_lockres);
 954        lockres_free(cinfo->ack_lockres);
 955        lockres_free(cinfo->no_new_dev_lockres);
 956        lockres_free(cinfo->resync_lockres);
 957        lockres_free(cinfo->bitmap_lockres);
 958        if (cinfo->lockspace)
 959                dlm_release_lockspace(cinfo->lockspace, 2);
 960        mddev->cluster_info = NULL;
 961        kfree(cinfo);
 962        return ret;
 963}
 964
 965static void load_bitmaps(struct mddev *mddev, int total_slots)
 966{
 967        struct md_cluster_info *cinfo = mddev->cluster_info;
 968
 969        /* load all the node's bitmap info for resync */
 970        if (gather_all_resync_info(mddev, total_slots))
 971                pr_err("md-cluster: failed to gather all resyn infos\n");
 972        set_bit(MD_CLUSTER_ALREADY_IN_CLUSTER, &cinfo->state);
 973        /* wake up recv thread in case something need to be handled */
 974        if (test_and_clear_bit(MD_CLUSTER_PENDING_RECV_EVENT, &cinfo->state))
 975                md_wakeup_thread(cinfo->recv_thread);
 976}
 977
 978static void resync_bitmap(struct mddev *mddev)
 979{
 980        struct md_cluster_info *cinfo = mddev->cluster_info;
 981        struct cluster_msg cmsg = {0};
 982        int err;
 983
 984        cmsg.type = cpu_to_le32(BITMAP_NEEDS_SYNC);
 985        err = sendmsg(cinfo, &cmsg, 1);
 986        if (err)
 987                pr_err("%s:%d: failed to send BITMAP_NEEDS_SYNC message (%d)\n",
 988                        __func__, __LINE__, err);
 989}
 990
 991static void unlock_all_bitmaps(struct mddev *mddev);
 992static int leave(struct mddev *mddev)
 993{
 994        struct md_cluster_info *cinfo = mddev->cluster_info;
 995
 996        if (!cinfo)
 997                return 0;
 998
 999        /*
1000         * BITMAP_NEEDS_SYNC message should be sent when node
1001         * is leaving the cluster with dirty bitmap, also we
1002         * can only deliver it when dlm connection is available.
1003         *
1004         * Also, we should send BITMAP_NEEDS_SYNC message in
1005         * case reshaping is interrupted.
1006         */
1007        if ((cinfo->slot_number > 0 && mddev->recovery_cp != MaxSector) ||
1008            (mddev->reshape_position != MaxSector &&
1009             test_bit(MD_CLOSING, &mddev->flags)))
1010                resync_bitmap(mddev);
1011
1012        set_bit(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1013        md_unregister_thread(&cinfo->recovery_thread);
1014        md_unregister_thread(&cinfo->recv_thread);
1015        lockres_free(cinfo->message_lockres);
1016        lockres_free(cinfo->token_lockres);
1017        lockres_free(cinfo->ack_lockres);
1018        lockres_free(cinfo->no_new_dev_lockres);
1019        lockres_free(cinfo->resync_lockres);
1020        lockres_free(cinfo->bitmap_lockres);
1021        unlock_all_bitmaps(mddev);
1022        dlm_release_lockspace(cinfo->lockspace, 2);
1023        kfree(cinfo);
1024        return 0;
1025}
1026
1027/* slot_number(): Returns the MD slot number to use
1028 * DLM starts the slot numbers from 1, wheras cluster-md
1029 * wants the number to be from zero, so we deduct one
1030 */
1031static int slot_number(struct mddev *mddev)
1032{
1033        struct md_cluster_info *cinfo = mddev->cluster_info;
1034
1035        return cinfo->slot_number - 1;
1036}
1037
1038/*
1039 * Check if the communication is already locked, else lock the communication
1040 * channel.
1041 * If it is already locked, token is in EX mode, and hence lock_token()
1042 * should not be called.
1043 */
1044static int metadata_update_start(struct mddev *mddev)
1045{
1046        struct md_cluster_info *cinfo = mddev->cluster_info;
1047        int ret;
1048
1049        /*
1050         * metadata_update_start is always called with the protection of
1051         * reconfig_mutex, so set WAITING_FOR_TOKEN here.
1052         */
1053        ret = test_and_set_bit_lock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD,
1054                                    &cinfo->state);
1055        WARN_ON_ONCE(ret);
1056        md_wakeup_thread(mddev->thread);
1057
1058        wait_event(cinfo->wait,
1059                   !test_and_set_bit(MD_CLUSTER_SEND_LOCK, &cinfo->state) ||
1060                   test_and_clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state));
1061
1062        /* If token is already locked, return 0 */
1063        if (cinfo->token_lockres->mode == DLM_LOCK_EX) {
1064                clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1065                return 0;
1066        }
1067
1068        ret = lock_token(cinfo);
1069        clear_bit_unlock(MD_CLUSTER_HOLDING_MUTEX_FOR_RECVD, &cinfo->state);
1070        return ret;
1071}
1072
1073static int metadata_update_finish(struct mddev *mddev)
1074{
1075        struct md_cluster_info *cinfo = mddev->cluster_info;
1076        struct cluster_msg cmsg;
1077        struct md_rdev *rdev;
1078        int ret = 0;
1079        int raid_slot = -1;
1080
1081        memset(&cmsg, 0, sizeof(cmsg));
1082        cmsg.type = cpu_to_le32(METADATA_UPDATED);
1083        /* Pick up a good active device number to send.
1084         */
1085        rdev_for_each(rdev, mddev)
1086                if (rdev->raid_disk > -1 && !test_bit(Faulty, &rdev->flags)) {
1087                        raid_slot = rdev->desc_nr;
1088                        break;
1089                }
1090        if (raid_slot >= 0) {
1091                cmsg.raid_slot = cpu_to_le32(raid_slot);
1092                ret = __sendmsg(cinfo, &cmsg);
1093        } else
1094                pr_warn("md-cluster: No good device id found to send\n");
1095        clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1096        unlock_comm(cinfo);
1097        return ret;
1098}
1099
1100static void metadata_update_cancel(struct mddev *mddev)
1101{
1102        struct md_cluster_info *cinfo = mddev->cluster_info;
1103        clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1104        unlock_comm(cinfo);
1105}
1106
1107static int update_bitmap_size(struct mddev *mddev, sector_t size)
1108{
1109        struct md_cluster_info *cinfo = mddev->cluster_info;
1110        struct cluster_msg cmsg = {0};
1111        int ret;
1112
1113        cmsg.type = cpu_to_le32(BITMAP_RESIZE);
1114        cmsg.high = cpu_to_le64(size);
1115        ret = sendmsg(cinfo, &cmsg, 0);
1116        if (ret)
1117                pr_err("%s:%d: failed to send BITMAP_RESIZE message (%d)\n",
1118                        __func__, __LINE__, ret);
1119        return ret;
1120}
1121
1122static int resize_bitmaps(struct mddev *mddev, sector_t newsize, sector_t oldsize)
1123{
1124        struct bitmap_counts *counts;
1125        char str[64];
1126        struct dlm_lock_resource *bm_lockres;
1127        struct bitmap *bitmap = mddev->bitmap;
1128        unsigned long my_pages = bitmap->counts.pages;
1129        int i, rv;
1130
1131        /*
1132         * We need to ensure all the nodes can grow to a larger
1133         * bitmap size before make the reshaping.
1134         */
1135        rv = update_bitmap_size(mddev, newsize);
1136        if (rv)
1137                return rv;
1138
1139        for (i = 0; i < mddev->bitmap_info.nodes; i++) {
1140                if (i == md_cluster_ops->slot_number(mddev))
1141                        continue;
1142
1143                bitmap = get_bitmap_from_slot(mddev, i);
1144                if (IS_ERR(bitmap)) {
1145                        pr_err("can't get bitmap from slot %d\n", i);
1146                        bitmap = NULL;
1147                        goto out;
1148                }
1149                counts = &bitmap->counts;
1150
1151                /*
1152                 * If we can hold the bitmap lock of one node then
1153                 * the slot is not occupied, update the pages.
1154                 */
1155                snprintf(str, 64, "bitmap%04d", i);
1156                bm_lockres = lockres_init(mddev, str, NULL, 1);
1157                if (!bm_lockres) {
1158                        pr_err("Cannot initialize %s lock\n", str);
1159                        goto out;
1160                }
1161                bm_lockres->flags |= DLM_LKF_NOQUEUE;
1162                rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1163                if (!rv)
1164                        counts->pages = my_pages;
1165                lockres_free(bm_lockres);
1166
1167                if (my_pages != counts->pages)
1168                        /*
1169                         * Let's revert the bitmap size if one node
1170                         * can't resize bitmap
1171                         */
1172                        goto out;
1173                md_bitmap_free(bitmap);
1174        }
1175
1176        return 0;
1177out:
1178        md_bitmap_free(bitmap);
1179        update_bitmap_size(mddev, oldsize);
1180        return -1;
1181}
1182
1183/*
1184 * return 0 if all the bitmaps have the same sync_size
1185 */
1186static int cluster_check_sync_size(struct mddev *mddev)
1187{
1188        int i, rv;
1189        bitmap_super_t *sb;
1190        unsigned long my_sync_size, sync_size = 0;
1191        int node_num = mddev->bitmap_info.nodes;
1192        int current_slot = md_cluster_ops->slot_number(mddev);
1193        struct bitmap *bitmap = mddev->bitmap;
1194        char str[64];
1195        struct dlm_lock_resource *bm_lockres;
1196
1197        sb = kmap_atomic(bitmap->storage.sb_page);
1198        my_sync_size = sb->sync_size;
1199        kunmap_atomic(sb);
1200
1201        for (i = 0; i < node_num; i++) {
1202                if (i == current_slot)
1203                        continue;
1204
1205                bitmap = get_bitmap_from_slot(mddev, i);
1206                if (IS_ERR(bitmap)) {
1207                        pr_err("can't get bitmap from slot %d\n", i);
1208                        return -1;
1209                }
1210
1211                /*
1212                 * If we can hold the bitmap lock of one node then
1213                 * the slot is not occupied, update the sb.
1214                 */
1215                snprintf(str, 64, "bitmap%04d", i);
1216                bm_lockres = lockres_init(mddev, str, NULL, 1);
1217                if (!bm_lockres) {
1218                        pr_err("md-cluster: Cannot initialize %s\n", str);
1219                        md_bitmap_free(bitmap);
1220                        return -1;
1221                }
1222                bm_lockres->flags |= DLM_LKF_NOQUEUE;
1223                rv = dlm_lock_sync(bm_lockres, DLM_LOCK_PW);
1224                if (!rv)
1225                        md_bitmap_update_sb(bitmap);
1226                lockres_free(bm_lockres);
1227
1228                sb = kmap_atomic(bitmap->storage.sb_page);
1229                if (sync_size == 0)
1230                        sync_size = sb->sync_size;
1231                else if (sync_size != sb->sync_size) {
1232                        kunmap_atomic(sb);
1233                        md_bitmap_free(bitmap);
1234                        return -1;
1235                }
1236                kunmap_atomic(sb);
1237                md_bitmap_free(bitmap);
1238        }
1239
1240        return (my_sync_size == sync_size) ? 0 : -1;
1241}
1242
1243/*
1244 * Update the size for cluster raid is a little more complex, we perform it
1245 * by the steps:
1246 * 1. hold token lock and update superblock in initiator node.
1247 * 2. send METADATA_UPDATED msg to other nodes.
1248 * 3. The initiator node continues to check each bitmap's sync_size, if all
1249 *    bitmaps have the same value of sync_size, then we can set capacity and
1250 *    let other nodes to perform it. If one node can't update sync_size
1251 *    accordingly, we need to revert to previous value.
1252 */
1253static void update_size(struct mddev *mddev, sector_t old_dev_sectors)
1254{
1255        struct md_cluster_info *cinfo = mddev->cluster_info;
1256        struct cluster_msg cmsg;
1257        struct md_rdev *rdev;
1258        int ret = 0;
1259        int raid_slot = -1;
1260
1261        md_update_sb(mddev, 1);
1262        if (lock_comm(cinfo, 1)) {
1263                pr_err("%s: lock_comm failed\n", __func__);
1264                return;
1265        }
1266
1267        memset(&cmsg, 0, sizeof(cmsg));
1268        cmsg.type = cpu_to_le32(METADATA_UPDATED);
1269        rdev_for_each(rdev, mddev)
1270                if (rdev->raid_disk >= 0 && !test_bit(Faulty, &rdev->flags)) {
1271                        raid_slot = rdev->desc_nr;
1272                        break;
1273                }
1274        if (raid_slot >= 0) {
1275                cmsg.raid_slot = cpu_to_le32(raid_slot);
1276                /*
1277                 * We can only change capiticy after all the nodes can do it,
1278                 * so need to wait after other nodes already received the msg
1279                 * and handled the change
1280                 */
1281                ret = __sendmsg(cinfo, &cmsg);
1282                if (ret) {
1283                        pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1284                               __func__, __LINE__);
1285                        unlock_comm(cinfo);
1286                        return;
1287                }
1288        } else {
1289                pr_err("md-cluster: No good device id found to send\n");
1290                unlock_comm(cinfo);
1291                return;
1292        }
1293
1294        /*
1295         * check the sync_size from other node's bitmap, if sync_size
1296         * have already updated in other nodes as expected, send an
1297         * empty metadata msg to permit the change of capacity
1298         */
1299        if (cluster_check_sync_size(mddev) == 0) {
1300                memset(&cmsg, 0, sizeof(cmsg));
1301                cmsg.type = cpu_to_le32(CHANGE_CAPACITY);
1302                ret = __sendmsg(cinfo, &cmsg);
1303                if (ret)
1304                        pr_err("%s:%d: failed to send CHANGE_CAPACITY msg\n",
1305                               __func__, __LINE__);
1306                set_capacity_and_notify(mddev->gendisk, mddev->array_sectors);
1307        } else {
1308                /* revert to previous sectors */
1309                ret = mddev->pers->resize(mddev, old_dev_sectors);
1310                ret = __sendmsg(cinfo, &cmsg);
1311                if (ret)
1312                        pr_err("%s:%d: failed to send METADATA_UPDATED msg\n",
1313                               __func__, __LINE__);
1314        }
1315        unlock_comm(cinfo);
1316}
1317
1318static int resync_start(struct mddev *mddev)
1319{
1320        struct md_cluster_info *cinfo = mddev->cluster_info;
1321        return dlm_lock_sync_interruptible(cinfo->resync_lockres, DLM_LOCK_EX, mddev);
1322}
1323
1324static void resync_info_get(struct mddev *mddev, sector_t *lo, sector_t *hi)
1325{
1326        struct md_cluster_info *cinfo = mddev->cluster_info;
1327
1328        spin_lock_irq(&cinfo->suspend_lock);
1329        *lo = cinfo->suspend_lo;
1330        *hi = cinfo->suspend_hi;
1331        spin_unlock_irq(&cinfo->suspend_lock);
1332}
1333
1334static int resync_info_update(struct mddev *mddev, sector_t lo, sector_t hi)
1335{
1336        struct md_cluster_info *cinfo = mddev->cluster_info;
1337        struct resync_info ri;
1338        struct cluster_msg cmsg = {0};
1339
1340        /* do not send zero again, if we have sent before */
1341        if (hi == 0) {
1342                memcpy(&ri, cinfo->bitmap_lockres->lksb.sb_lvbptr, sizeof(struct resync_info));
1343                if (le64_to_cpu(ri.hi) == 0)
1344                        return 0;
1345        }
1346
1347        add_resync_info(cinfo->bitmap_lockres, lo, hi);
1348        /* Re-acquire the lock to refresh LVB */
1349        dlm_lock_sync(cinfo->bitmap_lockres, DLM_LOCK_PW);
1350        cmsg.type = cpu_to_le32(RESYNCING);
1351        cmsg.low = cpu_to_le64(lo);
1352        cmsg.high = cpu_to_le64(hi);
1353
1354        /*
1355         * mddev_lock is held if resync_info_update is called from
1356         * resync_finish (md_reap_sync_thread -> resync_finish)
1357         */
1358        if (lo == 0 && hi == 0)
1359                return sendmsg(cinfo, &cmsg, 1);
1360        else
1361                return sendmsg(cinfo, &cmsg, 0);
1362}
1363
1364static int resync_finish(struct mddev *mddev)
1365{
1366        struct md_cluster_info *cinfo = mddev->cluster_info;
1367        int ret = 0;
1368
1369        clear_bit(MD_RESYNCING_REMOTE, &mddev->recovery);
1370
1371        /*
1372         * If resync thread is interrupted so we can't say resync is finished,
1373         * another node will launch resync thread to continue.
1374         */
1375        if (!test_bit(MD_CLOSING, &mddev->flags))
1376                ret = resync_info_update(mddev, 0, 0);
1377        dlm_unlock_sync(cinfo->resync_lockres);
1378        return ret;
1379}
1380
1381static int area_resyncing(struct mddev *mddev, int direction,
1382                sector_t lo, sector_t hi)
1383{
1384        struct md_cluster_info *cinfo = mddev->cluster_info;
1385        int ret = 0;
1386
1387        if ((direction == READ) &&
1388                test_bit(MD_CLUSTER_SUSPEND_READ_BALANCING, &cinfo->state))
1389                return 1;
1390
1391        spin_lock_irq(&cinfo->suspend_lock);
1392        if (hi > cinfo->suspend_lo && lo < cinfo->suspend_hi)
1393                ret = 1;
1394        spin_unlock_irq(&cinfo->suspend_lock);
1395        return ret;
1396}
1397
1398/* add_new_disk() - initiates a disk add
1399 * However, if this fails before writing md_update_sb(),
1400 * add_new_disk_cancel() must be called to release token lock
1401 */
1402static int add_new_disk(struct mddev *mddev, struct md_rdev *rdev)
1403{
1404        struct md_cluster_info *cinfo = mddev->cluster_info;
1405        struct cluster_msg cmsg;
1406        int ret = 0;
1407        struct mdp_superblock_1 *sb = page_address(rdev->sb_page);
1408        char *uuid = sb->device_uuid;
1409
1410        memset(&cmsg, 0, sizeof(cmsg));
1411        cmsg.type = cpu_to_le32(NEWDISK);
1412        memcpy(cmsg.uuid, uuid, 16);
1413        cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1414        if (lock_comm(cinfo, 1))
1415                return -EAGAIN;
1416        ret = __sendmsg(cinfo, &cmsg);
1417        if (ret) {
1418                unlock_comm(cinfo);
1419                return ret;
1420        }
1421        cinfo->no_new_dev_lockres->flags |= DLM_LKF_NOQUEUE;
1422        ret = dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_EX);
1423        cinfo->no_new_dev_lockres->flags &= ~DLM_LKF_NOQUEUE;
1424        /* Some node does not "see" the device */
1425        if (ret == -EAGAIN)
1426                ret = -ENOENT;
1427        if (ret)
1428                unlock_comm(cinfo);
1429        else {
1430                dlm_lock_sync(cinfo->no_new_dev_lockres, DLM_LOCK_CR);
1431                /* Since MD_CHANGE_DEVS will be set in add_bound_rdev which
1432                 * will run soon after add_new_disk, the below path will be
1433                 * invoked:
1434                 *   md_wakeup_thread(mddev->thread)
1435                 *      -> conf->thread (raid1d)
1436                 *      -> md_check_recovery -> md_update_sb
1437                 *      -> metadata_update_start/finish
1438                 * MD_CLUSTER_SEND_LOCKED_ALREADY will be cleared eventually.
1439                 *
1440                 * For other failure cases, metadata_update_cancel and
1441                 * add_new_disk_cancel also clear below bit as well.
1442                 * */
1443                set_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1444                wake_up(&cinfo->wait);
1445        }
1446        return ret;
1447}
1448
1449static void add_new_disk_cancel(struct mddev *mddev)
1450{
1451        struct md_cluster_info *cinfo = mddev->cluster_info;
1452        clear_bit(MD_CLUSTER_SEND_LOCKED_ALREADY, &cinfo->state);
1453        unlock_comm(cinfo);
1454}
1455
1456static int new_disk_ack(struct mddev *mddev, bool ack)
1457{
1458        struct md_cluster_info *cinfo = mddev->cluster_info;
1459
1460        if (!test_bit(MD_CLUSTER_WAITING_FOR_NEWDISK, &cinfo->state)) {
1461                pr_warn("md-cluster(%s): Spurious cluster confirmation\n", mdname(mddev));
1462                return -EINVAL;
1463        }
1464
1465        if (ack)
1466                dlm_unlock_sync(cinfo->no_new_dev_lockres);
1467        complete(&cinfo->newdisk_completion);
1468        return 0;
1469}
1470
1471static int remove_disk(struct mddev *mddev, struct md_rdev *rdev)
1472{
1473        struct cluster_msg cmsg = {0};
1474        struct md_cluster_info *cinfo = mddev->cluster_info;
1475        cmsg.type = cpu_to_le32(REMOVE);
1476        cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1477        return sendmsg(cinfo, &cmsg, 1);
1478}
1479
1480static int lock_all_bitmaps(struct mddev *mddev)
1481{
1482        int slot, my_slot, ret, held = 1, i = 0;
1483        char str[64];
1484        struct md_cluster_info *cinfo = mddev->cluster_info;
1485
1486        cinfo->other_bitmap_lockres =
1487                kcalloc(mddev->bitmap_info.nodes - 1,
1488                        sizeof(struct dlm_lock_resource *), GFP_KERNEL);
1489        if (!cinfo->other_bitmap_lockres) {
1490                pr_err("md: can't alloc mem for other bitmap locks\n");
1491                return 0;
1492        }
1493
1494        my_slot = slot_number(mddev);
1495        for (slot = 0; slot < mddev->bitmap_info.nodes; slot++) {
1496                if (slot == my_slot)
1497                        continue;
1498
1499                memset(str, '\0', 64);
1500                snprintf(str, 64, "bitmap%04d", slot);
1501                cinfo->other_bitmap_lockres[i] = lockres_init(mddev, str, NULL, 1);
1502                if (!cinfo->other_bitmap_lockres[i])
1503                        return -ENOMEM;
1504
1505                cinfo->other_bitmap_lockres[i]->flags |= DLM_LKF_NOQUEUE;
1506                ret = dlm_lock_sync(cinfo->other_bitmap_lockres[i], DLM_LOCK_PW);
1507                if (ret)
1508                        held = -1;
1509                i++;
1510        }
1511
1512        return held;
1513}
1514
1515static void unlock_all_bitmaps(struct mddev *mddev)
1516{
1517        struct md_cluster_info *cinfo = mddev->cluster_info;
1518        int i;
1519
1520        /* release other node's bitmap lock if they are existed */
1521        if (cinfo->other_bitmap_lockres) {
1522                for (i = 0; i < mddev->bitmap_info.nodes - 1; i++) {
1523                        if (cinfo->other_bitmap_lockres[i]) {
1524                                lockres_free(cinfo->other_bitmap_lockres[i]);
1525                        }
1526                }
1527                kfree(cinfo->other_bitmap_lockres);
1528                cinfo->other_bitmap_lockres = NULL;
1529        }
1530}
1531
1532static int gather_bitmaps(struct md_rdev *rdev)
1533{
1534        int sn, err;
1535        sector_t lo, hi;
1536        struct cluster_msg cmsg = {0};
1537        struct mddev *mddev = rdev->mddev;
1538        struct md_cluster_info *cinfo = mddev->cluster_info;
1539
1540        cmsg.type = cpu_to_le32(RE_ADD);
1541        cmsg.raid_slot = cpu_to_le32(rdev->desc_nr);
1542        err = sendmsg(cinfo, &cmsg, 1);
1543        if (err)
1544                goto out;
1545
1546        for (sn = 0; sn < mddev->bitmap_info.nodes; sn++) {
1547                if (sn == (cinfo->slot_number - 1))
1548                        continue;
1549                err = md_bitmap_copy_from_slot(mddev, sn, &lo, &hi, false);
1550                if (err) {
1551                        pr_warn("md-cluster: Could not gather bitmaps from slot %d", sn);
1552                        goto out;
1553                }
1554                if ((hi > 0) && (lo < mddev->recovery_cp))
1555                        mddev->recovery_cp = lo;
1556        }
1557out:
1558        return err;
1559}
1560
1561static struct md_cluster_operations cluster_ops = {
1562        .join   = join,
1563        .leave  = leave,
1564        .slot_number = slot_number,
1565        .resync_start = resync_start,
1566        .resync_finish = resync_finish,
1567        .resync_info_update = resync_info_update,
1568        .resync_info_get = resync_info_get,
1569        .metadata_update_start = metadata_update_start,
1570        .metadata_update_finish = metadata_update_finish,
1571        .metadata_update_cancel = metadata_update_cancel,
1572        .area_resyncing = area_resyncing,
1573        .add_new_disk = add_new_disk,
1574        .add_new_disk_cancel = add_new_disk_cancel,
1575        .new_disk_ack = new_disk_ack,
1576        .remove_disk = remove_disk,
1577        .load_bitmaps = load_bitmaps,
1578        .gather_bitmaps = gather_bitmaps,
1579        .resize_bitmaps = resize_bitmaps,
1580        .lock_all_bitmaps = lock_all_bitmaps,
1581        .unlock_all_bitmaps = unlock_all_bitmaps,
1582        .update_size = update_size,
1583};
1584
1585static int __init cluster_init(void)
1586{
1587        pr_warn("md-cluster: support raid1 and raid10 (limited support)\n");
1588        pr_info("Registering Cluster MD functions\n");
1589        register_md_cluster_operations(&cluster_ops, THIS_MODULE);
1590        return 0;
1591}
1592
1593static void cluster_exit(void)
1594{
1595        unregister_md_cluster_operations();
1596}
1597
1598module_init(cluster_init);
1599module_exit(cluster_exit);
1600MODULE_AUTHOR("SUSE");
1601MODULE_LICENSE("GPL");
1602MODULE_DESCRIPTION("Clustering support for MD");
1603