linux/drivers/block/drbd/drbd_main.c
<<
>>
Prefs
   1/*
   2   drbd.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   Thanks to Carter Burden, Bart Grantham and Gennadiy Nerubayev
  11   from Logicworks, Inc. for making SDP replication support possible.
  12
  13   drbd is free software; you can redistribute it and/or modify
  14   it under the terms of the GNU General Public License as published by
  15   the Free Software Foundation; either version 2, or (at your option)
  16   any later version.
  17
  18   drbd is distributed in the hope that it will be useful,
  19   but WITHOUT ANY WARRANTY; without even the implied warranty of
  20   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  21   GNU General Public License for more details.
  22
  23   You should have received a copy of the GNU General Public License
  24   along with drbd; see the file COPYING.  If not, write to
  25   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  26
  27 */
  28
  29#include <linux/module.h>
  30#include <linux/drbd.h>
  31#include <asm/uaccess.h>
  32#include <asm/types.h>
  33#include <net/sock.h>
  34#include <linux/ctype.h>
  35#include <linux/mutex.h>
  36#include <linux/fs.h>
  37#include <linux/file.h>
  38#include <linux/proc_fs.h>
  39#include <linux/init.h>
  40#include <linux/mm.h>
  41#include <linux/memcontrol.h>
  42#include <linux/mm_inline.h>
  43#include <linux/slab.h>
  44#include <linux/random.h>
  45#include <linux/reboot.h>
  46#include <linux/notifier.h>
  47#include <linux/kthread.h>
  48
  49#define __KERNEL_SYSCALLS__
  50#include <linux/unistd.h>
  51#include <linux/vmalloc.h>
  52
  53#include <linux/drbd_limits.h>
  54#include "drbd_int.h"
  55#include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
  56
  57#include "drbd_vli.h"
  58
/*
 * Work item describing one state transition: the embedded drbd_work plus
 * the old state, the new state and the flags the change was made with.
 */
struct after_state_chg_work {
        struct drbd_work w;             /* embedded work item */
        union drbd_state os;            /* old state */
        union drbd_state ns;            /* new state */
        enum chg_state_flags flags;     /* flags of the state change */
        /* NOTE(review): presumably completed once the after-state-change
         * work has run, so a waiter can block on it -- confirm */
        struct completion *done;
};
  66
static DEFINE_MUTEX(drbd_main_mutex);
/* NOTE(review): presumably the main functions of the DRBD threads; they are
 * not defined in this part of the file -- confirm */
int drbdd_init(struct drbd_thread *);
int drbd_worker(struct drbd_thread *);
int drbd_asender(struct drbd_thread *);

int drbd_init(void);
/* open/release callbacks referenced by drbd_ops */
static int drbd_open(struct block_device *bdev, fmode_t mode);
static int drbd_release(struct gendisk *gd, fmode_t mode);
/* callbacks sharing the (mdev, work, int) signature, plus their helpers */
static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused);
static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
                           union drbd_state ns, enum chg_state_flags flags);
static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused);
static void md_sync_timer_fn(unsigned long data);
static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused);
static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused);
  82
/* module metadata */
MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
              "Lars Ellenberg <lars@linbit.com>");
MODULE_DESCRIPTION("drbd - Distributed Replicated Block Device v" REL_VERSION);
MODULE_VERSION(REL_VERSION);
MODULE_LICENSE("GPL");
MODULE_PARM_DESC(minor_count, "Maximum number of drbd devices (1-255)");
MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);

#include <linux/moduleparam.h>
/* allow_oos is short for allow_open_on_secondary */
MODULE_PARM_DESC(allow_oos, "DONT USE!");
/* thanks to these macros, if compiled into the kernel (not as a module),
 * this becomes the boot parameter drbd.minor_count */
module_param(minor_count, uint, 0444);
module_param(disable_sendpage, bool, 0644);
module_param(allow_oos, bool, 0);
module_param(cn_idx, uint, 0444);
module_param(proc_details, int, 0644);

/* fault injection knobs, only built with CONFIG_DRBD_FAULT_INJECTION */
#ifdef CONFIG_DRBD_FAULT_INJECTION
int enable_faults;
int fault_rate;
static int fault_count;
int fault_devs;
/* bitmap of enabled faults */
module_param(enable_faults, int, 0664);
/* fault rate % value - applies to all enabled faults */
module_param(fault_rate, int, 0664);
/* count of faults inserted */
module_param(fault_count, int, 0664);
/* bitmap of devices to insert faults on */
module_param(fault_devs, int, 0644);
#endif

/* storage and default values of the module parameters declared above */
unsigned int minor_count = 32;
int disable_sendpage;
int allow_oos;
unsigned int cn_idx = CN_IDX_DRBD;
int proc_details;       /* detail level in /proc/drbd */

/* Module parameter for setting the user mode helper program
 * to run. Default is /sbin/drbdadm */
char usermode_helper[80] = "/sbin/drbdadm";

module_param_string(usermode_helper, usermode_helper, sizeof(usermode_helper), 0644);
 129
/* in 2.6.x, our device mapping and config info contains our virtual gendisks
 * as member "struct gendisk *vdisk;"
 */
struct drbd_conf **minor_table;

/* slab caches and mempools shared by all devices */
struct kmem_cache *drbd_request_cache;
struct kmem_cache *drbd_ee_cache;       /* epoch entries */
struct kmem_cache *drbd_bm_ext_cache;   /* bitmap extents */
struct kmem_cache *drbd_al_ext_cache;   /* activity log extents */
mempool_t *drbd_request_mempool;
mempool_t *drbd_ee_mempool;

/* I do not use a standard mempool, because:
   1) I want to hand out the pre-allocated objects first.
   2) I want to be able to interrupt sleeping allocation with a signal.
   Note: This is a singly linked list, the next pointer is the private
         member of struct page.
 */
struct page *drbd_pp_pool;
spinlock_t   drbd_pp_lock;
int          drbd_pp_vacant;
wait_queue_head_t drbd_pp_wait;

/* rate limit state: interval 5 * HZ, burst 5 */
DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
 154
/* block device operations table: only open and release are provided here */
static const struct block_device_operations drbd_ops = {
        .owner =   THIS_MODULE,
        .open =    drbd_open,
        .release = drbd_release,
};
 160
 161#define ARRY_SIZE(A) (sizeof(A)/sizeof(A[0]))
 162
 163#ifdef __CHECKER__
 164/* When checking with sparse, and this is an inline function, sparse will
 165   give tons of false positives. When this is a real functions sparse works.
 166 */
 167int _get_ldev_if_state(struct drbd_conf *mdev, enum drbd_disk_state mins)
 168{
 169        int io_allowed;
 170
 171        atomic_inc(&mdev->local_cnt);
 172        io_allowed = (mdev->state.disk >= mins);
 173        if (!io_allowed) {
 174                if (atomic_dec_and_test(&mdev->local_cnt))
 175                        wake_up(&mdev->misc_wait);
 176        }
 177        return io_allowed;
 178}
 179
 180#endif
 181
/**
 * DOC: The transfer log
 *
 * The transfer log is a singly linked list of &struct drbd_tl_epoch objects.
 * mdev->newest_tle points to the head, mdev->oldest_tle points to the tail
 * of the list. There is always at least one &struct drbd_tl_epoch object.
 *
 * Each &struct drbd_tl_epoch has a circular doubly linked list of requests
 * attached.
 */
 192static int tl_init(struct drbd_conf *mdev)
 193{
 194        struct drbd_tl_epoch *b;
 195
 196        /* during device minor initialization, we may well use GFP_KERNEL */
 197        b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_KERNEL);
 198        if (!b)
 199                return 0;
 200        INIT_LIST_HEAD(&b->requests);
 201        INIT_LIST_HEAD(&b->w.list);
 202        b->next = NULL;
 203        b->br_number = 4711;
 204        b->n_writes = 0;
 205        b->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
 206
 207        mdev->oldest_tle = b;
 208        mdev->newest_tle = b;
 209        INIT_LIST_HEAD(&mdev->out_of_sequence_requests);
 210
 211        mdev->tl_hash = NULL;
 212        mdev->tl_hash_s = 0;
 213
 214        return 1;
 215}
 216
 217static void tl_cleanup(struct drbd_conf *mdev)
 218{
 219        D_ASSERT(mdev->oldest_tle == mdev->newest_tle);
 220        D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
 221        kfree(mdev->oldest_tle);
 222        mdev->oldest_tle = NULL;
 223        kfree(mdev->unused_spare_tle);
 224        mdev->unused_spare_tle = NULL;
 225        kfree(mdev->tl_hash);
 226        mdev->tl_hash = NULL;
 227        mdev->tl_hash_s = 0;
 228}
 229
 230/**
 231 * _tl_add_barrier() - Adds a barrier to the transfer log
 232 * @mdev:       DRBD device.
 233 * @new:        Barrier to be added before the current head of the TL.
 234 *
 235 * The caller must hold the req_lock.
 236 */
 237void _tl_add_barrier(struct drbd_conf *mdev, struct drbd_tl_epoch *new)
 238{
 239        struct drbd_tl_epoch *newest_before;
 240
 241        INIT_LIST_HEAD(&new->requests);
 242        INIT_LIST_HEAD(&new->w.list);
 243        new->w.cb = NULL; /* if this is != NULL, we need to dec_ap_pending in tl_clear */
 244        new->next = NULL;
 245        new->n_writes = 0;
 246
 247        newest_before = mdev->newest_tle;
 248        /* never send a barrier number == 0, because that is special-cased
 249         * when using TCQ for our write ordering code */
 250        new->br_number = (newest_before->br_number+1) ?: 1;
 251        if (mdev->newest_tle != new) {
 252                mdev->newest_tle->next = new;
 253                mdev->newest_tle = new;
 254        }
 255}
 256
 257/**
 258 * tl_release() - Free or recycle the oldest &struct drbd_tl_epoch object of the TL
 259 * @mdev:       DRBD device.
 260 * @barrier_nr: Expected identifier of the DRBD write barrier packet.
 261 * @set_size:   Expected number of requests before that barrier.
 262 *
 263 * In case the passed barrier_nr or set_size does not match the oldest
 264 * &struct drbd_tl_epoch objects this function will cause a termination
 265 * of the connection.
 266 */
 267void tl_release(struct drbd_conf *mdev, unsigned int barrier_nr,
 268                       unsigned int set_size)
 269{
 270        struct drbd_tl_epoch *b, *nob; /* next old barrier */
 271        struct list_head *le, *tle;
 272        struct drbd_request *r;
 273
 274        spin_lock_irq(&mdev->req_lock);
 275
 276        b = mdev->oldest_tle;
 277
 278        /* first some paranoia code */
 279        if (b == NULL) {
 280                dev_err(DEV, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
 281                        barrier_nr);
 282                goto bail;
 283        }
 284        if (b->br_number != barrier_nr) {
 285                dev_err(DEV, "BAD! BarrierAck #%u received, expected #%u!\n",
 286                        barrier_nr, b->br_number);
 287                goto bail;
 288        }
 289        if (b->n_writes != set_size) {
 290                dev_err(DEV, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
 291                        barrier_nr, set_size, b->n_writes);
 292                goto bail;
 293        }
 294
 295        /* Clean up list of requests processed during current epoch */
 296        list_for_each_safe(le, tle, &b->requests) {
 297                r = list_entry(le, struct drbd_request, tl_requests);
 298                _req_mod(r, barrier_acked);
 299        }
 300        /* There could be requests on the list waiting for completion
 301           of the write to the local disk. To avoid corruptions of
 302           slab's data structures we have to remove the lists head.
 303
 304           Also there could have been a barrier ack out of sequence, overtaking
 305           the write acks - which would be a bug and violating write ordering.
 306           To not deadlock in case we lose connection while such requests are
 307           still pending, we need some way to find them for the
 308           _req_mode(connection_lost_while_pending).
 309
 310           These have been list_move'd to the out_of_sequence_requests list in
 311           _req_mod(, barrier_acked) above.
 312           */
 313        list_del_init(&b->requests);
 314
 315        nob = b->next;
 316        if (test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
 317                _tl_add_barrier(mdev, b);
 318                if (nob)
 319                        mdev->oldest_tle = nob;
 320                /* if nob == NULL b was the only barrier, and becomes the new
 321                   barrier. Therefore mdev->oldest_tle points already to b */
 322        } else {
 323                D_ASSERT(nob != NULL);
 324                mdev->oldest_tle = nob;
 325                kfree(b);
 326        }
 327
 328        spin_unlock_irq(&mdev->req_lock);
 329        dec_ap_pending(mdev);
 330
 331        return;
 332
 333bail:
 334        spin_unlock_irq(&mdev->req_lock);
 335        drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
 336}
 337
 338/**
 339 * _tl_restart() - Walks the transfer log, and applies an action to all requests
 340 * @mdev:       DRBD device.
 341 * @what:       The action/event to perform with all request objects
 342 *
 343 * @what might be one of connection_lost_while_pending, resend, fail_frozen_disk_io,
 344 * restart_frozen_disk_io.
 345 */
 346static void _tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
 347{
 348        struct drbd_tl_epoch *b, *tmp, **pn;
 349        struct list_head *le, *tle, carry_reads;
 350        struct drbd_request *req;
 351        int rv, n_writes, n_reads;
 352
 353        b = mdev->oldest_tle;
 354        pn = &mdev->oldest_tle;
 355        while (b) {
 356                n_writes = 0;
 357                n_reads = 0;
 358                INIT_LIST_HEAD(&carry_reads);
 359                list_for_each_safe(le, tle, &b->requests) {
 360                        req = list_entry(le, struct drbd_request, tl_requests);
 361                        rv = _req_mod(req, what);
 362
 363                        n_writes += (rv & MR_WRITE) >> MR_WRITE_SHIFT;
 364                        n_reads  += (rv & MR_READ) >> MR_READ_SHIFT;
 365                }
 366                tmp = b->next;
 367
 368                if (n_writes) {
 369                        if (what == resend) {
 370                                b->n_writes = n_writes;
 371                                if (b->w.cb == NULL) {
 372                                        b->w.cb = w_send_barrier;
 373                                        inc_ap_pending(mdev);
 374                                        set_bit(CREATE_BARRIER, &mdev->flags);
 375                                }
 376
 377                                drbd_queue_work(&mdev->data.work, &b->w);
 378                        }
 379                        pn = &b->next;
 380                } else {
 381                        if (n_reads)
 382                                list_add(&carry_reads, &b->requests);
 383                        /* there could still be requests on that ring list,
 384                         * in case local io is still pending */
 385                        list_del(&b->requests);
 386
 387                        /* dec_ap_pending corresponding to queue_barrier.
 388                         * the newest barrier may not have been queued yet,
 389                         * in which case w.cb is still NULL. */
 390                        if (b->w.cb != NULL)
 391                                dec_ap_pending(mdev);
 392
 393                        if (b == mdev->newest_tle) {
 394                                /* recycle, but reinit! */
 395                                D_ASSERT(tmp == NULL);
 396                                INIT_LIST_HEAD(&b->requests);
 397                                list_splice(&carry_reads, &b->requests);
 398                                INIT_LIST_HEAD(&b->w.list);
 399                                b->w.cb = NULL;
 400                                b->br_number = net_random();
 401                                b->n_writes = 0;
 402
 403                                *pn = b;
 404                                break;
 405                        }
 406                        *pn = tmp;
 407                        kfree(b);
 408                }
 409                b = tmp;
 410                list_splice(&carry_reads, &b->requests);
 411        }
 412}
 413
 414
 415/**
 416 * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
 417 * @mdev:       DRBD device.
 418 *
 419 * This is called after the connection to the peer was lost. The storage covered
 420 * by the requests on the transfer gets marked as our of sync. Called from the
 421 * receiver thread and the worker thread.
 422 */
 423void tl_clear(struct drbd_conf *mdev)
 424{
 425        struct list_head *le, *tle;
 426        struct drbd_request *r;
 427
 428        spin_lock_irq(&mdev->req_lock);
 429
 430        _tl_restart(mdev, connection_lost_while_pending);
 431
 432        /* we expect this list to be empty. */
 433        D_ASSERT(list_empty(&mdev->out_of_sequence_requests));
 434
 435        /* but just in case, clean it up anyways! */
 436        list_for_each_safe(le, tle, &mdev->out_of_sequence_requests) {
 437                r = list_entry(le, struct drbd_request, tl_requests);
 438                /* It would be nice to complete outside of spinlock.
 439                 * But this is easier for now. */
 440                _req_mod(r, connection_lost_while_pending);
 441        }
 442
 443        /* ensure bit indicating barrier is required is clear */
 444        clear_bit(CREATE_BARRIER, &mdev->flags);
 445
 446        memset(mdev->app_reads_hash, 0, APP_R_HSIZE*sizeof(void *));
 447
 448        spin_unlock_irq(&mdev->req_lock);
 449}
 450
/* Apply @what to all requests in the transfer log of @mdev: takes the
 * req_lock (disabling interrupts) around _tl_restart(). */
void tl_restart(struct drbd_conf *mdev, enum drbd_req_event what)
{
        spin_lock_irq(&mdev->req_lock);
        _tl_restart(mdev, what);
        spin_unlock_irq(&mdev->req_lock);
}
 457
 458/**
 459 * cl_wide_st_chg() - TRUE if the state change is a cluster wide one
 460 * @mdev:       DRBD device.
 461 * @os:         old (current) state.
 462 * @ns:         new (wanted) state.
 463 */
 464static int cl_wide_st_chg(struct drbd_conf *mdev,
 465                          union drbd_state os, union drbd_state ns)
 466{
 467        return (os.conn >= C_CONNECTED && ns.conn >= C_CONNECTED &&
 468                 ((os.role != R_PRIMARY && ns.role == R_PRIMARY) ||
 469                  (os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
 470                  (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S) ||
 471                  (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))) ||
 472                (os.conn >= C_CONNECTED && ns.conn == C_DISCONNECTING) ||
 473                (os.conn == C_CONNECTED && ns.conn == C_VERIFY_S);
 474}
 475
 476int drbd_change_state(struct drbd_conf *mdev, enum chg_state_flags f,
 477                      union drbd_state mask, union drbd_state val)
 478{
 479        unsigned long flags;
 480        union drbd_state os, ns;
 481        int rv;
 482
 483        spin_lock_irqsave(&mdev->req_lock, flags);
 484        os = mdev->state;
 485        ns.i = (os.i & ~mask.i) | val.i;
 486        rv = _drbd_set_state(mdev, ns, f, NULL);
 487        ns = mdev->state;
 488        spin_unlock_irqrestore(&mdev->req_lock, flags);
 489
 490        return rv;
 491}
 492
/**
 * drbd_force_state() - Impose a change which happens outside our control on our state
 * @mdev:       DRBD device.
 * @mask:       mask of state bits to change.
 * @val:        value of new state bits.
 *
 * Calls drbd_change_state() with CS_HARD; its return value is discarded.
 */
void drbd_force_state(struct drbd_conf *mdev,
        union drbd_state mask, union drbd_state val)
{
        drbd_change_state(mdev, CS_HARD, mask, val);
}
 504
/* forward declarations of the state checking helpers used below */
static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns);
/* note the argument order: new state first, then old state */
static int is_valid_state_transition(struct drbd_conf *,
                                     union drbd_state, union drbd_state);
static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
                                       union drbd_state ns, const char **warn_sync_abort);
/* called with (mdev, mask, val) by drbd_req_state() */
int drbd_send_state_req(struct drbd_conf *,
                        union drbd_state, union drbd_state);
 512
 513static enum drbd_state_ret_codes _req_st_cond(struct drbd_conf *mdev,
 514                                    union drbd_state mask, union drbd_state val)
 515{
 516        union drbd_state os, ns;
 517        unsigned long flags;
 518        int rv;
 519
 520        if (test_and_clear_bit(CL_ST_CHG_SUCCESS, &mdev->flags))
 521                return SS_CW_SUCCESS;
 522
 523        if (test_and_clear_bit(CL_ST_CHG_FAIL, &mdev->flags))
 524                return SS_CW_FAILED_BY_PEER;
 525
 526        rv = 0;
 527        spin_lock_irqsave(&mdev->req_lock, flags);
 528        os = mdev->state;
 529        ns.i = (os.i & ~mask.i) | val.i;
 530        ns = sanitize_state(mdev, os, ns, NULL);
 531
 532        if (!cl_wide_st_chg(mdev, os, ns))
 533                rv = SS_CW_NO_NEED;
 534        if (!rv) {
 535                rv = is_valid_state(mdev, ns);
 536                if (rv == SS_SUCCESS) {
 537                        rv = is_valid_state_transition(mdev, ns, os);
 538                        if (rv == SS_SUCCESS)
 539                                rv = 0; /* cont waiting, otherwise fail. */
 540                }
 541        }
 542        spin_unlock_irqrestore(&mdev->req_lock, flags);
 543
 544        return rv;
 545}
 546
/**
 * drbd_req_state() - Perform an eventually cluster wide state change
 * @mdev:       DRBD device.
 * @mask:       mask of state bits to change.
 * @val:        value of new state bits.
 * @f:          flags
 *
 * Should not be called directly, use drbd_request_state() or
 * _drbd_request_state().
 *
 * With CS_SERIALIZE the whole operation runs under mdev->state_mutex.
 * With CS_WAIT_COMPLETE a successful change additionally waits for the
 * completion handed to _drbd_set_state().
 * Returns an SS_ code; values below SS_SUCCESS are handled as failure here.
 */
static int drbd_req_state(struct drbd_conf *mdev,
                          union drbd_state mask, union drbd_state val,
                          enum chg_state_flags f)
{
        struct completion done;
        unsigned long flags;
        union drbd_state os, ns;
        int rv;

        init_completion(&done);

        if (f & CS_SERIALIZE)
                mutex_lock(&mdev->state_mutex);

        spin_lock_irqsave(&mdev->req_lock, flags);
        os = mdev->state;
        ns.i = (os.i & ~mask.i) | val.i;
        ns = sanitize_state(mdev, os, ns, NULL);

        if (cl_wide_st_chg(mdev, os, ns)) {
                /* cluster wide change: validate locally first, then drop
                 * the spinlock before sending the request and waiting */
                rv = is_valid_state(mdev, ns);
                if (rv == SS_SUCCESS)
                        rv = is_valid_state_transition(mdev, ns, os);
                spin_unlock_irqrestore(&mdev->req_lock, flags);

                if (rv < SS_SUCCESS) {
                        if (f & CS_VERBOSE)
                                print_st_err(mdev, os, ns, rv);
                        goto abort;
                }

                drbd_state_lock(mdev);
                if (!drbd_send_state_req(mdev, mask, val)) {
                        drbd_state_unlock(mdev);
                        rv = SS_CW_FAILED_BY_PEER;
                        if (f & CS_VERBOSE)
                                print_st_err(mdev, os, ns, rv);
                        goto abort;
                }

                /* _req_st_cond() returns 0 for as long as we have to wait */
                wait_event(mdev->state_wait,
                        (rv = _req_st_cond(mdev, mask, val)));

                if (rv < SS_SUCCESS) {
                        drbd_state_unlock(mdev);
                        if (f & CS_VERBOSE)
                                print_st_err(mdev, os, ns, rv);
                        goto abort;
                }
                /* the state may have changed while the spinlock was
                 * dropped: recompute ns from the current state */
                spin_lock_irqsave(&mdev->req_lock, flags);
                os = mdev->state;
                ns.i = (os.i & ~mask.i) | val.i;
                rv = _drbd_set_state(mdev, ns, f, &done);
                drbd_state_unlock(mdev);
        } else {
                /* local change: apply it directly, spinlock still held */
                rv = _drbd_set_state(mdev, ns, f, &done);
        }

        /* the spinlock is held on both paths reaching this point */
        spin_unlock_irqrestore(&mdev->req_lock, flags);

        if (f & CS_WAIT_COMPLETE && rv == SS_SUCCESS) {
                D_ASSERT(current != mdev->worker.task);
                wait_for_completion(&done);
        }

abort:
        /* every "goto abort" above has already released the spinlock */
        if (f & CS_SERIALIZE)
                mutex_unlock(&mdev->state_mutex);

        return rv;
}
 628
/**
 * _drbd_request_state() - Request a state change (with flags)
 * @mdev:       DRBD device.
 * @mask:       mask of state bits to change.
 * @val:        value of new state bits.
 * @f:          flags
 *
 * Cousin of drbd_request_state(), useful with the CS_WAIT_COMPLETE
 * flag, or when logging of failed state change requests is not desired.
 *
 * Retries drbd_req_state() on mdev->state_wait for as long as it returns
 * SS_IN_TRANSIENT_STATE, and returns the first other result.
 */
int _drbd_request_state(struct drbd_conf *mdev, union drbd_state mask,
                        union drbd_state val,   enum chg_state_flags f)
{
        int rv;

        /* the state change itself is attempted inside the wait condition */
        wait_event(mdev->state_wait,
                   (rv = drbd_req_state(mdev, mask, val, f)) != SS_IN_TRANSIENT_STATE);

        return rv;
}
 649
 650static void print_st(struct drbd_conf *mdev, char *name, union drbd_state ns)
 651{
 652        dev_err(DEV, " %s = { cs:%s ro:%s/%s ds:%s/%s %c%c%c%c }\n",
 653            name,
 654            drbd_conn_str(ns.conn),
 655            drbd_role_str(ns.role),
 656            drbd_role_str(ns.peer),
 657            drbd_disk_str(ns.disk),
 658            drbd_disk_str(ns.pdsk),
 659            is_susp(ns) ? 's' : 'r',
 660            ns.aftr_isp ? 'a' : '-',
 661            ns.peer_isp ? 'p' : '-',
 662            ns.user_isp ? 'u' : '-'
 663            );
 664}
 665
 666void print_st_err(struct drbd_conf *mdev,
 667        union drbd_state os, union drbd_state ns, int err)
 668{
 669        if (err == SS_IN_TRANSIENT_STATE)
 670                return;
 671        dev_err(DEV, "State change failed: %s\n", drbd_set_st_err_str(err));
 672        print_st(mdev, " state", os);
 673        print_st(mdev, "wanted", ns);
 674}
 675
 676
/* name aliases so that PSC() can build drbd_<field>_str for the peer and
 * pdsk fields too */
#define drbd_peer_str drbd_role_str
#define drbd_pdsk_str drbd_disk_str

/* the flag fields are printed as "1" or "0" */
#define drbd_susp_str(A)     ((A) ? "1" : "0")
#define drbd_aftr_isp_str(A) ((A) ? "1" : "0")
#define drbd_peer_isp_str(A) ((A) ? "1" : "0")
#define drbd_user_isp_str(A) ((A) ? "1" : "0")

/* PSC(A): if field A differs between os and ns, append "A( old -> new ) "
 * at pbp and advance pbp.  os, ns and pbp must be in scope where the macro
 * is expanded; the write is an unbounded sprintf. */
#define PSC(A) \
        ({ if (ns.A != os.A) { \
                pbp += sprintf(pbp, #A "( %s -> %s ) ", \
                              drbd_##A##_str(os.A), \
                              drbd_##A##_str(ns.A)); \
        } })
 691
 692/**
 693 * is_valid_state() - Returns an SS_ error code if ns is not valid
 694 * @mdev:       DRBD device.
 695 * @ns:         State to consider.
 696 */
 697static int is_valid_state(struct drbd_conf *mdev, union drbd_state ns)
 698{
 699        /* See drbd_state_sw_errors in drbd_strings.c */
 700
 701        enum drbd_fencing_p fp;
 702        int rv = SS_SUCCESS;
 703
 704        fp = FP_DONT_CARE;
 705        if (get_ldev(mdev)) {
 706                fp = mdev->ldev->dc.fencing;
 707                put_ldev(mdev);
 708        }
 709
 710        if (get_net_conf(mdev)) {
 711                if (!mdev->net_conf->two_primaries &&
 712                    ns.role == R_PRIMARY && ns.peer == R_PRIMARY)
 713                        rv = SS_TWO_PRIMARIES;
 714                put_net_conf(mdev);
 715        }
 716
 717        if (rv <= 0)
 718                /* already found a reason to abort */;
 719        else if (ns.role == R_SECONDARY && mdev->open_cnt)
 720                rv = SS_DEVICE_IN_USE;
 721
 722        else if (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.disk < D_UP_TO_DATE)
 723                rv = SS_NO_UP_TO_DATE_DISK;
 724
 725        else if (fp >= FP_RESOURCE &&
 726                 ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk >= D_UNKNOWN)
 727                rv = SS_PRIMARY_NOP;
 728
 729        else if (ns.role == R_PRIMARY && ns.disk <= D_INCONSISTENT && ns.pdsk <= D_INCONSISTENT)
 730                rv = SS_NO_UP_TO_DATE_DISK;
 731
 732        else if (ns.conn > C_CONNECTED && ns.disk < D_INCONSISTENT)
 733                rv = SS_NO_LOCAL_DISK;
 734
 735        else if (ns.conn > C_CONNECTED && ns.pdsk < D_INCONSISTENT)
 736                rv = SS_NO_REMOTE_DISK;
 737
 738        else if (ns.conn > C_CONNECTED && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE)
 739                rv = SS_NO_UP_TO_DATE_DISK;
 740
 741        else if ((ns.conn == C_CONNECTED ||
 742                  ns.conn == C_WF_BITMAP_S ||
 743                  ns.conn == C_SYNC_SOURCE ||
 744                  ns.conn == C_PAUSED_SYNC_S) &&
 745                  ns.disk == D_OUTDATED)
 746                rv = SS_CONNECTED_OUTDATES;
 747
 748        else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
 749                 (mdev->sync_conf.verify_alg[0] == 0))
 750                rv = SS_NO_VERIFY_ALG;
 751
 752        else if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
 753                  mdev->agreed_pro_version < 88)
 754                rv = SS_NOT_SUPPORTED;
 755
 756        return rv;
 757}
 758
 759/**
 760 * is_valid_state_transition() - Returns an SS_ error code if the state transition is not possible
 761 * @mdev:       DRBD device.
 762 * @ns:         new state.
 763 * @os:         old state.
 764 */
 765static int is_valid_state_transition(struct drbd_conf *mdev,
 766                                     union drbd_state ns, union drbd_state os)
 767{
 768        int rv = SS_SUCCESS;
 769
 770        if ((ns.conn == C_STARTING_SYNC_T || ns.conn == C_STARTING_SYNC_S) &&
 771            os.conn > C_CONNECTED)
 772                rv = SS_RESYNC_RUNNING;
 773
 774        if (ns.conn == C_DISCONNECTING && os.conn == C_STANDALONE)
 775                rv = SS_ALREADY_STANDALONE;
 776
 777        if (ns.disk > D_ATTACHING && os.disk == D_DISKLESS)
 778                rv = SS_IS_DISKLESS;
 779
 780        if (ns.conn == C_WF_CONNECTION && os.conn < C_UNCONNECTED)
 781                rv = SS_NO_NET_CONFIG;
 782
 783        if (ns.disk == D_OUTDATED && os.disk < D_OUTDATED && os.disk != D_ATTACHING)
 784                rv = SS_LOWER_THAN_OUTDATED;
 785
 786        if (ns.conn == C_DISCONNECTING && os.conn == C_UNCONNECTED)
 787                rv = SS_IN_TRANSIENT_STATE;
 788
 789        if (ns.conn == os.conn && ns.conn == C_WF_REPORT_PARAMS)
 790                rv = SS_IN_TRANSIENT_STATE;
 791
 792        if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) && os.conn < C_CONNECTED)
 793                rv = SS_NEED_CONNECTION;
 794
 795        if ((ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T) &&
 796            ns.conn != os.conn && os.conn > C_CONNECTED)
 797                rv = SS_RESYNC_RUNNING;
 798
 799        if ((ns.conn == C_STARTING_SYNC_S || ns.conn == C_STARTING_SYNC_T) &&
 800            os.conn < C_CONNECTED)
 801                rv = SS_NEED_CONNECTION;
 802
 803        return rv;
 804}
 805
 806/**
 807 * sanitize_state() - Resolves implicitly necessary additional changes to a state transition
 808 * @mdev:       DRBD device.
 809 * @os:         old state.
 810 * @ns:         new state.
 811 * @warn_sync_abort:
 812 *
 813 * When we loose connection, we have to set the state of the peers disk (pdsk)
 814 * to D_UNKNOWN. This rule and many more along those lines are in this function.
 815 */
 816static union drbd_state sanitize_state(struct drbd_conf *mdev, union drbd_state os,
 817                                       union drbd_state ns, const char **warn_sync_abort)
 818{
 819        enum drbd_fencing_p fp;
 820
 821        fp = FP_DONT_CARE;
 822        if (get_ldev(mdev)) {
 823                fp = mdev->ldev->dc.fencing;
 824                put_ldev(mdev);
 825        }
 826
 827        /* Disallow Network errors to configure a device's network part */
 828        if ((ns.conn >= C_TIMEOUT && ns.conn <= C_TEAR_DOWN) &&
 829            os.conn <= C_DISCONNECTING)
 830                ns.conn = os.conn;
 831
 832        /* After a network error (+C_TEAR_DOWN) only C_UNCONNECTED or C_DISCONNECTING can follow.
 833         * If you try to go into some Sync* state, that shall fail (elsewhere). */
 834        if (os.conn >= C_TIMEOUT && os.conn <= C_TEAR_DOWN &&
 835            ns.conn != C_UNCONNECTED && ns.conn != C_DISCONNECTING && ns.conn <= C_TEAR_DOWN)
 836                ns.conn = os.conn;
 837
 838        /* we cannot fail (again) if we already detached */
 839        if (ns.disk == D_FAILED && os.disk == D_DISKLESS)
 840                ns.disk = D_DISKLESS;
 841
 842        /* if we are only D_ATTACHING yet,
 843         * we can (and should) go directly to D_DISKLESS. */
 844        if (ns.disk == D_FAILED && os.disk == D_ATTACHING)
 845                ns.disk = D_DISKLESS;
 846
 847        /* After C_DISCONNECTING only C_STANDALONE may follow */
 848        if (os.conn == C_DISCONNECTING && ns.conn != C_STANDALONE)
 849                ns.conn = os.conn;
 850
 851        if (ns.conn < C_CONNECTED) {
 852                ns.peer_isp = 0;
 853                ns.peer = R_UNKNOWN;
 854                if (ns.pdsk > D_UNKNOWN || ns.pdsk < D_INCONSISTENT)
 855                        ns.pdsk = D_UNKNOWN;
 856        }
 857
 858        /* Clear the aftr_isp when becoming unconfigured */
 859        if (ns.conn == C_STANDALONE && ns.disk == D_DISKLESS && ns.role == R_SECONDARY)
 860                ns.aftr_isp = 0;
 861
 862        /* Abort resync if a disk fails/detaches */
 863        if (os.conn > C_CONNECTED && ns.conn > C_CONNECTED &&
 864            (ns.disk <= D_FAILED || ns.pdsk <= D_FAILED)) {
 865                if (warn_sync_abort)
 866                        *warn_sync_abort =
 867                                os.conn == C_VERIFY_S || os.conn == C_VERIFY_T ?
 868                                "Online-verify" : "Resync";
 869                ns.conn = C_CONNECTED;
 870        }
 871
 872        if (ns.conn >= C_CONNECTED &&
 873            ((ns.disk == D_CONSISTENT || ns.disk == D_OUTDATED) ||
 874             (ns.disk == D_NEGOTIATING && ns.conn == C_WF_BITMAP_T))) {
 875                switch (ns.conn) {
 876                case C_WF_BITMAP_T:
 877                case C_PAUSED_SYNC_T:
 878                        ns.disk = D_OUTDATED;
 879                        break;
 880                case C_CONNECTED:
 881                case C_WF_BITMAP_S:
 882                case C_SYNC_SOURCE:
 883                case C_PAUSED_SYNC_S:
 884                        ns.disk = D_UP_TO_DATE;
 885                        break;
 886                case C_SYNC_TARGET:
 887                        ns.disk = D_INCONSISTENT;
 888                        dev_warn(DEV, "Implicitly set disk state Inconsistent!\n");
 889                        break;
 890                }
 891                if (os.disk == D_OUTDATED && ns.disk == D_UP_TO_DATE)
 892                        dev_warn(DEV, "Implicitly set disk from Outdated to UpToDate\n");
 893        }
 894
 895        if (ns.conn >= C_CONNECTED &&
 896            (ns.pdsk == D_CONSISTENT || ns.pdsk == D_OUTDATED)) {
 897                switch (ns.conn) {
 898                case C_CONNECTED:
 899                case C_WF_BITMAP_T:
 900                case C_PAUSED_SYNC_T:
 901                case C_SYNC_TARGET:
 902                        ns.pdsk = D_UP_TO_DATE;
 903                        break;
 904                case C_WF_BITMAP_S:
 905                case C_PAUSED_SYNC_S:
 906                        /* remap any consistent state to D_OUTDATED,
 907                         * but disallow "upgrade" of not even consistent states.
 908                         */
 909                        ns.pdsk =
 910                                (D_DISKLESS < os.pdsk && os.pdsk < D_OUTDATED)
 911                                ? os.pdsk : D_OUTDATED;
 912                        break;
 913                case C_SYNC_SOURCE:
 914                        ns.pdsk = D_INCONSISTENT;
 915                        dev_warn(DEV, "Implicitly set pdsk Inconsistent!\n");
 916                        break;
 917                }
 918                if (os.pdsk == D_OUTDATED && ns.pdsk == D_UP_TO_DATE)
 919                        dev_warn(DEV, "Implicitly set pdsk from Outdated to UpToDate\n");
 920        }
 921
 922        /* Connection breaks down before we finished "Negotiating" */
 923        if (ns.conn < C_CONNECTED && ns.disk == D_NEGOTIATING &&
 924            get_ldev_if_state(mdev, D_NEGOTIATING)) {
 925                if (mdev->ed_uuid == mdev->ldev->md.uuid[UI_CURRENT]) {
 926                        ns.disk = mdev->new_state_tmp.disk;
 927                        ns.pdsk = mdev->new_state_tmp.pdsk;
 928                } else {
 929                        dev_alert(DEV, "Connection lost while negotiating, no data!\n");
 930                        ns.disk = D_DISKLESS;
 931                        ns.pdsk = D_UNKNOWN;
 932                }
 933                put_ldev(mdev);
 934        }
 935
 936        if (fp == FP_STONITH &&
 937            (ns.role == R_PRIMARY && ns.conn < C_CONNECTED && ns.pdsk > D_OUTDATED) &&
 938            !(os.role == R_PRIMARY && os.conn < C_CONNECTED && os.pdsk > D_OUTDATED))
 939                ns.susp_fen = 1; /* Suspend IO while fence-peer handler runs (peer lost) */
 940
 941        if (mdev->sync_conf.on_no_data == OND_SUSPEND_IO &&
 942            (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE) &&
 943            !(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE))
 944                ns.susp_nod = 1; /* Suspend IO while no data available (no accessible data available) */
 945
 946        if (ns.aftr_isp || ns.peer_isp || ns.user_isp) {
 947                if (ns.conn == C_SYNC_SOURCE)
 948                        ns.conn = C_PAUSED_SYNC_S;
 949                if (ns.conn == C_SYNC_TARGET)
 950                        ns.conn = C_PAUSED_SYNC_T;
 951        } else {
 952                if (ns.conn == C_PAUSED_SYNC_S)
 953                        ns.conn = C_SYNC_SOURCE;
 954                if (ns.conn == C_PAUSED_SYNC_T)
 955                        ns.conn = C_SYNC_TARGET;
 956        }
 957
 958        return ns;
 959}
 960
 961/* helper for __drbd_set_state */
 962static void set_ov_position(struct drbd_conf *mdev, enum drbd_conns cs)
 963{
 964        if (cs == C_VERIFY_T) {
 965                /* starting online verify from an arbitrary position
 966                 * does not fit well into the existing protocol.
 967                 * on C_VERIFY_T, we initialize ov_left and friends
 968                 * implicitly in receive_DataRequest once the
 969                 * first P_OV_REQUEST is received */
 970                mdev->ov_start_sector = ~(sector_t)0;
 971        } else {
 972                unsigned long bit = BM_SECT_TO_BIT(mdev->ov_start_sector);
 973                if (bit >= mdev->rs_total)
 974                        mdev->ov_start_sector =
 975                                BM_BIT_TO_SECT(mdev->rs_total - 1);
 976                mdev->ov_position = mdev->ov_start_sector;
 977        }
 978}
 979
 980static void drbd_resume_al(struct drbd_conf *mdev)
 981{
 982        if (test_and_clear_bit(AL_SUSPENDED, &mdev->flags))
 983                dev_info(DEV, "Resumed AL updates\n");
 984}
 985
 986/**
 987 * __drbd_set_state() - Set a new DRBD state
 988 * @mdev:       DRBD device.
 989 * @ns:         new state.
 990 * @flags:      Flags
 991 * @done:       Optional completion, that will get completed after the after_state_ch() finished
 992 *
 993 * Caller needs to hold req_lock, and global_state_lock. Do not call directly.
 994 */
 995int __drbd_set_state(struct drbd_conf *mdev,
 996                    union drbd_state ns, enum chg_state_flags flags,
 997                    struct completion *done)
 998{
 999        union drbd_state os;
1000        int rv = SS_SUCCESS;
1001        const char *warn_sync_abort = NULL;
1002        struct after_state_chg_work *ascw;
1003
1004        os = mdev->state;
1005
1006        ns = sanitize_state(mdev, os, ns, &warn_sync_abort);
1007
1008        if (ns.i == os.i)
1009                return SS_NOTHING_TO_DO;
1010
1011        if (!(flags & CS_HARD)) {
1012                /*  pre-state-change checks ; only look at ns  */
1013                /* See drbd_state_sw_errors in drbd_strings.c */
1014
1015                rv = is_valid_state(mdev, ns);
1016                if (rv < SS_SUCCESS) {
1017                        /* If the old state was illegal as well, then let
1018                           this happen...*/
1019
1020                        if (is_valid_state(mdev, os) == rv)
1021                                rv = is_valid_state_transition(mdev, ns, os);
1022                } else
1023                        rv = is_valid_state_transition(mdev, ns, os);
1024        }
1025
1026        if (rv < SS_SUCCESS) {
1027                if (flags & CS_VERBOSE)
1028                        print_st_err(mdev, os, ns, rv);
1029                return rv;
1030        }
1031
1032        if (warn_sync_abort)
1033                dev_warn(DEV, "%s aborted.\n", warn_sync_abort);
1034
1035        {
1036                char *pbp, pb[300];
1037                pbp = pb;
1038                *pbp = 0;
1039                PSC(role);
1040                PSC(peer);
1041                PSC(conn);
1042                PSC(disk);
1043                PSC(pdsk);
1044                if (is_susp(ns) != is_susp(os))
1045                        pbp += sprintf(pbp, "susp( %s -> %s ) ",
1046                                       drbd_susp_str(is_susp(os)),
1047                                       drbd_susp_str(is_susp(ns)));
1048                PSC(aftr_isp);
1049                PSC(peer_isp);
1050                PSC(user_isp);
1051                dev_info(DEV, "%s\n", pb);
1052        }
1053
1054        /* solve the race between becoming unconfigured,
1055         * worker doing the cleanup, and
1056         * admin reconfiguring us:
1057         * on (re)configure, first set CONFIG_PENDING,
1058         * then wait for a potentially exiting worker,
1059         * start the worker, and schedule one no_op.
1060         * then proceed with configuration.
1061         */
1062        if (ns.disk == D_DISKLESS &&
1063            ns.conn == C_STANDALONE &&
1064            ns.role == R_SECONDARY &&
1065            !test_and_set_bit(CONFIG_PENDING, &mdev->flags))
1066                set_bit(DEVICE_DYING, &mdev->flags);
1067
1068        /* if we are going -> D_FAILED or D_DISKLESS, grab one extra reference
1069         * on the ldev here, to be sure the transition -> D_DISKLESS resp.
1070         * drbd_ldev_destroy() won't happen before our corresponding
1071         * after_state_ch works run, where we put_ldev again. */
1072        if ((os.disk != D_FAILED && ns.disk == D_FAILED) ||
1073            (os.disk != D_DISKLESS && ns.disk == D_DISKLESS))
1074                atomic_inc(&mdev->local_cnt);
1075
1076        mdev->state = ns;
1077        wake_up(&mdev->misc_wait);
1078        wake_up(&mdev->state_wait);
1079
1080        /* aborted verify run. log the last position */
1081        if ((os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) &&
1082            ns.conn < C_CONNECTED) {
1083                mdev->ov_start_sector =
1084                        BM_BIT_TO_SECT(mdev->rs_total - mdev->ov_left);
1085                dev_info(DEV, "Online Verify reached sector %llu\n",
1086                        (unsigned long long)mdev->ov_start_sector);
1087        }
1088
1089        if ((os.conn == C_PAUSED_SYNC_T || os.conn == C_PAUSED_SYNC_S) &&
1090            (ns.conn == C_SYNC_TARGET  || ns.conn == C_SYNC_SOURCE)) {
1091                dev_info(DEV, "Syncer continues.\n");
1092                mdev->rs_paused += (long)jiffies
1093                                  -(long)mdev->rs_mark_time[mdev->rs_last_mark];
1094                if (ns.conn == C_SYNC_TARGET)
1095                        mod_timer(&mdev->resync_timer, jiffies);
1096        }
1097
1098        if ((os.conn == C_SYNC_TARGET  || os.conn == C_SYNC_SOURCE) &&
1099            (ns.conn == C_PAUSED_SYNC_T || ns.conn == C_PAUSED_SYNC_S)) {
1100                dev_info(DEV, "Resync suspended\n");
1101                mdev->rs_mark_time[mdev->rs_last_mark] = jiffies;
1102        }
1103
1104        if (os.conn == C_CONNECTED &&
1105            (ns.conn == C_VERIFY_S || ns.conn == C_VERIFY_T)) {
1106                unsigned long now = jiffies;
1107                int i;
1108
1109                mdev->ov_position = 0;
1110                mdev->rs_total = drbd_bm_bits(mdev);
1111                if (mdev->agreed_pro_version >= 90)
1112                        set_ov_position(mdev, ns.conn);
1113                else
1114                        mdev->ov_start_sector = 0;
1115                mdev->ov_left = mdev->rs_total
1116                              - BM_SECT_TO_BIT(mdev->ov_position);
1117                mdev->rs_start = now;
1118                mdev->rs_last_events = 0;
1119                mdev->rs_last_sect_ev = 0;
1120                mdev->ov_last_oos_size = 0;
1121                mdev->ov_last_oos_start = 0;
1122
1123                for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1124                        mdev->rs_mark_left[i] = mdev->rs_total;
1125                        mdev->rs_mark_time[i] = now;
1126                }
1127
1128                if (ns.conn == C_VERIFY_S) {
1129                        dev_info(DEV, "Starting Online Verify from sector %llu\n",
1130                                        (unsigned long long)mdev->ov_position);
1131                        mod_timer(&mdev->resync_timer, jiffies);
1132                }
1133        }
1134
1135        if (get_ldev(mdev)) {
1136                u32 mdf = mdev->ldev->md.flags & ~(MDF_CONSISTENT|MDF_PRIMARY_IND|
1137                                                 MDF_CONNECTED_IND|MDF_WAS_UP_TO_DATE|
1138                                                 MDF_PEER_OUT_DATED|MDF_CRASHED_PRIMARY);
1139
1140                if (test_bit(CRASHED_PRIMARY, &mdev->flags))
1141                        mdf |= MDF_CRASHED_PRIMARY;
1142                if (mdev->state.role == R_PRIMARY ||
1143                    (mdev->state.pdsk < D_INCONSISTENT && mdev->state.peer == R_PRIMARY))
1144                        mdf |= MDF_PRIMARY_IND;
1145                if (mdev->state.conn > C_WF_REPORT_PARAMS)
1146                        mdf |= MDF_CONNECTED_IND;
1147                if (mdev->state.disk > D_INCONSISTENT)
1148                        mdf |= MDF_CONSISTENT;
1149                if (mdev->state.disk > D_OUTDATED)
1150                        mdf |= MDF_WAS_UP_TO_DATE;
1151                if (mdev->state.pdsk <= D_OUTDATED && mdev->state.pdsk >= D_INCONSISTENT)
1152                        mdf |= MDF_PEER_OUT_DATED;
1153                if (mdf != mdev->ldev->md.flags) {
1154                        mdev->ldev->md.flags = mdf;
1155                        drbd_md_mark_dirty(mdev);
1156                }
1157                if (os.disk < D_CONSISTENT && ns.disk >= D_CONSISTENT)
1158                        drbd_set_ed_uuid(mdev, mdev->ldev->md.uuid[UI_CURRENT]);
1159                put_ldev(mdev);
1160        }
1161
1162        /* Peer was forced D_UP_TO_DATE & R_PRIMARY, consider to resync */
1163        if (os.disk == D_INCONSISTENT && os.pdsk == D_INCONSISTENT &&
1164            os.peer == R_SECONDARY && ns.peer == R_PRIMARY)
1165                set_bit(CONSIDER_RESYNC, &mdev->flags);
1166
1167        /* Receiver should clean up itself */
1168        if (os.conn != C_DISCONNECTING && ns.conn == C_DISCONNECTING)
1169                drbd_thread_stop_nowait(&mdev->receiver);
1170
1171        /* Now the receiver finished cleaning up itself, it should die */
1172        if (os.conn != C_STANDALONE && ns.conn == C_STANDALONE)
1173                drbd_thread_stop_nowait(&mdev->receiver);
1174
1175        /* Upon network failure, we need to restart the receiver. */
1176        if (os.conn > C_TEAR_DOWN &&
1177            ns.conn <= C_TEAR_DOWN && ns.conn >= C_TIMEOUT)
1178                drbd_thread_restart_nowait(&mdev->receiver);
1179
1180        /* Resume AL writing if we get a connection */
1181        if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED)
1182                drbd_resume_al(mdev);
1183
1184        ascw = kmalloc(sizeof(*ascw), GFP_ATOMIC);
1185        if (ascw) {
1186                ascw->os = os;
1187                ascw->ns = ns;
1188                ascw->flags = flags;
1189                ascw->w.cb = w_after_state_ch;
1190                ascw->done = done;
1191                drbd_queue_work(&mdev->data.work, &ascw->w);
1192        } else {
1193                dev_warn(DEV, "Could not kmalloc an ascw\n");
1194        }
1195
1196        return rv;
1197}
1198
1199static int w_after_state_ch(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1200{
1201        struct after_state_chg_work *ascw =
1202                container_of(w, struct after_state_chg_work, w);
1203        after_state_ch(mdev, ascw->os, ascw->ns, ascw->flags);
1204        if (ascw->flags & CS_WAIT_COMPLETE) {
1205                D_ASSERT(ascw->done != NULL);
1206                complete(ascw->done);
1207        }
1208        kfree(ascw);
1209
1210        return 1;
1211}
1212
1213static void abw_start_sync(struct drbd_conf *mdev, int rv)
1214{
1215        if (rv) {
1216                dev_err(DEV, "Writing the bitmap failed not starting resync.\n");
1217                _drbd_request_state(mdev, NS(conn, C_CONNECTED), CS_VERBOSE);
1218                return;
1219        }
1220
1221        switch (mdev->state.conn) {
1222        case C_STARTING_SYNC_T:
1223                _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
1224                break;
1225        case C_STARTING_SYNC_S:
1226                drbd_start_resync(mdev, C_SYNC_SOURCE);
1227                break;
1228        }
1229}
1230
1231/**
1232 * after_state_ch() - Perform after state change actions that may sleep
1233 * @mdev:       DRBD device.
1234 * @os:         old state.
1235 * @ns:         new state.
1236 * @flags:      Flags
1237 */
1238static void after_state_ch(struct drbd_conf *mdev, union drbd_state os,
1239                           union drbd_state ns, enum chg_state_flags flags)
1240{
1241        enum drbd_fencing_p fp;
1242        enum drbd_req_event what = nothing;
1243        union drbd_state nsm = (union drbd_state){ .i = -1 };
1244
1245        if (os.conn != C_CONNECTED && ns.conn == C_CONNECTED) {
1246                clear_bit(CRASHED_PRIMARY, &mdev->flags);
1247                if (mdev->p_uuid)
1248                        mdev->p_uuid[UI_FLAGS] &= ~((u64)2);
1249        }
1250
1251        fp = FP_DONT_CARE;
1252        if (get_ldev(mdev)) {
1253                fp = mdev->ldev->dc.fencing;
1254                put_ldev(mdev);
1255        }
1256
1257        /* Inform userspace about the change... */
1258        drbd_bcast_state(mdev, ns);
1259
1260        if (!(os.role == R_PRIMARY && os.disk < D_UP_TO_DATE && os.pdsk < D_UP_TO_DATE) &&
1261            (ns.role == R_PRIMARY && ns.disk < D_UP_TO_DATE && ns.pdsk < D_UP_TO_DATE))
1262                drbd_khelper(mdev, "pri-on-incon-degr");
1263
1264        /* Here we have the actions that are performed after a
1265           state change. This function might sleep */
1266
1267        nsm.i = -1;
1268        if (ns.susp_nod) {
1269                if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1270                        if (ns.conn == C_CONNECTED)
1271                                what = resend, nsm.susp_nod = 0;
1272                        else /* ns.conn > C_CONNECTED */
1273                                dev_err(DEV, "Unexpected Resynd going on!\n");
1274                }
1275
1276                if (os.disk == D_ATTACHING && ns.disk > D_ATTACHING)
1277                        what = restart_frozen_disk_io, nsm.susp_nod = 0;
1278
1279        }
1280
1281        if (ns.susp_fen) {
1282                /* case1: The outdate peer handler is successful: */
1283                if (os.pdsk > D_OUTDATED  && ns.pdsk <= D_OUTDATED) {
1284                        tl_clear(mdev);
1285                        if (test_bit(NEW_CUR_UUID, &mdev->flags)) {
1286                                drbd_uuid_new_current(mdev);
1287                                clear_bit(NEW_CUR_UUID, &mdev->flags);
1288                        }
1289                        spin_lock_irq(&mdev->req_lock);
1290                        _drbd_set_state(_NS(mdev, susp_fen, 0), CS_VERBOSE, NULL);
1291                        spin_unlock_irq(&mdev->req_lock);
1292                }
1293                /* case2: The connection was established again: */
1294                if (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED) {
1295                        clear_bit(NEW_CUR_UUID, &mdev->flags);
1296                        what = resend;
1297                        nsm.susp_fen = 0;
1298                }
1299        }
1300
1301        if (what != nothing) {
1302                spin_lock_irq(&mdev->req_lock);
1303                _tl_restart(mdev, what);
1304                nsm.i &= mdev->state.i;
1305                _drbd_set_state(mdev, nsm, CS_VERBOSE, NULL);
1306                spin_unlock_irq(&mdev->req_lock);
1307        }
1308
1309        /* Do not change the order of the if above and the two below... */
1310        if (os.pdsk == D_DISKLESS && ns.pdsk > D_DISKLESS) {      /* attach on the peer */
1311                drbd_send_uuids(mdev);
1312                drbd_send_state(mdev);
1313        }
1314        if (os.conn != C_WF_BITMAP_S && ns.conn == C_WF_BITMAP_S)
1315                drbd_queue_bitmap_io(mdev, &drbd_send_bitmap, NULL, "send_bitmap (WFBitMapS)");
1316
1317        /* Lost contact to peer's copy of the data */
1318        if ((os.pdsk >= D_INCONSISTENT &&
1319             os.pdsk != D_UNKNOWN &&
1320             os.pdsk != D_OUTDATED)
1321        &&  (ns.pdsk < D_INCONSISTENT ||
1322             ns.pdsk == D_UNKNOWN ||
1323             ns.pdsk == D_OUTDATED)) {
1324                if (get_ldev(mdev)) {
1325                        if ((ns.role == R_PRIMARY || ns.peer == R_PRIMARY) &&
1326                            mdev->ldev->md.uuid[UI_BITMAP] == 0 && ns.disk >= D_UP_TO_DATE) {
1327                                if (is_susp(mdev->state)) {
1328                                        set_bit(NEW_CUR_UUID, &mdev->flags);
1329                                } else {
1330                                        drbd_uuid_new_current(mdev);
1331                                        drbd_send_uuids(mdev);
1332                                }
1333                        }
1334                        put_ldev(mdev);
1335                }
1336        }
1337
1338        if (ns.pdsk < D_INCONSISTENT && get_ldev(mdev)) {
1339                if (ns.peer == R_PRIMARY && mdev->ldev->md.uuid[UI_BITMAP] == 0) {
1340                        drbd_uuid_new_current(mdev);
1341                        drbd_send_uuids(mdev);
1342                }
1343
1344                /* D_DISKLESS Peer becomes secondary */
1345                if (os.peer == R_PRIMARY && ns.peer == R_SECONDARY)
1346                        drbd_al_to_on_disk_bm(mdev);
1347                put_ldev(mdev);
1348        }
1349
1350        /* Last part of the attaching process ... */
1351        if (ns.conn >= C_CONNECTED &&
1352            os.disk == D_ATTACHING && ns.disk == D_NEGOTIATING) {
1353                drbd_send_sizes(mdev, 0, 0);  /* to start sync... */
1354                drbd_send_uuids(mdev);
1355                drbd_send_state(mdev);
1356        }
1357
1358        /* We want to pause/continue resync, tell peer. */
1359        if (ns.conn >= C_CONNECTED &&
1360             ((os.aftr_isp != ns.aftr_isp) ||
1361              (os.user_isp != ns.user_isp)))
1362                drbd_send_state(mdev);
1363
1364        /* In case one of the isp bits got set, suspend other devices. */
1365        if ((!os.aftr_isp && !os.peer_isp && !os.user_isp) &&
1366            (ns.aftr_isp || ns.peer_isp || ns.user_isp))
1367                suspend_other_sg(mdev);
1368
1369        /* Make sure the peer gets informed about eventual state
1370           changes (ISP bits) while we were in WFReportParams. */
1371        if (os.conn == C_WF_REPORT_PARAMS && ns.conn >= C_CONNECTED)
1372                drbd_send_state(mdev);
1373
1374        /* We are in the progress to start a full sync... */
1375        if ((os.conn != C_STARTING_SYNC_T && ns.conn == C_STARTING_SYNC_T) ||
1376            (os.conn != C_STARTING_SYNC_S && ns.conn == C_STARTING_SYNC_S))
1377                drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, &abw_start_sync, "set_n_write from StartingSync");
1378
1379        /* We are invalidating our self... */
1380        if (os.conn < C_CONNECTED && ns.conn < C_CONNECTED &&
1381            os.disk > D_INCONSISTENT && ns.disk == D_INCONSISTENT)
1382                drbd_queue_bitmap_io(mdev, &drbd_bmio_set_n_write, NULL, "set_n_write from invalidate");
1383
1384        /* first half of local IO error, failure to attach,
1385         * or administrative detach */
1386        if (os.disk != D_FAILED && ns.disk == D_FAILED) {
1387                enum drbd_io_error_p eh;
1388                int was_io_error;
1389                /* corresponding get_ldev was in __drbd_set_state, to serialize
1390                 * our cleanup here with the transition to D_DISKLESS,
1391                 * so it is safe to dreference ldev here. */
1392                eh = mdev->ldev->dc.on_io_error;
1393                was_io_error = test_and_clear_bit(WAS_IO_ERROR, &mdev->flags);
1394
1395                /* current state still has to be D_FAILED,
1396                 * there is only one way out: to D_DISKLESS,
1397                 * and that may only happen after our put_ldev below. */
1398                if (mdev->state.disk != D_FAILED)
1399                        dev_err(DEV,
1400                                "ASSERT FAILED: disk is %s during detach\n",
1401                                drbd_disk_str(mdev->state.disk));
1402
1403                if (drbd_send_state(mdev))
1404                        dev_warn(DEV, "Notified peer that I am detaching my disk\n");
1405                else
1406                        dev_err(DEV, "Sending state for detaching disk failed\n");
1407
1408                drbd_rs_cancel_all(mdev);
1409
1410                /* In case we want to get something to stable storage still,
1411                 * this may be the last chance.
1412                 * Following put_ldev may transition to D_DISKLESS. */
1413                drbd_md_sync(mdev);
1414                put_ldev(mdev);
1415
1416                if (was_io_error && eh == EP_CALL_HELPER)
1417                        drbd_khelper(mdev, "local-io-error");
1418        }
1419
1420        /* second half of local IO error, failure to attach,
1421         * or administrative detach,
1422         * after local_cnt references have reached zero again */
1423        if (os.disk != D_DISKLESS && ns.disk == D_DISKLESS) {
1424                /* We must still be diskless,
1425                 * re-attach has to be serialized with this! */
1426                if (mdev->state.disk != D_DISKLESS)
1427                        dev_err(DEV,
1428                                "ASSERT FAILED: disk is %s while going diskless\n",
1429                                drbd_disk_str(mdev->state.disk));
1430
1431                mdev->rs_total = 0;
1432                mdev->rs_failed = 0;
1433                atomic_set(&mdev->rs_pending_cnt, 0);
1434
1435                if (drbd_send_state(mdev))
1436                        dev_warn(DEV, "Notified peer that I'm now diskless.\n");
1437                else
1438                        dev_err(DEV, "Sending state for being diskless failed\n");
1439                /* corresponding get_ldev in __drbd_set_state
1440                 * this may finaly trigger drbd_ldev_destroy. */
1441                put_ldev(mdev);
1442        }
1443
1444        /* Disks got bigger while they were detached */
1445        if (ns.disk > D_NEGOTIATING && ns.pdsk > D_NEGOTIATING &&
1446            test_and_clear_bit(RESYNC_AFTER_NEG, &mdev->flags)) {
1447                if (ns.conn == C_CONNECTED)
1448                        resync_after_online_grow(mdev);
1449        }
1450
1451        /* A resync finished or aborted, wake paused devices... */
1452        if ((os.conn > C_CONNECTED && ns.conn <= C_CONNECTED) ||
1453            (os.peer_isp && !ns.peer_isp) ||
1454            (os.user_isp && !ns.user_isp))
1455                resume_next_sg(mdev);
1456
1457        /* sync target done with resync.  Explicitly notify peer, even though
1458         * it should (at least for non-empty resyncs) already know itself. */
1459        if (os.disk < D_UP_TO_DATE && os.conn >= C_SYNC_SOURCE && ns.conn == C_CONNECTED)
1460                drbd_send_state(mdev);
1461
1462        /* free tl_hash if we Got thawed and are C_STANDALONE */
1463        if (ns.conn == C_STANDALONE && !is_susp(ns) && mdev->tl_hash)
1464                drbd_free_tl_hash(mdev);
1465
1466        /* Upon network connection, we need to start the receiver */
1467        if (os.conn == C_STANDALONE && ns.conn == C_UNCONNECTED)
1468                drbd_thread_start(&mdev->receiver);
1469
1470        /* Terminate worker thread if we are unconfigured - it will be
1471           restarted as needed... */
1472        if (ns.disk == D_DISKLESS &&
1473            ns.conn == C_STANDALONE &&
1474            ns.role == R_SECONDARY) {
1475                if (os.aftr_isp != ns.aftr_isp)
1476                        resume_next_sg(mdev);
1477                /* set in __drbd_set_state, unless CONFIG_PENDING was set */
1478                if (test_bit(DEVICE_DYING, &mdev->flags))
1479                        drbd_thread_stop_nowait(&mdev->worker);
1480        }
1481
1482        drbd_md_sync(mdev);
1483}
1484
1485
1486static int drbd_thread_setup(void *arg)
1487{
1488        struct drbd_thread *thi = (struct drbd_thread *) arg;
1489        struct drbd_conf *mdev = thi->mdev;
1490        unsigned long flags;
1491        int retval;
1492
1493restart:
1494        retval = thi->function(thi);
1495
1496        spin_lock_irqsave(&thi->t_lock, flags);
1497
1498        /* if the receiver has been "Exiting", the last thing it did
1499         * was set the conn state to "StandAlone",
1500         * if now a re-connect request comes in, conn state goes C_UNCONNECTED,
1501         * and receiver thread will be "started".
1502         * drbd_thread_start needs to set "Restarting" in that case.
1503         * t_state check and assignment needs to be within the same spinlock,
1504         * so either thread_start sees Exiting, and can remap to Restarting,
1505         * or thread_start see None, and can proceed as normal.
1506         */
1507
1508        if (thi->t_state == Restarting) {
1509                dev_info(DEV, "Restarting %s\n", current->comm);
1510                thi->t_state = Running;
1511                spin_unlock_irqrestore(&thi->t_lock, flags);
1512                goto restart;
1513        }
1514
1515        thi->task = NULL;
1516        thi->t_state = None;
1517        smp_mb();
1518        complete(&thi->stop);
1519        spin_unlock_irqrestore(&thi->t_lock, flags);
1520
1521        dev_info(DEV, "Terminating %s\n", current->comm);
1522
1523        /* Release mod reference taken when thread was started */
1524        module_put(THIS_MODULE);
1525        return retval;
1526}
1527
1528static void drbd_thread_init(struct drbd_conf *mdev, struct drbd_thread *thi,
1529                      int (*func) (struct drbd_thread *))
1530{
1531        spin_lock_init(&thi->t_lock);
1532        thi->task    = NULL;
1533        thi->t_state = None;
1534        thi->function = func;
1535        thi->mdev = mdev;
1536}
1537
1538int drbd_thread_start(struct drbd_thread *thi)
1539{
1540        struct drbd_conf *mdev = thi->mdev;
1541        struct task_struct *nt;
1542        unsigned long flags;
1543
1544        const char *me =
1545                thi == &mdev->receiver ? "receiver" :
1546                thi == &mdev->asender  ? "asender"  :
1547                thi == &mdev->worker   ? "worker"   : "NONSENSE";
1548
1549        /* is used from state engine doing drbd_thread_stop_nowait,
1550         * while holding the req lock irqsave */
1551        spin_lock_irqsave(&thi->t_lock, flags);
1552
1553        switch (thi->t_state) {
1554        case None:
1555                dev_info(DEV, "Starting %s thread (from %s [%d])\n",
1556                                me, current->comm, current->pid);
1557
1558                /* Get ref on module for thread - this is released when thread exits */
1559                if (!try_module_get(THIS_MODULE)) {
1560                        dev_err(DEV, "Failed to get module reference in drbd_thread_start\n");
1561                        spin_unlock_irqrestore(&thi->t_lock, flags);
1562                        return FALSE;
1563                }
1564
1565                init_completion(&thi->stop);
1566                D_ASSERT(thi->task == NULL);
1567                thi->reset_cpu_mask = 1;
1568                thi->t_state = Running;
1569                spin_unlock_irqrestore(&thi->t_lock, flags);
1570                flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
1571
1572                nt = kthread_create(drbd_thread_setup, (void *) thi,
1573                                    "drbd%d_%s", mdev_to_minor(mdev), me);
1574
1575                if (IS_ERR(nt)) {
1576                        dev_err(DEV, "Couldn't start thread\n");
1577
1578                        module_put(THIS_MODULE);
1579                        return FALSE;
1580                }
1581                spin_lock_irqsave(&thi->t_lock, flags);
1582                thi->task = nt;
1583                thi->t_state = Running;
1584                spin_unlock_irqrestore(&thi->t_lock, flags);
1585                wake_up_process(nt);
1586                break;
1587        case Exiting:
1588                thi->t_state = Restarting;
1589                dev_info(DEV, "Restarting %s thread (from %s [%d])\n",
1590                                me, current->comm, current->pid);
1591                /* fall through */
1592        case Running:
1593        case Restarting:
1594        default:
1595                spin_unlock_irqrestore(&thi->t_lock, flags);
1596                break;
1597        }
1598
1599        return TRUE;
1600}
1601
1602
1603void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
1604{
1605        unsigned long flags;
1606
1607        enum drbd_thread_state ns = restart ? Restarting : Exiting;
1608
1609        /* may be called from state engine, holding the req lock irqsave */
1610        spin_lock_irqsave(&thi->t_lock, flags);
1611
1612        if (thi->t_state == None) {
1613                spin_unlock_irqrestore(&thi->t_lock, flags);
1614                if (restart)
1615                        drbd_thread_start(thi);
1616                return;
1617        }
1618
1619        if (thi->t_state != ns) {
1620                if (thi->task == NULL) {
1621                        spin_unlock_irqrestore(&thi->t_lock, flags);
1622                        return;
1623                }
1624
1625                thi->t_state = ns;
1626                smp_mb();
1627                init_completion(&thi->stop);
1628                if (thi->task != current)
1629                        force_sig(DRBD_SIGKILL, thi->task);
1630
1631        }
1632
1633        spin_unlock_irqrestore(&thi->t_lock, flags);
1634
1635        if (wait)
1636                wait_for_completion(&thi->stop);
1637}
1638
1639#ifdef CONFIG_SMP
1640/**
1641 * drbd_calc_cpu_mask() - Generate CPU masks, spread over all CPUs
1642 * @mdev:       DRBD device.
1643 *
1644 * Forces all threads of a device onto the same CPU. This is beneficial for
1645 * DRBD's performance. May be overwritten by user's configuration.
1646 */
1647void drbd_calc_cpu_mask(struct drbd_conf *mdev)
1648{
1649        int ord, cpu;
1650
1651        /* user override. */
1652        if (cpumask_weight(mdev->cpu_mask))
1653                return;
1654
1655        ord = mdev_to_minor(mdev) % cpumask_weight(cpu_online_mask);
1656        for_each_online_cpu(cpu) {
1657                if (ord-- == 0) {
1658                        cpumask_set_cpu(cpu, mdev->cpu_mask);
1659                        return;
1660                }
1661        }
1662        /* should not be reached */
1663        cpumask_setall(mdev->cpu_mask);
1664}
1665
1666/**
1667 * drbd_thread_current_set_cpu() - modifies the cpu mask of the _current_ thread
1668 * @mdev:       DRBD device.
1669 *
1670 * call in the "main loop" of _all_ threads, no need for any mutex, current won't die
1671 * prematurely.
1672 */
1673void drbd_thread_current_set_cpu(struct drbd_conf *mdev)
1674{
1675        struct task_struct *p = current;
1676        struct drbd_thread *thi =
1677                p == mdev->asender.task  ? &mdev->asender  :
1678                p == mdev->receiver.task ? &mdev->receiver :
1679                p == mdev->worker.task   ? &mdev->worker   :
1680                NULL;
1681        ERR_IF(thi == NULL)
1682                return;
1683        if (!thi->reset_cpu_mask)
1684                return;
1685        thi->reset_cpu_mask = 0;
1686        set_cpus_allowed_ptr(p, mdev->cpu_mask);
1687}
1688#endif
1689
1690/* the appropriate socket mutex must be held already */
1691int _drbd_send_cmd(struct drbd_conf *mdev, struct socket *sock,
1692                          enum drbd_packets cmd, struct p_header80 *h,
1693                          size_t size, unsigned msg_flags)
1694{
1695        int sent, ok;
1696
1697        ERR_IF(!h) return FALSE;
1698        ERR_IF(!size) return FALSE;
1699
1700        h->magic   = BE_DRBD_MAGIC;
1701        h->command = cpu_to_be16(cmd);
1702        h->length  = cpu_to_be16(size-sizeof(struct p_header80));
1703
1704        sent = drbd_send(mdev, sock, h, size, msg_flags);
1705
1706        ok = (sent == size);
1707        if (!ok)
1708                dev_err(DEV, "short sent %s size=%d sent=%d\n",
1709                    cmdname(cmd), (int)size, sent);
1710        return ok;
1711}
1712
1713/* don't pass the socket. we may only look at it
1714 * when we hold the appropriate socket mutex.
1715 */
1716int drbd_send_cmd(struct drbd_conf *mdev, int use_data_socket,
1717                  enum drbd_packets cmd, struct p_header80 *h, size_t size)
1718{
1719        int ok = 0;
1720        struct socket *sock;
1721
1722        if (use_data_socket) {
1723                mutex_lock(&mdev->data.mutex);
1724                sock = mdev->data.socket;
1725        } else {
1726                mutex_lock(&mdev->meta.mutex);
1727                sock = mdev->meta.socket;
1728        }
1729
1730        /* drbd_disconnect() could have called drbd_free_sock()
1731         * while we were waiting in down()... */
1732        if (likely(sock != NULL))
1733                ok = _drbd_send_cmd(mdev, sock, cmd, h, size, 0);
1734
1735        if (use_data_socket)
1736                mutex_unlock(&mdev->data.mutex);
1737        else
1738                mutex_unlock(&mdev->meta.mutex);
1739        return ok;
1740}
1741
1742int drbd_send_cmd2(struct drbd_conf *mdev, enum drbd_packets cmd, char *data,
1743                   size_t size)
1744{
1745        struct p_header80 h;
1746        int ok;
1747
1748        h.magic   = BE_DRBD_MAGIC;
1749        h.command = cpu_to_be16(cmd);
1750        h.length  = cpu_to_be16(size);
1751
1752        if (!drbd_get_data_sock(mdev))
1753                return 0;
1754
1755        ok = (sizeof(h) ==
1756                drbd_send(mdev, mdev->data.socket, &h, sizeof(h), 0));
1757        ok = ok && (size ==
1758                drbd_send(mdev, mdev->data.socket, data, size, 0));
1759
1760        drbd_put_data_sock(mdev);
1761
1762        return ok;
1763}
1764
1765int drbd_send_sync_param(struct drbd_conf *mdev, struct syncer_conf *sc)
1766{
1767        struct p_rs_param_95 *p;
1768        struct socket *sock;
1769        int size, rv;
1770        const int apv = mdev->agreed_pro_version;
1771
1772        size = apv <= 87 ? sizeof(struct p_rs_param)
1773                : apv == 88 ? sizeof(struct p_rs_param)
1774                        + strlen(mdev->sync_conf.verify_alg) + 1
1775                : apv <= 94 ? sizeof(struct p_rs_param_89)
1776                : /* apv >= 95 */ sizeof(struct p_rs_param_95);
1777
1778        /* used from admin command context and receiver/worker context.
1779         * to avoid kmalloc, grab the socket right here,
1780         * then use the pre-allocated sbuf there */
1781        mutex_lock(&mdev->data.mutex);
1782        sock = mdev->data.socket;
1783
1784        if (likely(sock != NULL)) {
1785                enum drbd_packets cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
1786
1787                p = &mdev->data.sbuf.rs_param_95;
1788
1789                /* initialize verify_alg and csums_alg */
1790                memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
1791
1792                p->rate = cpu_to_be32(sc->rate);
1793                p->c_plan_ahead = cpu_to_be32(sc->c_plan_ahead);
1794                p->c_delay_target = cpu_to_be32(sc->c_delay_target);
1795                p->c_fill_target = cpu_to_be32(sc->c_fill_target);
1796                p->c_max_rate = cpu_to_be32(sc->c_max_rate);
1797
1798                if (apv >= 88)
1799                        strcpy(p->verify_alg, mdev->sync_conf.verify_alg);
1800                if (apv >= 89)
1801                        strcpy(p->csums_alg, mdev->sync_conf.csums_alg);
1802
1803                rv = _drbd_send_cmd(mdev, sock, cmd, &p->head, size, 0);
1804        } else
1805                rv = 0; /* not ok */
1806
1807        mutex_unlock(&mdev->data.mutex);
1808
1809        return rv;
1810}
1811
1812int drbd_send_protocol(struct drbd_conf *mdev)
1813{
1814        struct p_protocol *p;
1815        int size, cf, rv;
1816
1817        size = sizeof(struct p_protocol);
1818
1819        if (mdev->agreed_pro_version >= 87)
1820                size += strlen(mdev->net_conf->integrity_alg) + 1;
1821
1822        /* we must not recurse into our own queue,
1823         * as that is blocked during handshake */
1824        p = kmalloc(size, GFP_NOIO);
1825        if (p == NULL)
1826                return 0;
1827
1828        p->protocol      = cpu_to_be32(mdev->net_conf->wire_protocol);
1829        p->after_sb_0p   = cpu_to_be32(mdev->net_conf->after_sb_0p);
1830        p->after_sb_1p   = cpu_to_be32(mdev->net_conf->after_sb_1p);
1831        p->after_sb_2p   = cpu_to_be32(mdev->net_conf->after_sb_2p);
1832        p->two_primaries = cpu_to_be32(mdev->net_conf->two_primaries);
1833
1834        cf = 0;
1835        if (mdev->net_conf->want_lose)
1836                cf |= CF_WANT_LOSE;
1837        if (mdev->net_conf->dry_run) {
1838                if (mdev->agreed_pro_version >= 92)
1839                        cf |= CF_DRY_RUN;
1840                else {
1841                        dev_err(DEV, "--dry-run is not supported by peer");
1842                        kfree(p);
1843                        return 0;
1844                }
1845        }
1846        p->conn_flags    = cpu_to_be32(cf);
1847
1848        if (mdev->agreed_pro_version >= 87)
1849                strcpy(p->integrity_alg, mdev->net_conf->integrity_alg);
1850
1851        rv = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_PROTOCOL,
1852                           (struct p_header80 *)p, size);
1853        kfree(p);
1854        return rv;
1855}
1856
1857int _drbd_send_uuids(struct drbd_conf *mdev, u64 uuid_flags)
1858{
1859        struct p_uuids p;
1860        int i;
1861
1862        if (!get_ldev_if_state(mdev, D_NEGOTIATING))
1863                return 1;
1864
1865        for (i = UI_CURRENT; i < UI_SIZE; i++)
1866                p.uuid[i] = mdev->ldev ? cpu_to_be64(mdev->ldev->md.uuid[i]) : 0;
1867
1868        mdev->comm_bm_set = drbd_bm_total_weight(mdev);
1869        p.uuid[UI_SIZE] = cpu_to_be64(mdev->comm_bm_set);
1870        uuid_flags |= mdev->net_conf->want_lose ? 1 : 0;
1871        uuid_flags |= test_bit(CRASHED_PRIMARY, &mdev->flags) ? 2 : 0;
1872        uuid_flags |= mdev->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
1873        p.uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
1874
1875        put_ldev(mdev);
1876
1877        return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_UUIDS,
1878                             (struct p_header80 *)&p, sizeof(p));
1879}
1880
1881int drbd_send_uuids(struct drbd_conf *mdev)
1882{
1883        return _drbd_send_uuids(mdev, 0);
1884}
1885
1886int drbd_send_uuids_skip_initial_sync(struct drbd_conf *mdev)
1887{
1888        return _drbd_send_uuids(mdev, 8);
1889}
1890
1891
1892int drbd_send_sync_uuid(struct drbd_conf *mdev, u64 val)
1893{
1894        struct p_rs_uuid p;
1895
1896        p.uuid = cpu_to_be64(val);
1897
1898        return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SYNC_UUID,
1899                             (struct p_header80 *)&p, sizeof(p));
1900}
1901
1902int drbd_send_sizes(struct drbd_conf *mdev, int trigger_reply, enum dds_flags flags)
1903{
1904        struct p_sizes p;
1905        sector_t d_size, u_size;
1906        int q_order_type;
1907        int ok;
1908
1909        if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
1910                D_ASSERT(mdev->ldev->backing_bdev);
1911                d_size = drbd_get_max_capacity(mdev->ldev);
1912                u_size = mdev->ldev->dc.disk_size;
1913                q_order_type = drbd_queue_order_type(mdev);
1914                put_ldev(mdev);
1915        } else {
1916                d_size = 0;
1917                u_size = 0;
1918                q_order_type = QUEUE_ORDERED_NONE;
1919        }
1920
1921        p.d_size = cpu_to_be64(d_size);
1922        p.u_size = cpu_to_be64(u_size);
1923        p.c_size = cpu_to_be64(trigger_reply ? 0 : drbd_get_capacity(mdev->this_bdev));
1924        p.max_segment_size = cpu_to_be32(queue_max_segment_size(mdev->rq_queue));
1925        p.queue_order_type = cpu_to_be16(q_order_type);
1926        p.dds_flags = cpu_to_be16(flags);
1927
1928        ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_SIZES,
1929                           (struct p_header80 *)&p, sizeof(p));
1930        return ok;
1931}
1932
1933/**
1934 * drbd_send_state() - Sends the drbd state to the peer
1935 * @mdev:       DRBD device.
1936 */
1937int drbd_send_state(struct drbd_conf *mdev)
1938{
1939        struct socket *sock;
1940        struct p_state p;
1941        int ok = 0;
1942
1943        /* Grab state lock so we wont send state if we're in the middle
1944         * of a cluster wide state change on another thread */
1945        drbd_state_lock(mdev);
1946
1947        mutex_lock(&mdev->data.mutex);
1948
1949        p.state = cpu_to_be32(mdev->state.i); /* Within the send mutex */
1950        sock = mdev->data.socket;
1951
1952        if (likely(sock != NULL)) {
1953                ok = _drbd_send_cmd(mdev, sock, P_STATE,
1954                                    (struct p_header80 *)&p, sizeof(p), 0);
1955        }
1956
1957        mutex_unlock(&mdev->data.mutex);
1958
1959        drbd_state_unlock(mdev);
1960        return ok;
1961}
1962
1963int drbd_send_state_req(struct drbd_conf *mdev,
1964        union drbd_state mask, union drbd_state val)
1965{
1966        struct p_req_state p;
1967
1968        p.mask    = cpu_to_be32(mask.i);
1969        p.val     = cpu_to_be32(val.i);
1970
1971        return drbd_send_cmd(mdev, USE_DATA_SOCKET, P_STATE_CHG_REQ,
1972                             (struct p_header80 *)&p, sizeof(p));
1973}
1974
1975int drbd_send_sr_reply(struct drbd_conf *mdev, int retcode)
1976{
1977        struct p_req_state_reply p;
1978
1979        p.retcode    = cpu_to_be32(retcode);
1980
1981        return drbd_send_cmd(mdev, USE_META_SOCKET, P_STATE_CHG_REPLY,
1982                             (struct p_header80 *)&p, sizeof(p));
1983}
1984
1985int fill_bitmap_rle_bits(struct drbd_conf *mdev,
1986        struct p_compressed_bm *p,
1987        struct bm_xfer_ctx *c)
1988{
1989        struct bitstream bs;
1990        unsigned long plain_bits;
1991        unsigned long tmp;
1992        unsigned long rl;
1993        unsigned len;
1994        unsigned toggle;
1995        int bits;
1996
1997        /* may we use this feature? */
1998        if ((mdev->sync_conf.use_rle == 0) ||
1999                (mdev->agreed_pro_version < 90))
2000                        return 0;
2001
2002        if (c->bit_offset >= c->bm_bits)
2003                return 0; /* nothing to do. */
2004
2005        /* use at most thus many bytes */
2006        bitstream_init(&bs, p->code, BM_PACKET_VLI_BYTES_MAX, 0);
2007        memset(p->code, 0, BM_PACKET_VLI_BYTES_MAX);
2008        /* plain bits covered in this code string */
2009        plain_bits = 0;
2010
2011        /* p->encoding & 0x80 stores whether the first run length is set.
2012         * bit offset is implicit.
2013         * start with toggle == 2 to be able to tell the first iteration */
2014        toggle = 2;
2015
2016        /* see how much plain bits we can stuff into one packet
2017         * using RLE and VLI. */
2018        do {
2019                tmp = (toggle == 0) ? _drbd_bm_find_next_zero(mdev, c->bit_offset)
2020                                    : _drbd_bm_find_next(mdev, c->bit_offset);
2021                if (tmp == -1UL)
2022                        tmp = c->bm_bits;
2023                rl = tmp - c->bit_offset;
2024
2025                if (toggle == 2) { /* first iteration */
2026                        if (rl == 0) {
2027                                /* the first checked bit was set,
2028                                 * store start value, */
2029                                DCBP_set_start(p, 1);
2030                                /* but skip encoding of zero run length */
2031                                toggle = !toggle;
2032                                continue;
2033                        }
2034                        DCBP_set_start(p, 0);
2035                }
2036
2037                /* paranoia: catch zero runlength.
2038                 * can only happen if bitmap is modified while we scan it. */
2039                if (rl == 0) {
2040                        dev_err(DEV, "unexpected zero runlength while encoding bitmap "
2041                            "t:%u bo:%lu\n", toggle, c->bit_offset);
2042                        return -1;
2043                }
2044
2045                bits = vli_encode_bits(&bs, rl);
2046                if (bits == -ENOBUFS) /* buffer full */
2047                        break;
2048                if (bits <= 0) {
2049                        dev_err(DEV, "error while encoding bitmap: %d\n", bits);
2050                        return 0;
2051                }
2052
2053                toggle = !toggle;
2054                plain_bits += rl;
2055                c->bit_offset = tmp;
2056        } while (c->bit_offset < c->bm_bits);
2057
2058        len = bs.cur.b - p->code + !!bs.cur.bit;
2059
2060        if (plain_bits < (len << 3)) {
2061                /* incompressible with this method.
2062                 * we need to rewind both word and bit position. */
2063                c->bit_offset -= plain_bits;
2064                bm_xfer_ctx_bit_to_word_offset(c);
2065                c->bit_offset = c->word_offset * BITS_PER_LONG;
2066                return 0;
2067        }
2068
2069        /* RLE + VLI was able to compress it just fine.
2070         * update c->word_offset. */
2071        bm_xfer_ctx_bit_to_word_offset(c);
2072
2073        /* store pad_bits */
2074        DCBP_set_pad_bits(p, (8 - bs.cur.bit) & 0x7);
2075
2076        return len;
2077}
2078
2079enum { OK, FAILED, DONE }
2080send_bitmap_rle_or_plain(struct drbd_conf *mdev,
2081        struct p_header80 *h, struct bm_xfer_ctx *c)
2082{
2083        struct p_compressed_bm *p = (void*)h;
2084        unsigned long num_words;
2085        int len;
2086        int ok;
2087
2088        len = fill_bitmap_rle_bits(mdev, p, c);
2089
2090        if (len < 0)
2091                return FAILED;
2092
2093        if (len) {
2094                DCBP_set_code(p, RLE_VLI_Bits);
2095                ok = _drbd_send_cmd(mdev, mdev->data.socket, P_COMPRESSED_BITMAP, h,
2096                        sizeof(*p) + len, 0);
2097
2098                c->packets[0]++;
2099                c->bytes[0] += sizeof(*p) + len;
2100
2101                if (c->bit_offset >= c->bm_bits)
2102                        len = 0; /* DONE */
2103        } else {
2104                /* was not compressible.
2105                 * send a buffer full of plain text bits instead. */
2106                num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
2107                len = num_words * sizeof(long);
2108                if (len)
2109                        drbd_bm_get_lel(mdev, c->word_offset, num_words, (unsigned long*)h->payload);
2110                ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BITMAP,
2111                                   h, sizeof(struct p_header80) + len, 0);
2112                c->word_offset += num_words;
2113                c->bit_offset = c->word_offset * BITS_PER_LONG;
2114
2115                c->packets[1]++;
2116                c->bytes[1] += sizeof(struct p_header80) + len;
2117
2118                if (c->bit_offset > c->bm_bits)
2119                        c->bit_offset = c->bm_bits;
2120        }
2121        ok = ok ? ((len == 0) ? DONE : OK) : FAILED;
2122
2123        if (ok == DONE)
2124                INFO_bm_xfer_stats(mdev, "send", c);
2125        return ok;
2126}
2127
2128/* See the comment at receive_bitmap() */
2129int _drbd_send_bitmap(struct drbd_conf *mdev)
2130{
2131        struct bm_xfer_ctx c;
2132        struct p_header80 *p;
2133        int ret;
2134
2135        ERR_IF(!mdev->bitmap) return FALSE;
2136
2137        /* maybe we should use some per thread scratch page,
2138         * and allocate that during initial device creation? */
2139        p = (struct p_header80 *) __get_free_page(GFP_NOIO);
2140        if (!p) {
2141                dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
2142                return FALSE;
2143        }
2144
2145        if (get_ldev(mdev)) {
2146                if (drbd_md_test_flag(mdev->ldev, MDF_FULL_SYNC)) {
2147                        dev_info(DEV, "Writing the whole bitmap, MDF_FullSync was set.\n");
2148                        drbd_bm_set_all(mdev);
2149                        if (drbd_bm_write(mdev)) {
2150                                /* write_bm did fail! Leave full sync flag set in Meta P_DATA
2151                                 * but otherwise process as per normal - need to tell other
2152                                 * side that a full resync is required! */
2153                                dev_err(DEV, "Failed to write bitmap to disk!\n");
2154                        } else {
2155                                drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
2156                                drbd_md_sync(mdev);
2157                        }
2158                }
2159                put_ldev(mdev);
2160        }
2161
2162        c = (struct bm_xfer_ctx) {
2163                .bm_bits = drbd_bm_bits(mdev),
2164                .bm_words = drbd_bm_words(mdev),
2165        };
2166
2167        do {
2168                ret = send_bitmap_rle_or_plain(mdev, p, &c);
2169        } while (ret == OK);
2170
2171        free_page((unsigned long) p);
2172        return (ret == DONE);
2173}
2174
/*
 * Send the bitmap via _drbd_send_bitmap(), bracketed by
 * drbd_get_data_sock() / drbd_put_data_sock().
 *
 * Note the inverted convention compared to _drbd_send_bitmap():
 * returns 0 on success, 1 if sending failed, and -1 if
 * drbd_get_data_sock() failed.
 */
int drbd_send_bitmap(struct drbd_conf *mdev)
{
        int failed;

        if (!drbd_get_data_sock(mdev))
                return -1;

        failed = !_drbd_send_bitmap(mdev);
        drbd_put_data_sock(mdev);

        return failed;
}
2185
2186int drbd_send_b_ack(struct drbd_conf *mdev, u32 barrier_nr, u32 set_size)
2187{
2188        int ok;
2189        struct p_barrier_ack p;
2190
2191        p.barrier  = barrier_nr;
2192        p.set_size = cpu_to_be32(set_size);
2193
2194        if (mdev->state.conn < C_CONNECTED)
2195                return FALSE;
2196        ok = drbd_send_cmd(mdev, USE_META_SOCKET, P_BARRIER_ACK,
2197                        (struct p_header80 *)&p, sizeof(p));
2198        return ok;
2199}
2200
2201/**
2202 * _drbd_send_ack() - Sends an ack packet
2203 * @mdev:       DRBD device.
2204 * @cmd:        Packet command code.
2205 * @sector:     sector, needs to be in big endian byte order
2206 * @blksize:    size in byte, needs to be in big endian byte order
2207 * @block_id:   Id, big endian byte order
2208 */
2209static int _drbd_send_ack(struct drbd_conf *mdev, enum drbd_packets cmd,
2210                          u64 sector,
2211                          u32 blksize,
2212                          u64 block_id)
2213{
2214        int ok;
2215        struct p_block_ack p;
2216
2217        p.sector   = sector;
2218        p.block_id = block_id;
2219        p.blksize  = blksize;
2220        p.seq_num  = cpu_to_be32(atomic_add_return(1, &mdev->packet_seq));
2221
2222        if (!mdev->meta.socket || mdev->state.conn < C_CONNECTED)
2223                return FALSE;
2224        ok = drbd_send_cmd(mdev, USE_META_SOCKET, cmd,
2225                                (struct p_header80 *)&p, sizeof(p));
2226        return ok;
2227}
2228
2229/* dp->sector and dp->block_id already/still in network byte order,
2230 * data_size is payload size according to dp->head,
2231 * and may need to be corrected for digest size. */
2232int drbd_send_ack_dp(struct drbd_conf *mdev, enum drbd_packets cmd,
2233                     struct p_data *dp, int data_size)
2234{
2235        data_size -= (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
2236                crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
2237        return _drbd_send_ack(mdev, cmd, dp->sector, cpu_to_be32(data_size),
2238                              dp->block_id);
2239}
2240
2241int drbd_send_ack_rp(struct drbd_conf *mdev, enum drbd_packets cmd,
2242                     struct p_block_req *rp)
2243{
2244        return _drbd_send_ack(mdev, cmd, rp->sector, rp->blksize, rp->block_id);
2245}
2246
2247/**
2248 * drbd_send_ack() - Sends an ack packet
2249 * @mdev:       DRBD device.
2250 * @cmd:        Packet command code.
2251 * @e:          Epoch entry.
2252 */
2253int drbd_send_ack(struct drbd_conf *mdev,
2254        enum drbd_packets cmd, struct drbd_epoch_entry *e)
2255{
2256        return _drbd_send_ack(mdev, cmd,
2257                              cpu_to_be64(e->sector),
2258                              cpu_to_be32(e->size),
2259                              e->block_id);
2260}
2261
2262/* This function misuses the block_id field to signal if the blocks
2263 * are is sync or not. */
2264int drbd_send_ack_ex(struct drbd_conf *mdev, enum drbd_packets cmd,
2265                     sector_t sector, int blksize, u64 block_id)
2266{
2267        return _drbd_send_ack(mdev, cmd,
2268                              cpu_to_be64(sector),
2269                              cpu_to_be32(blksize),
2270                              cpu_to_be64(block_id));
2271}
2272
2273int drbd_send_drequest(struct drbd_conf *mdev, int cmd,
2274                       sector_t sector, int size, u64 block_id)
2275{
2276        int ok;
2277        struct p_block_req p;
2278
2279        p.sector   = cpu_to_be64(sector);
2280        p.block_id = block_id;
2281        p.blksize  = cpu_to_be32(size);
2282
2283        ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, cmd,
2284                                (struct p_header80 *)&p, sizeof(p));
2285        return ok;
2286}
2287
2288int drbd_send_drequest_csum(struct drbd_conf *mdev,
2289                            sector_t sector, int size,
2290                            void *digest, int digest_size,
2291                            enum drbd_packets cmd)
2292{
2293        int ok;
2294        struct p_block_req p;
2295
2296        p.sector   = cpu_to_be64(sector);
2297        p.block_id = BE_DRBD_MAGIC + 0xbeef;
2298        p.blksize  = cpu_to_be32(size);
2299
2300        p.head.magic   = BE_DRBD_MAGIC;
2301        p.head.command = cpu_to_be16(cmd);
2302        p.head.length  = cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + digest_size);
2303
2304        mutex_lock(&mdev->data.mutex);
2305
2306        ok = (sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), 0));
2307        ok = ok && (digest_size == drbd_send(mdev, mdev->data.socket, digest, digest_size, 0));
2308
2309        mutex_unlock(&mdev->data.mutex);
2310
2311        return ok;
2312}
2313
2314int drbd_send_ov_request(struct drbd_conf *mdev, sector_t sector, int size)
2315{
2316        int ok;
2317        struct p_block_req p;
2318
2319        p.sector   = cpu_to_be64(sector);
2320        p.block_id = BE_DRBD_MAGIC + 0xbabe;
2321        p.blksize  = cpu_to_be32(size);
2322
2323        ok = drbd_send_cmd(mdev, USE_DATA_SOCKET, P_OV_REQUEST,
2324                           (struct p_header80 *)&p, sizeof(p));
2325        return ok;
2326}
2327
2328/* called on sndtimeo
2329 * returns FALSE if we should retry,
2330 * TRUE if we think connection is dead
2331 */
2332static int we_should_drop_the_connection(struct drbd_conf *mdev, struct socket *sock)
2333{
2334        int drop_it;
2335        /* long elapsed = (long)(jiffies - mdev->last_received); */
2336
2337        drop_it =   mdev->meta.socket == sock
2338                || !mdev->asender.task
2339                || get_t_state(&mdev->asender) != Running
2340                || mdev->state.conn < C_CONNECTED;
2341
2342        if (drop_it)
2343                return TRUE;
2344
2345        drop_it = !--mdev->ko_count;
2346        if (!drop_it) {
2347                dev_err(DEV, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
2348                       current->comm, current->pid, mdev->ko_count);
2349                request_ping(mdev);
2350        }
2351
2352        return drop_it; /* && (mdev->state == R_PRIMARY) */;
2353}
2354
2355/* The idea of sendpage seems to be to put some kind of reference
2356 * to the page into the skb, and to hand it over to the NIC. In
2357 * this process get_page() gets called.
2358 *
2359 * As soon as the page was really sent over the network put_page()
2360 * gets called by some part of the network layer. [ NIC driver? ]
2361 *
2362 * [ get_page() / put_page() increment/decrement the count. If count
2363 *   reaches 0 the page will be freed. ]
2364 *
2365 * This works nicely with pages from FSs.
2366 * But this means that in protocol A we might signal IO completion too early!
2367 *
2368 * In order not to corrupt data during a resync we must make sure
2369 * that we do not reuse our own buffer pages (EEs) to early, therefore
2370 * we have the net_ee list.
2371 *
2372 * XFS seems to have problems, still, it submits pages with page_count == 0!
2373 * As a workaround, we disable sendpage on pages
2374 * with page_count == 0 or PageSlab.
2375 */
2376static int _drbd_no_send_page(struct drbd_conf *mdev, struct page *page,
2377                   int offset, size_t size, unsigned msg_flags)
2378{
2379        int sent = drbd_send(mdev, mdev->data.socket, kmap(page) + offset, size, msg_flags);
2380        kunmap(page);
2381        if (sent == size)
2382                mdev->send_cnt += size>>9;
2383        return sent == size;
2384}
2385
2386static int _drbd_send_page(struct drbd_conf *mdev, struct page *page,
2387                    int offset, size_t size, unsigned msg_flags)
2388{
2389        mm_segment_t oldfs = get_fs();
2390        int sent, ok;
2391        int len = size;
2392
2393        /* e.g. XFS meta- & log-data is in slab pages, which have a
2394         * page_count of 0 and/or have PageSlab() set.
2395         * we cannot use send_page for those, as that does get_page();
2396         * put_page(); and would cause either a VM_BUG directly, or
2397         * __page_cache_release a page that would actually still be referenced
2398         * by someone, leading to some obscure delayed Oops somewhere else. */
2399        if (disable_sendpage || (page_count(page) < 1) || PageSlab(page))
2400                return _drbd_no_send_page(mdev, page, offset, size, msg_flags);
2401
2402        msg_flags |= MSG_NOSIGNAL;
2403        drbd_update_congested(mdev);
2404        set_fs(KERNEL_DS);
2405        do {
2406                sent = mdev->data.socket->ops->sendpage(mdev->data.socket, page,
2407                                                        offset, len,
2408                                                        msg_flags);
2409                if (sent == -EAGAIN) {
2410                        if (we_should_drop_the_connection(mdev,
2411                                                          mdev->data.socket))
2412                                break;
2413                        else
2414                                continue;
2415                }
2416                if (sent <= 0) {
2417                        dev_warn(DEV, "%s: size=%d len=%d sent=%d\n",
2418                             __func__, (int)size, len, sent);
2419                        break;
2420                }
2421                len    -= sent;
2422                offset += sent;
2423        } while (len > 0 /* THINK && mdev->cstate >= C_CONNECTED*/);
2424        set_fs(oldfs);
2425        clear_bit(NET_CONGESTED, &mdev->flags);
2426
2427        ok = (len == 0);
2428        if (likely(ok))
2429                mdev->send_cnt += size>>9;
2430        return ok;
2431}
2432
2433static int _drbd_send_bio(struct drbd_conf *mdev, struct bio *bio)
2434{
2435        struct bio_vec *bvec;
2436        int i;
2437        /* hint all but last page with MSG_MORE */
2438        __bio_for_each_segment(bvec, bio, i, 0) {
2439                if (!_drbd_no_send_page(mdev, bvec->bv_page,
2440                                     bvec->bv_offset, bvec->bv_len,
2441                                     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2442                        return 0;
2443        }
2444        return 1;
2445}
2446
2447static int _drbd_send_zc_bio(struct drbd_conf *mdev, struct bio *bio)
2448{
2449        struct bio_vec *bvec;
2450        int i;
2451        /* hint all but last page with MSG_MORE */
2452        __bio_for_each_segment(bvec, bio, i, 0) {
2453                if (!_drbd_send_page(mdev, bvec->bv_page,
2454                                     bvec->bv_offset, bvec->bv_len,
2455                                     i == bio->bi_vcnt -1 ? 0 : MSG_MORE))
2456                        return 0;
2457        }
2458        return 1;
2459}
2460
2461static int _drbd_send_zc_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
2462{
2463        struct page *page = e->pages;
2464        unsigned len = e->size;
2465        /* hint all but last page with MSG_MORE */
2466        page_chain_for_each(page) {
2467                unsigned l = min_t(unsigned, len, PAGE_SIZE);
2468                if (!_drbd_send_page(mdev, page, 0, l,
2469                                page_chain_next(page) ? MSG_MORE : 0))
2470                        return 0;
2471                len -= l;
2472        }
2473        return 1;
2474}
2475
2476static u32 bio_flags_to_wire(struct drbd_conf *mdev, unsigned long bi_rw)
2477{
2478        if (mdev->agreed_pro_version >= 95)
2479                return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
2480                        (bi_rw & REQ_UNPLUG ? DP_UNPLUG : 0) |
2481                        (bi_rw & REQ_FUA ? DP_FUA : 0) |
2482                        (bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
2483                        (bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
2484        else
2485                return bi_rw & (REQ_SYNC | REQ_UNPLUG) ? DP_RW_SYNC : 0;
2486}
2487
2488/* Used to send write requests
2489 * R_PRIMARY -> Peer    (P_DATA)
2490 */
2491int drbd_send_dblock(struct drbd_conf *mdev, struct drbd_request *req)
2492{
2493        int ok = 1;
2494        struct p_data p;
2495        unsigned int dp_flags = 0;
2496        void *dgb;
2497        int dgs;
2498
2499        if (!drbd_get_data_sock(mdev))
2500                return 0;
2501
2502        dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2503                crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2504
2505        if (req->size <= DRBD_MAX_SIZE_H80_PACKET) {
2506                p.head.h80.magic   = BE_DRBD_MAGIC;
2507                p.head.h80.command = cpu_to_be16(P_DATA);
2508                p.head.h80.length  =
2509                        cpu_to_be16(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2510        } else {
2511                p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2512                p.head.h95.command = cpu_to_be16(P_DATA);
2513                p.head.h95.length  =
2514                        cpu_to_be32(sizeof(p) - sizeof(union p_header) + dgs + req->size);
2515        }
2516
2517        p.sector   = cpu_to_be64(req->sector);
2518        p.block_id = (unsigned long)req;
2519        p.seq_num  = cpu_to_be32(req->seq_num =
2520                                 atomic_add_return(1, &mdev->packet_seq));
2521
2522        dp_flags = bio_flags_to_wire(mdev, req->master_bio->bi_rw);
2523
2524        if (mdev->state.conn >= C_SYNC_SOURCE &&
2525            mdev->state.conn <= C_PAUSED_SYNC_T)
2526                dp_flags |= DP_MAY_SET_IN_SYNC;
2527
2528        p.dp_flags = cpu_to_be32(dp_flags);
2529        set_bit(UNPLUG_REMOTE, &mdev->flags);
2530        ok = (sizeof(p) ==
2531                drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0));
2532        if (ok && dgs) {
2533                dgb = mdev->int_dig_out;
2534                drbd_csum_bio(mdev, mdev->integrity_w_tfm, req->master_bio, dgb);
2535                ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2536        }
2537        if (ok) {
2538                if (mdev->net_conf->wire_protocol == DRBD_PROT_A)
2539                        ok = _drbd_send_bio(mdev, req->master_bio);
2540                else
2541                        ok = _drbd_send_zc_bio(mdev, req->master_bio);
2542        }
2543
2544        drbd_put_data_sock(mdev);
2545
2546        return ok;
2547}
2548
2549/* answer packet, used to send data back for read requests:
2550 *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
2551 *  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
2552 */
2553int drbd_send_block(struct drbd_conf *mdev, enum drbd_packets cmd,
2554                    struct drbd_epoch_entry *e)
2555{
2556        int ok;
2557        struct p_data p;
2558        void *dgb;
2559        int dgs;
2560
2561        dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_w_tfm) ?
2562                crypto_hash_digestsize(mdev->integrity_w_tfm) : 0;
2563
2564        if (e->size <= DRBD_MAX_SIZE_H80_PACKET) {
2565                p.head.h80.magic   = BE_DRBD_MAGIC;
2566                p.head.h80.command = cpu_to_be16(cmd);
2567                p.head.h80.length  =
2568                        cpu_to_be16(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2569        } else {
2570                p.head.h95.magic   = BE_DRBD_MAGIC_BIG;
2571                p.head.h95.command = cpu_to_be16(cmd);
2572                p.head.h95.length  =
2573                        cpu_to_be32(sizeof(p) - sizeof(struct p_header80) + dgs + e->size);
2574        }
2575
2576        p.sector   = cpu_to_be64(e->sector);
2577        p.block_id = e->block_id;
2578        /* p.seq_num  = 0;    No sequence numbers here.. */
2579
2580        /* Only called by our kernel thread.
2581         * This one may be interrupted by DRBD_SIG and/or DRBD_SIGKILL
2582         * in response to admin command or module unload.
2583         */
2584        if (!drbd_get_data_sock(mdev))
2585                return 0;
2586
2587        ok = sizeof(p) == drbd_send(mdev, mdev->data.socket, &p, sizeof(p), dgs ? MSG_MORE : 0);
2588        if (ok && dgs) {
2589                dgb = mdev->int_dig_out;
2590                drbd_csum_ee(mdev, mdev->integrity_w_tfm, e, dgb);
2591                ok = drbd_send(mdev, mdev->data.socket, dgb, dgs, 0);
2592        }
2593        if (ok)
2594                ok = _drbd_send_zc_ee(mdev, e);
2595
2596        drbd_put_data_sock(mdev);
2597
2598        return ok;
2599}
2600
2601/*
2602  drbd_send distinguishes two cases:
2603
2604  Packets sent via the data socket "sock"
2605  and packets sent via the meta data socket "msock"
2606
2607                    sock                      msock
2608  -----------------+-------------------------+------------------------------
2609  timeout           conf.timeout / 2          conf.timeout / 2
2610  timeout action    send a ping via msock     Abort communication
2611                                              and close all sockets
2612*/
2613
2614/*
2615 * you must have down()ed the appropriate [m]sock_mutex elsewhere!
2616 */
2617int drbd_send(struct drbd_conf *mdev, struct socket *sock,
2618              void *buf, size_t size, unsigned msg_flags)
2619{
2620        struct kvec iov;
2621        struct msghdr msg;
2622        int rv, sent = 0;
2623
2624        if (!sock)
2625                return -1000;
2626
2627        /* THINK  if (signal_pending) return ... ? */
2628
2629        iov.iov_base = buf;
2630        iov.iov_len  = size;
2631
2632        msg.msg_name       = NULL;
2633        msg.msg_namelen    = 0;
2634        msg.msg_control    = NULL;
2635        msg.msg_controllen = 0;
2636        msg.msg_flags      = msg_flags | MSG_NOSIGNAL;
2637
2638        if (sock == mdev->data.socket) {
2639                mdev->ko_count = mdev->net_conf->ko_count;
2640                drbd_update_congested(mdev);
2641        }
2642        do {
2643                /* STRANGE
2644                 * tcp_sendmsg does _not_ use its size parameter at all ?
2645                 *
2646                 * -EAGAIN on timeout, -EINTR on signal.
2647                 */
2648/* THINK
2649 * do we need to block DRBD_SIG if sock == &meta.socket ??
2650 * otherwise wake_asender() might interrupt some send_*Ack !
2651 */
2652                rv = kernel_sendmsg(sock, &msg, &iov, 1, size);
2653                if (rv == -EAGAIN) {
2654                        if (we_should_drop_the_connection(mdev, sock))
2655                                break;
2656                        else
2657                                continue;
2658                }
2659                D_ASSERT(rv != 0);
2660                if (rv == -EINTR) {
2661                        flush_signals(current);
2662                        rv = 0;
2663                }
2664                if (rv < 0)
2665                        break;
2666                sent += rv;
2667                iov.iov_base += rv;
2668                iov.iov_len  -= rv;
2669        } while (sent < size);
2670
2671        if (sock == mdev->data.socket)
2672                clear_bit(NET_CONGESTED, &mdev->flags);
2673
2674        if (rv <= 0) {
2675                if (rv != -EAGAIN) {
2676                        dev_err(DEV, "%s_sendmsg returned %d\n",
2677                            sock == mdev->meta.socket ? "msock" : "sock",
2678                            rv);
2679                        drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));
2680                } else
2681                        drbd_force_state(mdev, NS(conn, C_TIMEOUT));
2682        }
2683
2684        return sent;
2685}
2686
2687static int drbd_open(struct block_device *bdev, fmode_t mode)
2688{
2689        struct drbd_conf *mdev = bdev->bd_disk->private_data;
2690        unsigned long flags;
2691        int rv = 0;
2692
2693        mutex_lock(&drbd_main_mutex);
2694        spin_lock_irqsave(&mdev->req_lock, flags);
2695        /* to have a stable mdev->state.role
2696         * and no race with updating open_cnt */
2697
2698        if (mdev->state.role != R_PRIMARY) {
2699                if (mode & FMODE_WRITE)
2700                        rv = -EROFS;
2701                else if (!allow_oos)
2702                        rv = -EMEDIUMTYPE;
2703        }
2704
2705        if (!rv)
2706                mdev->open_cnt++;
2707        spin_unlock_irqrestore(&mdev->req_lock, flags);
2708        mutex_unlock(&drbd_main_mutex);
2709
2710        return rv;
2711}
2712
2713static int drbd_release(struct gendisk *gd, fmode_t mode)
2714{
2715        struct drbd_conf *mdev = gd->private_data;
2716        mutex_lock(&drbd_main_mutex);
2717        mdev->open_cnt--;
2718        mutex_unlock(&drbd_main_mutex);
2719        return 0;
2720}
2721
2722static void drbd_unplug_fn(struct request_queue *q)
2723{
2724        struct drbd_conf *mdev = q->queuedata;
2725
2726        /* unplug FIRST */
2727        spin_lock_irq(q->queue_lock);
2728        blk_remove_plug(q);
2729        spin_unlock_irq(q->queue_lock);
2730
2731        /* only if connected */
2732        spin_lock_irq(&mdev->req_lock);
2733        if (mdev->state.pdsk >= D_INCONSISTENT && mdev->state.conn >= C_CONNECTED) {
2734                D_ASSERT(mdev->state.role == R_PRIMARY);
2735                if (test_and_clear_bit(UNPLUG_REMOTE, &mdev->flags)) {
2736                        /* add to the data.work queue,
2737                         * unless already queued.
2738                         * XXX this might be a good addition to drbd_queue_work
2739                         * anyways, to detect "double queuing" ... */
2740                        if (list_empty(&mdev->unplug_work.list))
2741                                drbd_queue_work(&mdev->data.work,
2742                                                &mdev->unplug_work);
2743                }
2744        }
2745        spin_unlock_irq(&mdev->req_lock);
2746
2747        if (mdev->state.disk >= D_INCONSISTENT)
2748                drbd_kick_lo(mdev);
2749}
2750
2751static void drbd_set_defaults(struct drbd_conf *mdev)
2752{
2753        /* This way we get a compile error when sync_conf grows,
2754           and we forgot to initialize it here */
2755        mdev->sync_conf = (struct syncer_conf) {
2756                /* .rate = */           DRBD_RATE_DEF,
2757                /* .after = */          DRBD_AFTER_DEF,
2758                /* .al_extents = */     DRBD_AL_EXTENTS_DEF,
2759                /* .verify_alg = */     {}, 0,
2760                /* .cpu_mask = */       {}, 0,
2761                /* .csums_alg = */      {}, 0,
2762                /* .use_rle = */        0,
2763                /* .on_no_data = */     DRBD_ON_NO_DATA_DEF,
2764                /* .c_plan_ahead = */   DRBD_C_PLAN_AHEAD_DEF,
2765                /* .c_delay_target = */ DRBD_C_DELAY_TARGET_DEF,
2766                /* .c_fill_target = */  DRBD_C_FILL_TARGET_DEF,
2767                /* .c_max_rate = */     DRBD_C_MAX_RATE_DEF,
2768                /* .c_min_rate = */     DRBD_C_MIN_RATE_DEF
2769        };
2770
2771        /* Have to use that way, because the layout differs between
2772           big endian and little endian */
2773        mdev->state = (union drbd_state) {
2774                { .role = R_SECONDARY,
2775                  .peer = R_UNKNOWN,
2776                  .conn = C_STANDALONE,
2777                  .disk = D_DISKLESS,
2778                  .pdsk = D_UNKNOWN,
2779                  .susp = 0,
2780                  .susp_nod = 0,
2781                  .susp_fen = 0
2782                } };
2783}
2784
2785void drbd_init_set_defaults(struct drbd_conf *mdev)
2786{
2787        /* the memset(,0,) did most of this.
2788         * note: only assignments, no allocation in here */
2789
2790        drbd_set_defaults(mdev);
2791
2792        atomic_set(&mdev->ap_bio_cnt, 0);
2793        atomic_set(&mdev->ap_pending_cnt, 0);
2794        atomic_set(&mdev->rs_pending_cnt, 0);
2795        atomic_set(&mdev->unacked_cnt, 0);
2796        atomic_set(&mdev->local_cnt, 0);
2797        atomic_set(&mdev->net_cnt, 0);
2798        atomic_set(&mdev->packet_seq, 0);
2799        atomic_set(&mdev->pp_in_use, 0);
2800        atomic_set(&mdev->pp_in_use_by_net, 0);
2801        atomic_set(&mdev->rs_sect_in, 0);
2802        atomic_set(&mdev->rs_sect_ev, 0);
2803
2804        mutex_init(&mdev->md_io_mutex);
2805        mutex_init(&mdev->data.mutex);
2806        mutex_init(&mdev->meta.mutex);
2807        sema_init(&mdev->data.work.s, 0);
2808        sema_init(&mdev->meta.work.s, 0);
2809        mutex_init(&mdev->state_mutex);
2810
2811        spin_lock_init(&mdev->data.work.q_lock);
2812        spin_lock_init(&mdev->meta.work.q_lock);
2813
2814        spin_lock_init(&mdev->al_lock);
2815        spin_lock_init(&mdev->req_lock);
2816        spin_lock_init(&mdev->peer_seq_lock);
2817        spin_lock_init(&mdev->epoch_lock);
2818
2819        INIT_LIST_HEAD(&mdev->active_ee);
2820        INIT_LIST_HEAD(&mdev->sync_ee);
2821        INIT_LIST_HEAD(&mdev->done_ee);
2822        INIT_LIST_HEAD(&mdev->read_ee);
2823        INIT_LIST_HEAD(&mdev->net_ee);
2824        INIT_LIST_HEAD(&mdev->resync_reads);
2825        INIT_LIST_HEAD(&mdev->data.work.q);
2826        INIT_LIST_HEAD(&mdev->meta.work.q);
2827        INIT_LIST_HEAD(&mdev->resync_work.list);
2828        INIT_LIST_HEAD(&mdev->unplug_work.list);
2829        INIT_LIST_HEAD(&mdev->go_diskless.list);
2830        INIT_LIST_HEAD(&mdev->md_sync_work.list);
2831        INIT_LIST_HEAD(&mdev->bm_io_work.w.list);
2832
2833        mdev->resync_work.cb  = w_resync_inactive;
2834        mdev->unplug_work.cb  = w_send_write_hint;
2835        mdev->go_diskless.cb  = w_go_diskless;
2836        mdev->md_sync_work.cb = w_md_sync;
2837        mdev->bm_io_work.w.cb = w_bitmap_io;
2838        init_timer(&mdev->resync_timer);
2839        init_timer(&mdev->md_sync_timer);
2840        mdev->resync_timer.function = resync_timer_fn;
2841        mdev->resync_timer.data = (unsigned long) mdev;
2842        mdev->md_sync_timer.function = md_sync_timer_fn;
2843        mdev->md_sync_timer.data = (unsigned long) mdev;
2844
2845        init_waitqueue_head(&mdev->misc_wait);
2846        init_waitqueue_head(&mdev->state_wait);
2847        init_waitqueue_head(&mdev->net_cnt_wait);
2848        init_waitqueue_head(&mdev->ee_wait);
2849        init_waitqueue_head(&mdev->al_wait);
2850        init_waitqueue_head(&mdev->seq_wait);
2851
2852        drbd_thread_init(mdev, &mdev->receiver, drbdd_init);
2853        drbd_thread_init(mdev, &mdev->worker, drbd_worker);
2854        drbd_thread_init(mdev, &mdev->asender, drbd_asender);
2855
2856        mdev->agreed_pro_version = PRO_VERSION_MAX;
2857        mdev->write_ordering = WO_bdev_flush;
2858        mdev->resync_wenr = LC_FREE;
2859}
2860
2861void drbd_mdev_cleanup(struct drbd_conf *mdev)
2862{
2863        int i;
2864        if (mdev->receiver.t_state != None)
2865                dev_err(DEV, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
2866                                mdev->receiver.t_state);
2867
2868        /* no need to lock it, I'm the only thread alive */
2869        if (atomic_read(&mdev->current_epoch->epoch_size) !=  0)
2870                dev_err(DEV, "epoch_size:%d\n", atomic_read(&mdev->current_epoch->epoch_size));
2871        mdev->al_writ_cnt  =
2872        mdev->bm_writ_cnt  =
2873        mdev->read_cnt     =
2874        mdev->recv_cnt     =
2875        mdev->send_cnt     =
2876        mdev->writ_cnt     =
2877        mdev->p_size       =
2878        mdev->rs_start     =
2879        mdev->rs_total     =
2880        mdev->rs_failed    = 0;
2881        mdev->rs_last_events = 0;
2882        mdev->rs_last_sect_ev = 0;
2883        for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2884                mdev->rs_mark_left[i] = 0;
2885                mdev->rs_mark_time[i] = 0;
2886        }
2887        D_ASSERT(mdev->net_conf == NULL);
2888
2889        drbd_set_my_capacity(mdev, 0);
2890        if (mdev->bitmap) {
2891                /* maybe never allocated. */
2892                drbd_bm_resize(mdev, 0, 1);
2893                drbd_bm_cleanup(mdev);
2894        }
2895
2896        drbd_free_resources(mdev);
2897        clear_bit(AL_SUSPENDED, &mdev->flags);
2898
2899        /*
2900         * currently we drbd_init_ee only on module load, so
2901         * we may do drbd_release_ee only on module unload!
2902         */
2903        D_ASSERT(list_empty(&mdev->active_ee));
2904        D_ASSERT(list_empty(&mdev->sync_ee));
2905        D_ASSERT(list_empty(&mdev->done_ee));
2906        D_ASSERT(list_empty(&mdev->read_ee));
2907        D_ASSERT(list_empty(&mdev->net_ee));
2908        D_ASSERT(list_empty(&mdev->resync_reads));
2909        D_ASSERT(list_empty(&mdev->data.work.q));
2910        D_ASSERT(list_empty(&mdev->meta.work.q));
2911        D_ASSERT(list_empty(&mdev->resync_work.list));
2912        D_ASSERT(list_empty(&mdev->unplug_work.list));
2913        D_ASSERT(list_empty(&mdev->go_diskless.list));
2914}
2915
2916
2917static void drbd_destroy_mempools(void)
2918{
2919        struct page *page;
2920
2921        while (drbd_pp_pool) {
2922                page = drbd_pp_pool;
2923                drbd_pp_pool = (struct page *)page_private(page);
2924                __free_page(page);
2925                drbd_pp_vacant--;
2926        }
2927
2928        /* D_ASSERT(atomic_read(&drbd_pp_vacant)==0); */
2929
2930        if (drbd_ee_mempool)
2931                mempool_destroy(drbd_ee_mempool);
2932        if (drbd_request_mempool)
2933                mempool_destroy(drbd_request_mempool);
2934        if (drbd_ee_cache)
2935                kmem_cache_destroy(drbd_ee_cache);
2936        if (drbd_request_cache)
2937                kmem_cache_destroy(drbd_request_cache);
2938        if (drbd_bm_ext_cache)
2939                kmem_cache_destroy(drbd_bm_ext_cache);
2940        if (drbd_al_ext_cache)
2941                kmem_cache_destroy(drbd_al_ext_cache);
2942
2943        drbd_ee_mempool      = NULL;
2944        drbd_request_mempool = NULL;
2945        drbd_ee_cache        = NULL;
2946        drbd_request_cache   = NULL;
2947        drbd_bm_ext_cache    = NULL;
2948        drbd_al_ext_cache    = NULL;
2949
2950        return;
2951}
2952
2953static int drbd_create_mempools(void)
2954{
2955        struct page *page;
2956        const int number = (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE) * minor_count;
2957        int i;
2958
2959        /* prepare our caches and mempools */
2960        drbd_request_mempool = NULL;
2961        drbd_ee_cache        = NULL;
2962        drbd_request_cache   = NULL;
2963        drbd_bm_ext_cache    = NULL;
2964        drbd_al_ext_cache    = NULL;
2965        drbd_pp_pool         = NULL;
2966
2967        /* caches */
2968        drbd_request_cache = kmem_cache_create(
2969                "drbd_req", sizeof(struct drbd_request), 0, 0, NULL);
2970        if (drbd_request_cache == NULL)
2971                goto Enomem;
2972
2973        drbd_ee_cache = kmem_cache_create(
2974                "drbd_ee", sizeof(struct drbd_epoch_entry), 0, 0, NULL);
2975        if (drbd_ee_cache == NULL)
2976                goto Enomem;
2977
2978        drbd_bm_ext_cache = kmem_cache_create(
2979                "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
2980        if (drbd_bm_ext_cache == NULL)
2981                goto Enomem;
2982
2983        drbd_al_ext_cache = kmem_cache_create(
2984                "drbd_al", sizeof(struct lc_element), 0, 0, NULL);
2985        if (drbd_al_ext_cache == NULL)
2986                goto Enomem;
2987
2988        /* mempools */
2989        drbd_request_mempool = mempool_create(number,
2990                mempool_alloc_slab, mempool_free_slab, drbd_request_cache);
2991        if (drbd_request_mempool == NULL)
2992                goto Enomem;
2993
2994        drbd_ee_mempool = mempool_create(number,
2995                mempool_alloc_slab, mempool_free_slab, drbd_ee_cache);
2996        if (drbd_ee_mempool == NULL)
2997                goto Enomem;
2998
2999        /* drbd's page pool */
3000        spin_lock_init(&drbd_pp_lock);
3001
3002        for (i = 0; i < number; i++) {
3003                page = alloc_page(GFP_HIGHUSER);
3004                if (!page)
3005                        goto Enomem;
3006                set_page_private(page, (unsigned long)drbd_pp_pool);
3007                drbd_pp_pool = page;
3008        }
3009        drbd_pp_vacant = number;
3010
3011        return 0;
3012
3013Enomem:
3014        drbd_destroy_mempools(); /* in case we allocated some */
3015        return -ENOMEM;
3016}
3017
3018static int drbd_notify_sys(struct notifier_block *this, unsigned long code,
3019        void *unused)
3020{
3021        /* just so we have it.  you never know what interesting things we
3022         * might want to do here some day...
3023         */
3024
3025        return NOTIFY_DONE;
3026}
3027
3028static struct notifier_block drbd_notifier = {
3029        .notifier_call = drbd_notify_sys,
3030};
3031
3032static void drbd_release_ee_lists(struct drbd_conf *mdev)
3033{
3034        int rr;
3035
3036        rr = drbd_release_ee(mdev, &mdev->active_ee);
3037        if (rr)
3038                dev_err(DEV, "%d EEs in active list found!\n", rr);
3039
3040        rr = drbd_release_ee(mdev, &mdev->sync_ee);
3041        if (rr)
3042                dev_err(DEV, "%d EEs in sync list found!\n", rr);
3043
3044        rr = drbd_release_ee(mdev, &mdev->read_ee);
3045        if (rr)
3046                dev_err(DEV, "%d EEs in read list found!\n", rr);
3047
3048        rr = drbd_release_ee(mdev, &mdev->done_ee);
3049        if (rr)
3050                dev_err(DEV, "%d EEs in done list found!\n", rr);
3051
3052        rr = drbd_release_ee(mdev, &mdev->net_ee);
3053        if (rr)
3054                dev_err(DEV, "%d EEs in net list found!\n", rr);
3055}
3056
3057/* caution. no locking.
3058 * currently only used from module cleanup code. */
3059static void drbd_delete_device(unsigned int minor)
3060{
3061        struct drbd_conf *mdev = minor_to_mdev(minor);
3062
3063        if (!mdev)
3064                return;
3065
3066        /* paranoia asserts */
3067        if (mdev->open_cnt != 0)
3068                dev_err(DEV, "open_cnt = %d in %s:%u", mdev->open_cnt,
3069                                __FILE__ , __LINE__);
3070
3071        ERR_IF (!list_empty(&mdev->data.work.q)) {
3072                struct list_head *lp;
3073                list_for_each(lp, &mdev->data.work.q) {
3074                        dev_err(DEV, "lp = %p\n", lp);
3075                }
3076        };
3077        /* end paranoia asserts */
3078
3079        del_gendisk(mdev->vdisk);
3080
3081        /* cleanup stuff that may have been allocated during
3082         * device (re-)configuration or state changes */
3083
3084        if (mdev->this_bdev)
3085                bdput(mdev->this_bdev);
3086
3087        drbd_free_resources(mdev);
3088
3089        drbd_release_ee_lists(mdev);
3090
3091        /* should be free'd on disconnect? */
3092        kfree(mdev->ee_hash);
3093        /*
3094        mdev->ee_hash_s = 0;
3095        mdev->ee_hash = NULL;
3096        */
3097
3098        lc_destroy(mdev->act_log);
3099        lc_destroy(mdev->resync);
3100
3101        kfree(mdev->p_uuid);
3102        /* mdev->p_uuid = NULL; */
3103
3104        kfree(mdev->int_dig_out);
3105        kfree(mdev->int_dig_in);
3106        kfree(mdev->int_dig_vv);
3107
3108        /* cleanup the rest that has been
3109         * allocated from drbd_new_device
3110         * and actually free the mdev itself */
3111        drbd_free_mdev(mdev);
3112}
3113
3114static void drbd_cleanup(void)
3115{
3116        unsigned int i;
3117
3118        unregister_reboot_notifier(&drbd_notifier);
3119
3120        drbd_nl_cleanup();
3121
3122        if (minor_table) {
3123                if (drbd_proc)
3124                        remove_proc_entry("drbd", NULL);
3125                i = minor_count;
3126                while (i--)
3127                        drbd_delete_device(i);
3128                drbd_destroy_mempools();
3129        }
3130
3131        kfree(minor_table);
3132
3133        unregister_blkdev(DRBD_MAJOR, "drbd");
3134
3135        printk(KERN_INFO "drbd: module cleanup done.\n");
3136}
3137
3138/**
3139 * drbd_congested() - Callback for pdflush
3140 * @congested_data:     User data
3141 * @bdi_bits:           Bits pdflush is currently interested in
3142 *
3143 * Returns 1<<BDI_async_congested and/or 1<<BDI_sync_congested if we are congested.
3144 */
3145static int drbd_congested(void *congested_data, int bdi_bits)
3146{
3147        struct drbd_conf *mdev = congested_data;
3148        struct request_queue *q;
3149        char reason = '-';
3150        int r = 0;
3151
3152        if (!__inc_ap_bio_cond(mdev)) {
3153                /* DRBD has frozen IO */
3154                r = bdi_bits;
3155                reason = 'd';
3156                goto out;
3157        }
3158
3159        if (get_ldev(mdev)) {
3160                q = bdev_get_queue(mdev->ldev->backing_bdev);
3161                r = bdi_congested(&q->backing_dev_info, bdi_bits);
3162                put_ldev(mdev);
3163                if (r)
3164                        reason = 'b';
3165        }
3166
3167        if (bdi_bits & (1 << BDI_async_congested) && test_bit(NET_CONGESTED, &mdev->flags)) {
3168                r |= (1 << BDI_async_congested);
3169                reason = reason == 'b' ? 'a' : 'n';
3170        }
3171
3172out:
3173        mdev->congestion_reason = reason;
3174        return r;
3175}
3176
3177struct drbd_conf *drbd_new_device(unsigned int minor)
3178{
3179        struct drbd_conf *mdev;
3180        struct gendisk *disk;
3181        struct request_queue *q;
3182
3183        /* GFP_KERNEL, we are outside of all write-out paths */
3184        mdev = kzalloc(sizeof(struct drbd_conf), GFP_KERNEL);
3185        if (!mdev)
3186                return NULL;
3187        if (!zalloc_cpumask_var(&mdev->cpu_mask, GFP_KERNEL))
3188                goto out_no_cpumask;
3189
3190        mdev->minor = minor;
3191
3192        drbd_init_set_defaults(mdev);
3193
3194        q = blk_alloc_queue(GFP_KERNEL);
3195        if (!q)
3196                goto out_no_q;
3197        mdev->rq_queue = q;
3198        q->queuedata   = mdev;
3199
3200        disk = alloc_disk(1);
3201        if (!disk)
3202                goto out_no_disk;
3203        mdev->vdisk = disk;
3204
3205        set_disk_ro(disk, TRUE);
3206
3207        disk->queue = q;
3208        disk->major = DRBD_MAJOR;
3209        disk->first_minor = minor;
3210        disk->fops = &drbd_ops;
3211        sprintf(disk->disk_name, "drbd%d", minor);
3212        disk->private_data = mdev;
3213
3214        mdev->this_bdev = bdget(MKDEV(DRBD_MAJOR, minor));
3215        /* we have no partitions. we contain only ourselves. */
3216        mdev->this_bdev->bd_contains = mdev->this_bdev;
3217
3218        q->backing_dev_info.congested_fn = drbd_congested;
3219        q->backing_dev_info.congested_data = mdev;
3220
3221        blk_queue_make_request(q, drbd_make_request_26);
3222        blk_queue_max_segment_size(q, DRBD_MAX_SEGMENT_SIZE);
3223        blk_queue_bounce_limit(q, BLK_BOUNCE_ANY);
3224        blk_queue_merge_bvec(q, drbd_merge_bvec);
3225        q->queue_lock = &mdev->req_lock; /* needed since we use */
3226                /* plugging on a queue, that actually has no requests! */
3227        q->unplug_fn = drbd_unplug_fn;
3228
3229        mdev->md_io_page = alloc_page(GFP_KERNEL);
3230        if (!mdev->md_io_page)
3231                goto out_no_io_page;
3232
3233        if (drbd_bm_init(mdev))
3234                goto out_no_bitmap;
3235        /* no need to lock access, we are still initializing this minor device. */
3236        if (!tl_init(mdev))
3237                goto out_no_tl;
3238
3239        mdev->app_reads_hash = kzalloc(APP_R_HSIZE*sizeof(void *), GFP_KERNEL);
3240        if (!mdev->app_reads_hash)
3241                goto out_no_app_reads;
3242
3243        mdev->current_epoch = kzalloc(sizeof(struct drbd_epoch), GFP_KERNEL);
3244        if (!mdev->current_epoch)
3245                goto out_no_epoch;
3246
3247        INIT_LIST_HEAD(&mdev->current_epoch->list);
3248        mdev->epochs = 1;
3249
3250        return mdev;
3251
3252/* out_whatever_else:
3253        kfree(mdev->current_epoch); */
3254out_no_epoch:
3255        kfree(mdev->app_reads_hash);
3256out_no_app_reads:
3257        tl_cleanup(mdev);
3258out_no_tl:
3259        drbd_bm_cleanup(mdev);
3260out_no_bitmap:
3261        __free_page(mdev->md_io_page);
3262out_no_io_page:
3263        put_disk(disk);
3264out_no_disk:
3265        blk_cleanup_queue(q);
3266out_no_q:
3267        free_cpumask_var(mdev->cpu_mask);
3268out_no_cpumask:
3269        kfree(mdev);
3270        return NULL;
3271}
3272
3273/* counterpart of drbd_new_device.
3274 * last part of drbd_delete_device. */
3275void drbd_free_mdev(struct drbd_conf *mdev)
3276{
3277        kfree(mdev->current_epoch);
3278        kfree(mdev->app_reads_hash);
3279        tl_cleanup(mdev);
3280        if (mdev->bitmap) /* should no longer be there. */
3281                drbd_bm_cleanup(mdev);
3282        __free_page(mdev->md_io_page);
3283        put_disk(mdev->vdisk);
3284        blk_cleanup_queue(mdev->rq_queue);
3285        free_cpumask_var(mdev->cpu_mask);
3286        kfree(mdev);
3287}
3288
3289
3290int __init drbd_init(void)
3291{
3292        int err;
3293
3294        if (sizeof(struct p_handshake) != 80) {
3295                printk(KERN_ERR
3296                       "drbd: never change the size or layout "
3297                       "of the HandShake packet.\n");
3298                return -EINVAL;
3299        }
3300
3301        if (1 > minor_count || minor_count > 255) {
3302                printk(KERN_ERR
3303                        "drbd: invalid minor_count (%d)\n", minor_count);
3304#ifdef MODULE
3305                return -EINVAL;
3306#else
3307                minor_count = 8;
3308#endif
3309        }
3310
3311        err = drbd_nl_init();
3312        if (err)
3313                return err;
3314
3315        err = register_blkdev(DRBD_MAJOR, "drbd");
3316        if (err) {
3317                printk(KERN_ERR
3318                       "drbd: unable to register block device major %d\n",
3319                       DRBD_MAJOR);
3320                return err;
3321        }
3322
3323        register_reboot_notifier(&drbd_notifier);
3324
3325        /*
3326         * allocate all necessary structs
3327         */
3328        err = -ENOMEM;
3329
3330        init_waitqueue_head(&drbd_pp_wait);
3331
3332        drbd_proc = NULL; /* play safe for drbd_cleanup */
3333        minor_table = kzalloc(sizeof(struct drbd_conf *)*minor_count,
3334                                GFP_KERNEL);
3335        if (!minor_table)
3336                goto Enomem;
3337
3338        err = drbd_create_mempools();
3339        if (err)
3340                goto Enomem;
3341
3342        drbd_proc = proc_create_data("drbd", S_IFREG | S_IRUGO , NULL, &drbd_proc_fops, NULL);
3343        if (!drbd_proc) {
3344                printk(KERN_ERR "drbd: unable to register proc file\n");
3345                goto Enomem;
3346        }
3347
3348        rwlock_init(&global_state_lock);
3349
3350        printk(KERN_INFO "drbd: initialized. "
3351               "Version: " REL_VERSION " (api:%d/proto:%d-%d)\n",
3352               API_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
3353        printk(KERN_INFO "drbd: %s\n", drbd_buildtag());
3354        printk(KERN_INFO "drbd: registered as block device major %d\n",
3355                DRBD_MAJOR);
3356        printk(KERN_INFO "drbd: minor_table @ 0x%p\n", minor_table);
3357
3358        return 0; /* Success! */
3359
3360Enomem:
3361        drbd_cleanup();
3362        if (err == -ENOMEM)
3363                /* currently always the case */
3364                printk(KERN_ERR "drbd: ran out of memory\n");
3365        else
3366                printk(KERN_ERR "drbd: initialization failure\n");
3367        return err;
3368}
3369
3370void drbd_free_bc(struct drbd_backing_dev *ldev)
3371{
3372        if (ldev == NULL)
3373                return;
3374
3375        blkdev_put(ldev->backing_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3376        blkdev_put(ldev->md_bdev, FMODE_READ | FMODE_WRITE | FMODE_EXCL);
3377
3378        kfree(ldev);
3379}
3380
3381void drbd_free_sock(struct drbd_conf *mdev)
3382{
3383        if (mdev->data.socket) {
3384                mutex_lock(&mdev->data.mutex);
3385                kernel_sock_shutdown(mdev->data.socket, SHUT_RDWR);
3386                sock_release(mdev->data.socket);
3387                mdev->data.socket = NULL;
3388                mutex_unlock(&mdev->data.mutex);
3389        }
3390        if (mdev->meta.socket) {
3391                mutex_lock(&mdev->meta.mutex);
3392                kernel_sock_shutdown(mdev->meta.socket, SHUT_RDWR);
3393                sock_release(mdev->meta.socket);
3394                mdev->meta.socket = NULL;
3395                mutex_unlock(&mdev->meta.mutex);
3396        }
3397}
3398
3399
3400void drbd_free_resources(struct drbd_conf *mdev)
3401{
3402        crypto_free_hash(mdev->csums_tfm);
3403        mdev->csums_tfm = NULL;
3404        crypto_free_hash(mdev->verify_tfm);
3405        mdev->verify_tfm = NULL;
3406        crypto_free_hash(mdev->cram_hmac_tfm);
3407        mdev->cram_hmac_tfm = NULL;
3408        crypto_free_hash(mdev->integrity_w_tfm);
3409        mdev->integrity_w_tfm = NULL;
3410        crypto_free_hash(mdev->integrity_r_tfm);
3411        mdev->integrity_r_tfm = NULL;
3412
3413        drbd_free_sock(mdev);
3414
3415        __no_warn(local,
3416                  drbd_free_bc(mdev->ldev);
3417                  mdev->ldev = NULL;);
3418}
3419
3420/* meta data management */
3421
/*
 * Layout of the meta data super block as it is stored on disk.
 * drbd_md_sync() fills it in with cpu_to_be32()/cpu_to_be64() and
 * drbd_md_read() decodes it with be32_to_cpu()/be64_to_cpu(), so all
 * multi-byte fields are big endian on disk. The structure is __packed;
 * do not reorder or resize fields.
 */
struct meta_data_on_disk {
        u64 la_size;           /* last agreed size. */
        u64 uuid[UI_SIZE];   /* UUIDs. */
        u64 device_uuid;
        u64 reserved_u64_1;
        u32 flags;             /* MDF */
        u32 magic;             /* compared against DRBD_MD_MAGIC on read */
        u32 md_size_sect;
        u32 al_offset;         /* offset to this block */
        u32 al_nr_extents;     /* important for restoring the AL */
              /* `-- act_log->nr_elements <-- sync_conf.al_extents */
        u32 bm_offset;         /* offset to the bitmap, from here */
        u32 bm_bytes_per_bit;  /* BM_BLOCK_SIZE */
        u32 reserved_u32[4];

} __packed;
3438
3439/**
3440 * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
3441 * @mdev:       DRBD device.
3442 */
3443void drbd_md_sync(struct drbd_conf *mdev)
3444{
3445        struct meta_data_on_disk *buffer;
3446        sector_t sector;
3447        int i;
3448
3449        del_timer(&mdev->md_sync_timer);
3450        /* timer may be rearmed by drbd_md_mark_dirty() now. */
3451        if (!test_and_clear_bit(MD_DIRTY, &mdev->flags))
3452                return;
3453
3454        /* We use here D_FAILED and not D_ATTACHING because we try to write
3455         * metadata even if we detach due to a disk failure! */
3456        if (!get_ldev_if_state(mdev, D_FAILED))
3457                return;
3458
3459        mutex_lock(&mdev->md_io_mutex);
3460        buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3461        memset(buffer, 0, 512);
3462
3463        buffer->la_size = cpu_to_be64(drbd_get_capacity(mdev->this_bdev));
3464        for (i = UI_CURRENT; i < UI_SIZE; i++)
3465                buffer->uuid[i] = cpu_to_be64(mdev->ldev->md.uuid[i]);
3466        buffer->flags = cpu_to_be32(mdev->ldev->md.flags);
3467        buffer->magic = cpu_to_be32(DRBD_MD_MAGIC);
3468
3469        buffer->md_size_sect  = cpu_to_be32(mdev->ldev->md.md_size_sect);
3470        buffer->al_offset     = cpu_to_be32(mdev->ldev->md.al_offset);
3471        buffer->al_nr_extents = cpu_to_be32(mdev->act_log->nr_elements);
3472        buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
3473        buffer->device_uuid = cpu_to_be64(mdev->ldev->md.device_uuid);
3474
3475        buffer->bm_offset = cpu_to_be32(mdev->ldev->md.bm_offset);
3476
3477        D_ASSERT(drbd_md_ss__(mdev, mdev->ldev) == mdev->ldev->md.md_offset);
3478        sector = mdev->ldev->md.md_offset;
3479
3480        if (!drbd_md_sync_page_io(mdev, mdev->ldev, sector, WRITE)) {
3481                /* this was a try anyways ... */
3482                dev_err(DEV, "meta data update failed!\n");
3483                drbd_chk_io_error(mdev, 1, TRUE);
3484        }
3485
3486        /* Update mdev->ldev->md.la_size_sect,
3487         * since we updated it on metadata. */
3488        mdev->ldev->md.la_size_sect = drbd_get_capacity(mdev->this_bdev);
3489
3490        mutex_unlock(&mdev->md_io_mutex);
3491        put_ldev(mdev);
3492}
3493
3494/**
3495 * drbd_md_read() - Reads in the meta data super block
3496 * @mdev:       DRBD device.
3497 * @bdev:       Device from which the meta data should be read in.
3498 *
3499 * Return 0 (NO_ERROR) on success, and an enum drbd_ret_codes in case
3500 * something goes wrong.  Currently only: ERR_IO_MD_DISK, ERR_MD_INVALID.
3501 */
3502int drbd_md_read(struct drbd_conf *mdev, struct drbd_backing_dev *bdev)
3503{
3504        struct meta_data_on_disk *buffer;
3505        int i, rv = NO_ERROR;
3506
3507        if (!get_ldev_if_state(mdev, D_ATTACHING))
3508                return ERR_IO_MD_DISK;
3509
3510        mutex_lock(&mdev->md_io_mutex);
3511        buffer = (struct meta_data_on_disk *)page_address(mdev->md_io_page);
3512
3513        if (!drbd_md_sync_page_io(mdev, bdev, bdev->md.md_offset, READ)) {
3514                /* NOTE: cant do normal error processing here as this is
3515                   called BEFORE disk is attached */
3516                dev_err(DEV, "Error while reading metadata.\n");
3517                rv = ERR_IO_MD_DISK;
3518                goto err;
3519        }
3520
3521        if (be32_to_cpu(buffer->magic) != DRBD_MD_MAGIC) {
3522                dev_err(DEV, "Error while reading metadata, magic not found.\n");
3523                rv = ERR_MD_INVALID;
3524                goto err;
3525        }
3526        if (be32_to_cpu(buffer->al_offset) != bdev->md.al_offset) {
3527                dev_err(DEV, "unexpected al_offset: %d (expected %d)\n",
3528                    be32_to_cpu(buffer->al_offset), bdev->md.al_offset);
3529                rv = ERR_MD_INVALID;
3530                goto err;
3531        }
3532        if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
3533                dev_err(DEV, "unexpected bm_offset: %d (expected %d)\n",
3534                    be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
3535                rv = ERR_MD_INVALID;
3536                goto err;
3537        }
3538        if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
3539                dev_err(DEV, "unexpected md_size: %u (expected %u)\n",
3540                    be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
3541                rv = ERR_MD_INVALID;
3542                goto err;
3543        }
3544
3545        if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
3546                dev_err(DEV, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
3547                    be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
3548                rv = ERR_MD_INVALID;
3549                goto err;
3550        }
3551
3552        bdev->md.la_size_sect = be64_to_cpu(buffer->la_size);
3553        for (i = UI_CURRENT; i < UI_SIZE; i++)
3554                bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
3555        bdev->md.flags = be32_to_cpu(buffer->flags);
3556        mdev->sync_conf.al_extents = be32_to_cpu(buffer->al_nr_extents);
3557        bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
3558
3559        if (mdev->sync_conf.al_extents < 7)
3560                mdev->sync_conf.al_extents = 127;
3561
3562 err:
3563        mutex_unlock(&mdev->md_io_mutex);
3564        put_ldev(mdev);
3565
3566        return rv;
3567}
3568
3569static void debug_drbd_uuid(struct drbd_conf *mdev, enum drbd_uuid_index index)
3570{
3571        static char *uuid_str[UI_EXTENDED_SIZE] = {
3572                [UI_CURRENT] = "CURRENT",
3573                [UI_BITMAP] = "BITMAP",
3574                [UI_HISTORY_START] = "HISTORY_START",
3575                [UI_HISTORY_END] = "HISTORY_END",
3576                [UI_SIZE] = "SIZE",
3577                [UI_FLAGS] = "FLAGS",
3578        };
3579
3580        if (index >= UI_EXTENDED_SIZE) {
3581                dev_warn(DEV, " uuid_index >= EXTENDED_SIZE\n");
3582                return;
3583        }
3584
3585        dynamic_dev_dbg(DEV, " uuid[%s] now %016llX\n",
3586                 uuid_str[index],
3587                 (unsigned long long)mdev->ldev->md.uuid[index]);
3588}
3589
3590
3591/**
3592 * drbd_md_mark_dirty() - Mark meta data super block as dirty
3593 * @mdev:       DRBD device.
3594 *
3595 * Call this function if you change anything that should be written to
3596 * the meta-data super block. This function sets MD_DIRTY, and starts a
3597 * timer that ensures that within five seconds you have to call drbd_md_sync().
3598 */
3599#ifdef DEBUG
3600void drbd_md_mark_dirty_(struct drbd_conf *mdev, unsigned int line, const char *func)
3601{
3602        if (!test_and_set_bit(MD_DIRTY, &mdev->flags)) {
3603                mod_timer(&mdev->md_sync_timer, jiffies + HZ);
3604                mdev->last_md_mark_dirty.line = line;
3605                mdev->last_md_mark_dirty.func = func;
3606        }
3607}
3608#else
3609void drbd_md_mark_dirty(struct drbd_conf *mdev)
3610{
3611        if (!test_and_set_bit(MD_DIRTY, &mdev->flags))
3612                mod_timer(&mdev->md_sync_timer, jiffies + 5*HZ);
3613}
3614#endif
3615
3616static void drbd_uuid_move_history(struct drbd_conf *mdev) __must_hold(local)
3617{
3618        int i;
3619
3620        for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++) {
3621                mdev->ldev->md.uuid[i+1] = mdev->ldev->md.uuid[i];
3622                debug_drbd_uuid(mdev, i+1);
3623        }
3624}
3625
3626void _drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3627{
3628        if (idx == UI_CURRENT) {
3629                if (mdev->state.role == R_PRIMARY)
3630                        val |= 1;
3631                else
3632                        val &= ~((u64)1);
3633
3634                drbd_set_ed_uuid(mdev, val);
3635        }
3636
3637        mdev->ldev->md.uuid[idx] = val;
3638        debug_drbd_uuid(mdev, idx);
3639        drbd_md_mark_dirty(mdev);
3640}
3641
3642
3643void drbd_uuid_set(struct drbd_conf *mdev, int idx, u64 val) __must_hold(local)
3644{
3645        if (mdev->ldev->md.uuid[idx]) {
3646                drbd_uuid_move_history(mdev);
3647                mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[idx];
3648                debug_drbd_uuid(mdev, UI_HISTORY_START);
3649        }
3650        _drbd_uuid_set(mdev, idx, val);
3651}
3652
3653/**
3654 * drbd_uuid_new_current() - Creates a new current UUID
3655 * @mdev:       DRBD device.
3656 *
3657 * Creates a new current UUID, and rotates the old current UUID into
3658 * the bitmap slot. Causes an incremental resync upon next connect.
3659 */
3660void drbd_uuid_new_current(struct drbd_conf *mdev) __must_hold(local)
3661{
3662        u64 val;
3663
3664        dev_info(DEV, "Creating new current UUID\n");
3665        D_ASSERT(mdev->ldev->md.uuid[UI_BITMAP] == 0);
3666        mdev->ldev->md.uuid[UI_BITMAP] = mdev->ldev->md.uuid[UI_CURRENT];
3667        debug_drbd_uuid(mdev, UI_BITMAP);
3668
3669        get_random_bytes(&val, sizeof(u64));
3670        _drbd_uuid_set(mdev, UI_CURRENT, val);
3671        /* get it to stable storage _now_ */
3672        drbd_md_sync(mdev);
3673}
3674
3675void drbd_uuid_set_bm(struct drbd_conf *mdev, u64 val) __must_hold(local)
3676{
3677        if (mdev->ldev->md.uuid[UI_BITMAP] == 0 && val == 0)
3678                return;
3679
3680        if (val == 0) {
3681                drbd_uuid_move_history(mdev);
3682                mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
3683                mdev->ldev->md.uuid[UI_BITMAP] = 0;
3684                debug_drbd_uuid(mdev, UI_HISTORY_START);
3685                debug_drbd_uuid(mdev, UI_BITMAP);
3686        } else {
3687                if (mdev->ldev->md.uuid[UI_BITMAP])
3688                        dev_warn(DEV, "bm UUID already set");
3689
3690                mdev->ldev->md.uuid[UI_BITMAP] = val;
3691                mdev->ldev->md.uuid[UI_BITMAP] &= ~((u64)1);
3692
3693                debug_drbd_uuid(mdev, UI_BITMAP);
3694        }
3695        drbd_md_mark_dirty(mdev);
3696}
3697
3698/**
3699 * drbd_bmio_set_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3700 * @mdev:       DRBD device.
3701 *
3702 * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
3703 */
3704int drbd_bmio_set_n_write(struct drbd_conf *mdev)
3705{
3706        int rv = -EIO;
3707
3708        if (get_ldev_if_state(mdev, D_ATTACHING)) {
3709                drbd_md_set_flag(mdev, MDF_FULL_SYNC);
3710                drbd_md_sync(mdev);
3711                drbd_bm_set_all(mdev);
3712
3713                rv = drbd_bm_write(mdev);
3714
3715                if (!rv) {
3716                        drbd_md_clear_flag(mdev, MDF_FULL_SYNC);
3717                        drbd_md_sync(mdev);
3718                }
3719
3720                put_ldev(mdev);
3721        }
3722
3723        return rv;
3724}
3725
3726/**
3727 * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
3728 * @mdev:       DRBD device.
3729 *
3730 * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
3731 */
3732int drbd_bmio_clear_n_write(struct drbd_conf *mdev)
3733{
3734        int rv = -EIO;
3735
3736        drbd_resume_al(mdev);
3737        if (get_ldev_if_state(mdev, D_ATTACHING)) {
3738                drbd_bm_clear_all(mdev);
3739                rv = drbd_bm_write(mdev);
3740                put_ldev(mdev);
3741        }
3742
3743        return rv;
3744}
3745
3746static int w_bitmap_io(struct drbd_conf *mdev, struct drbd_work *w, int unused)
3747{
3748        struct bm_io_work *work = container_of(w, struct bm_io_work, w);
3749        int rv;
3750
3751        D_ASSERT(atomic_read(&mdev->ap_bio_cnt) == 0);
3752
3753        drbd_bm_lock(mdev, work->why);
3754        rv = work->io_fn(mdev);
3755        drbd_bm_unlock(mdev);
3756
3757        clear_bit(BITMAP_IO, &mdev->flags);
3758        wake_up(&mdev->misc_wait);
3759
3760        if (work->done)
3761                work->done(mdev, rv);
3762
3763        clear_bit(BITMAP_IO_QUEUED, &mdev->flags);
3764        work->why = NULL;
3765
3766        return 1;
3767}
3768
3769void drbd_ldev_destroy(struct drbd_conf *mdev)
3770{
3771        lc_destroy(mdev->resync);
3772        mdev->resync = NULL;
3773        lc_destroy(mdev->act_log);
3774        mdev->act_log = NULL;
3775        __no_warn(local,
3776                drbd_free_bc(mdev->ldev);
3777                mdev->ldev = NULL;);
3778
3779        if (mdev->md_io_tmpp) {
3780                __free_page(mdev->md_io_tmpp);
3781                mdev->md_io_tmpp = NULL;
3782        }
3783        clear_bit(GO_DISKLESS, &mdev->flags);
3784}
3785
/*
 * w_go_diskless() - Worker callback queued by drbd_go_diskless().
 *
 * Forces the disk state to D_DISKLESS; the disk state is expected to be
 * D_FAILED at this point. Always returns 1.
 */
static int w_go_diskless(struct drbd_conf *mdev, struct drbd_work *w, int unused)
{
        D_ASSERT(mdev->state.disk == D_FAILED);
        /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
         * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
         * the protected members anymore, though, so once put_ldev reaches zero
         * again, it will be safe to free them. */
        drbd_force_state(mdev, NS(disk, D_DISKLESS));
        return 1;
}
3796
3797void drbd_go_diskless(struct drbd_conf *mdev)
3798{
3799        D_ASSERT(mdev->state.disk == D_FAILED);
3800        if (!test_and_set_bit(GO_DISKLESS, &mdev->flags))
3801                drbd_queue_work(&mdev->data.work, &mdev->go_diskless);
3802}
3803
3804/**
3805 * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
3806 * @mdev:       DRBD device.
3807 * @io_fn:      IO callback to be called when bitmap IO is possible
3808 * @done:       callback to be called after the bitmap IO was performed
3809 * @why:        Descriptive text of the reason for doing the IO
3810 *
3811 * While IO on the bitmap happens we freeze application IO thus we ensure
3812 * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
3813 * called from worker context. It MUST NOT be used while a previous such
3814 * work is still pending!
3815 */
3816void drbd_queue_bitmap_io(struct drbd_conf *mdev,
3817                          int (*io_fn)(struct drbd_conf *),
3818                          void (*done)(struct drbd_conf *, int),
3819                          char *why)
3820{
3821        D_ASSERT(current == mdev->worker.task);
3822
3823        D_ASSERT(!test_bit(BITMAP_IO_QUEUED, &mdev->flags));
3824        D_ASSERT(!test_bit(BITMAP_IO, &mdev->flags));
3825        D_ASSERT(list_empty(&mdev->bm_io_work.w.list));
3826        if (mdev->bm_io_work.why)
3827                dev_err(DEV, "FIXME going to queue '%s' but '%s' still pending?\n",
3828                        why, mdev->bm_io_work.why);
3829
3830        mdev->bm_io_work.io_fn = io_fn;
3831        mdev->bm_io_work.done = done;
3832        mdev->bm_io_work.why = why;
3833
3834        set_bit(BITMAP_IO, &mdev->flags);
3835        if (atomic_read(&mdev->ap_bio_cnt) == 0) {
3836                if (list_empty(&mdev->bm_io_work.w.list)) {
3837                        set_bit(BITMAP_IO_QUEUED, &mdev->flags);
3838                        drbd_queue_work(&mdev->data.work, &mdev->bm_io_work.w);
3839                } else
3840                        dev_err(DEV, "FIXME avoided double queuing bm_io_work\n");
3841        }
3842}
3843
3844/**
3845 * drbd_bitmap_io() -  Does an IO operation on the whole bitmap
3846 * @mdev:       DRBD device.
3847 * @io_fn:      IO callback to be called when bitmap IO is possible
3848 * @why:        Descriptive text of the reason for doing the IO
3849 *
3850 * freezes application IO while that the actual IO operations runs. This
3851 * functions MAY NOT be called from worker context.
3852 */
3853int drbd_bitmap_io(struct drbd_conf *mdev, int (*io_fn)(struct drbd_conf *), char *why)
3854{
3855        int rv;
3856
3857        D_ASSERT(current != mdev->worker.task);
3858
3859        drbd_suspend_io(mdev);
3860
3861        drbd_bm_lock(mdev, why);
3862        rv = io_fn(mdev);
3863        drbd_bm_unlock(mdev);
3864
3865        drbd_resume_io(mdev);
3866
3867        return rv;
3868}
3869
3870void drbd_md_set_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3871{
3872        if ((mdev->ldev->md.flags & flag) != flag) {
3873                drbd_md_mark_dirty(mdev);
3874                mdev->ldev->md.flags |= flag;
3875        }
3876}
3877
3878void drbd_md_clear_flag(struct drbd_conf *mdev, int flag) __must_hold(local)
3879{
3880        if ((mdev->ldev->md.flags & flag) != 0) {
3881                drbd_md_mark_dirty(mdev);
3882                mdev->ldev->md.flags &= ~flag;
3883        }
3884}
3885int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
3886{
3887        return (bdev->md.flags & flag) != 0;
3888}
3889
3890static void md_sync_timer_fn(unsigned long data)
3891{
3892        struct drbd_conf *mdev = (struct drbd_conf *) data;
3893
3894        drbd_queue_work_front(&mdev->data.work, &mdev->md_sync_work);
3895}
3896
/*
 * w_md_sync() - Worker callback queued by md_sync_timer_fn().
 *
 * Logs a warning that the timer expired (in DEBUG builds also the caller
 * recorded by drbd_md_mark_dirty_()) and then calls drbd_md_sync().
 * Always returns 1.
 */
static int w_md_sync(struct drbd_conf *mdev, struct drbd_work *w, int unused)
{
        dev_warn(DEV, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
#ifdef DEBUG
        dev_warn(DEV, "last md_mark_dirty: %s:%u\n",
                mdev->last_md_mark_dirty.func, mdev->last_md_mark_dirty.line);
#endif
        drbd_md_sync(mdev);
        return 1;
}
3907
3908#ifdef CONFIG_DRBD_FAULT_INJECTION
3909/* Fault insertion support including random number generator shamelessly
3910 * stolen from kernel/rcutorture.c */
/* State of the fault injection pseudo random number generator. */
struct fault_random_state {
        unsigned long state;    /* current generator state */
        unsigned long count;    /* calls left until state is re-seeded */
};

/* multiplier and increment of the linear congruential generator */
#define FAULT_RANDOM_MULT 39916801  /* prime */
#define FAULT_RANDOM_ADD        479001701 /* prime */
/* number of calls between two re-seeds from get_random_bytes() */
#define FAULT_RANDOM_REFRESH 10000
3919
3920/*
3921 * Crude but fast random-number generator.  Uses a linear congruential
3922 * generator, with occasional help from get_random_bytes().
3923 */
3924static unsigned long
3925_drbd_fault_random(struct fault_random_state *rsp)
3926{
3927        long refresh;
3928
3929        if (!rsp->count--) {
3930                get_random_bytes(&refresh, sizeof(refresh));
3931                rsp->state += refresh;
3932                rsp->count = FAULT_RANDOM_REFRESH;
3933        }
3934        rsp->state = rsp->state * FAULT_RANDOM_MULT + FAULT_RANDOM_ADD;
3935        return swahw32(rsp->state);
3936}
3937
3938static char *
3939_drbd_fault_str(unsigned int type) {
3940        static char *_faults[] = {
3941                [DRBD_FAULT_MD_WR] = "Meta-data write",
3942                [DRBD_FAULT_MD_RD] = "Meta-data read",
3943                [DRBD_FAULT_RS_WR] = "Resync write",
3944                [DRBD_FAULT_RS_RD] = "Resync read",
3945                [DRBD_FAULT_DT_WR] = "Data write",
3946                [DRBD_FAULT_DT_RD] = "Data read",
3947                [DRBD_FAULT_DT_RA] = "Data read ahead",
3948                [DRBD_FAULT_BM_ALLOC] = "BM allocation",
3949                [DRBD_FAULT_AL_EE] = "EE allocation",
3950                [DRBD_FAULT_RECEIVE] = "receive data corruption",
3951        };
3952
3953        return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
3954}
3955
3956unsigned int
3957_drbd_insert_fault(struct drbd_conf *mdev, unsigned int type)
3958{
3959        static struct fault_random_state rrs = {0, 0};
3960
3961        unsigned int ret = (
3962                (fault_devs == 0 ||
3963                        ((1 << mdev_to_minor(mdev)) & fault_devs) != 0) &&
3964                (((_drbd_fault_random(&rrs) % 100) + 1) <= fault_rate));
3965
3966        if (ret) {
3967                fault_count++;
3968
3969                if (__ratelimit(&drbd_ratelimit_state))
3970                        dev_warn(DEV, "***Simulating %s failure\n",
3971                                _drbd_fault_str(type));
3972        }
3973
3974        return ret;
3975}
3976#endif
3977
/*
 * drbd_buildtag() - Returns a string identifying this build.
 *
 * The string is built on first use and cached in a static buffer: when
 * CONFIG_MODULES is set and THIS_MODULE is not NULL it is
 * "srcversion: <srcversion>", otherwise "built-in".
 */
const char *drbd_buildtag(void)
{
        /* DRBD built from external sources has here a reference to the
           git hash of the source code. */

        /* starts with a NUL byte so that "not yet built" can be detected;
         * replacing that byte by 'b' yields "built-in" */
        static char buildtag[38] = "\0uilt-in";

        if (buildtag[0] == 0) {
#ifdef CONFIG_MODULES
                if (THIS_MODULE != NULL)
                        /* bounded: never write past the static buffer */
                        snprintf(buildtag, sizeof(buildtag),
                                 "srcversion: %-24s", THIS_MODULE->srcversion);
                else
#endif
                        buildtag[0] = 'b';
        }

        return buildtag;
}
3996
/* module entry and exit points */
module_init(drbd_init)
module_exit(drbd_cleanup)

/* symbols exported to other modules */
EXPORT_SYMBOL(drbd_conn_str);
EXPORT_SYMBOL(drbd_role_str);
EXPORT_SYMBOL(drbd_disk_str);
EXPORT_SYMBOL(drbd_set_st_err_str);
4004