linux/drivers/block/drbd/drbd_worker.c
<<
>>
Prefs
   1/*
   2   drbd_worker.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24 */
  25
  26#include <linux/module.h>
  27#include <linux/drbd.h>
  28#include <linux/sched.h>
  29#include <linux/wait.h>
  30#include <linux/mm.h>
  31#include <linux/memcontrol.h>
  32#include <linux/mm_inline.h>
  33#include <linux/slab.h>
  34#include <linux/random.h>
  35#include <linux/string.h>
  36#include <linux/scatterlist.h>
  37
  38#include "drbd_int.h"
  39#include "drbd_req.h"
  40
  41static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
  42
  43
  44
  45/* defined here:
  46   drbd_md_io_complete
  47   drbd_endio_sec
  48   drbd_endio_pri
  49
  50 * more endio handlers:
  51   atodb_endio in drbd_actlog.c
  52   drbd_bm_async_io_complete in drbd_bitmap.c
  53
  54 * For all these callbacks, note the following:
  55 * The callbacks will be called in irq context by the IDE drivers,
  56 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  57 * Try to get the locking right :)
  58 *
  59 */
  60
  61
  62/* About the global_state_lock
  63   Each state transition on an device holds a read lock. In case we have
  64   to evaluate the sync after dependencies, we grab a write lock, because
  65   we need stable states on all devices for that.  */
  66rwlock_t global_state_lock;
  67
  68/* used for synchronous meta data and bitmap IO
  69 * submitted by drbd_md_sync_page_io()
  70 */
  71void drbd_md_io_complete(struct bio *bio, int error)
  72{
  73        struct drbd_md_io *md_io;
  74
  75        md_io = (struct drbd_md_io *)bio->bi_private;
  76        md_io->error = error;
  77
  78        complete(&md_io->event);
  79}
  80
  81/* reads on behalf of the partner,
  82 * "submitted" by the receiver
  83 */
  84void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
  85{
  86        unsigned long flags = 0;
  87        struct drbd_conf *mdev = e->mdev;
  88
  89        D_ASSERT(e->block_id != ID_VACANT);
  90
  91        spin_lock_irqsave(&mdev->req_lock, flags);
  92        mdev->read_cnt += e->size >> 9;
  93        list_del(&e->w.list);
  94        if (list_empty(&mdev->read_ee))
  95                wake_up(&mdev->ee_wait);
  96        if (test_bit(__EE_WAS_ERROR, &e->flags))
  97                __drbd_chk_io_error(mdev, FALSE);
  98        spin_unlock_irqrestore(&mdev->req_lock, flags);
  99
 100        drbd_queue_work(&mdev->data.work, &e->w);
 101        put_ldev(mdev);
 102}
 103
 104/* writes on behalf of the partner, or resync writes,
 105 * "submitted" by the receiver, final stage.  */
 106static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
 107{
 108        unsigned long flags = 0;
 109        struct drbd_conf *mdev = e->mdev;
 110        sector_t e_sector;
 111        int do_wake;
 112        int is_syncer_req;
 113        int do_al_complete_io;
 114
 115        D_ASSERT(e->block_id != ID_VACANT);
 116
 117        /* after we moved e to done_ee,
 118         * we may no longer access it,
 119         * it may be freed/reused already!
 120         * (as soon as we release the req_lock) */
 121        e_sector = e->sector;
 122        do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
 123        is_syncer_req = is_syncer_block_id(e->block_id);
 124
 125        spin_lock_irqsave(&mdev->req_lock, flags);
 126        mdev->writ_cnt += e->size >> 9;
 127        list_del(&e->w.list); /* has been on active_ee or sync_ee */
 128        list_add_tail(&e->w.list, &mdev->done_ee);
 129
 130        /* No hlist_del_init(&e->colision) here, we did not send the Ack yet,
 131         * neither did we wake possibly waiting conflicting requests.
 132         * done from "drbd_process_done_ee" within the appropriate w.cb
 133         * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
 134
 135        do_wake = is_syncer_req
 136                ? list_empty(&mdev->sync_ee)
 137                : list_empty(&mdev->active_ee);
 138
 139        if (test_bit(__EE_WAS_ERROR, &e->flags))
 140                __drbd_chk_io_error(mdev, FALSE);
 141        spin_unlock_irqrestore(&mdev->req_lock, flags);
 142
 143        if (is_syncer_req)
 144                drbd_rs_complete_io(mdev, e_sector);
 145
 146        if (do_wake)
 147                wake_up(&mdev->ee_wait);
 148
 149        if (do_al_complete_io)
 150                drbd_al_complete_io(mdev, e_sector);
 151
 152        wake_asender(mdev);
 153        put_ldev(mdev);
 154}
 155
 156/* writes on behalf of the partner, or resync writes,
 157 * "submitted" by the receiver.
 158 */
 159void drbd_endio_sec(struct bio *bio, int error)
 160{
 161        struct drbd_epoch_entry *e = bio->bi_private;
 162        struct drbd_conf *mdev = e->mdev;
 163        int uptodate = bio_flagged(bio, BIO_UPTODATE);
 164        int is_write = bio_data_dir(bio) == WRITE;
 165
 166        if (error)
 167                dev_warn(DEV, "%s: error=%d s=%llus\n",
 168                                is_write ? "write" : "read", error,
 169                                (unsigned long long)e->sector);
 170        if (!error && !uptodate) {
 171                dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
 172                                is_write ? "write" : "read",
 173                                (unsigned long long)e->sector);
 174                /* strange behavior of some lower level drivers...
 175                 * fail the request by clearing the uptodate flag,
 176                 * but do not return any error?! */
 177                error = -EIO;
 178        }
 179
 180        if (error)
 181                set_bit(__EE_WAS_ERROR, &e->flags);
 182
 183        bio_put(bio); /* no need for the bio anymore */
 184        if (atomic_dec_and_test(&e->pending_bios)) {
 185                if (is_write)
 186                        drbd_endio_write_sec_final(e);
 187                else
 188                        drbd_endio_read_sec_final(e);
 189        }
 190}
 191
 192/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 193 */
 194void drbd_endio_pri(struct bio *bio, int error)
 195{
 196        unsigned long flags;
 197        struct drbd_request *req = bio->bi_private;
 198        struct drbd_conf *mdev = req->mdev;
 199        struct bio_and_error m;
 200        enum drbd_req_event what;
 201        int uptodate = bio_flagged(bio, BIO_UPTODATE);
 202
 203        if (!error && !uptodate) {
 204                dev_warn(DEV, "p %s: setting error to -EIO\n",
 205                         bio_data_dir(bio) == WRITE ? "write" : "read");
 206                /* strange behavior of some lower level drivers...
 207                 * fail the request by clearing the uptodate flag,
 208                 * but do not return any error?! */
 209                error = -EIO;
 210        }
 211
 212        /* to avoid recursion in __req_mod */
 213        if (unlikely(error)) {
 214                what = (bio_data_dir(bio) == WRITE)
 215                        ? write_completed_with_error
 216                        : (bio_rw(bio) == READ)
 217                          ? read_completed_with_error
 218                          : read_ahead_completed_with_error;
 219        } else
 220                what = completed_ok;
 221
 222        bio_put(req->private_bio);
 223        req->private_bio = ERR_PTR(error);
 224
 225        /* not req_mod(), we need irqsave here! */
 226        spin_lock_irqsave(&mdev->req_lock, flags);
 227        __req_mod(req, what, &m);
 228        spin_unlock_irqrestore(&mdev->req_lock, flags);
 229
 230        if (m.bio)
 231                complete_master_bio(mdev, &m);
 232}
 233
 234int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 235{
 236        struct drbd_request *req = container_of(w, struct drbd_request, w);
 237
 238        /* We should not detach for read io-error,
 239         * but try to WRITE the P_DATA_REPLY to the failed location,
 240         * to give the disk the chance to relocate that block */
 241
 242        spin_lock_irq(&mdev->req_lock);
 243        if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
 244                _req_mod(req, read_retry_remote_canceled);
 245                spin_unlock_irq(&mdev->req_lock);
 246                return 1;
 247        }
 248        spin_unlock_irq(&mdev->req_lock);
 249
 250        return w_send_read_req(mdev, w, 0);
 251}
 252
 253int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 254{
 255        ERR_IF(cancel) return 1;
 256        dev_err(DEV, "resync inactive, but callback triggered??\n");
 257        return 1; /* Simply ignore this! */
 258}
 259
 260void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
 261{
 262        struct hash_desc desc;
 263        struct scatterlist sg;
 264        struct page *page = e->pages;
 265        struct page *tmp;
 266        unsigned len;
 267
 268        desc.tfm = tfm;
 269        desc.flags = 0;
 270
 271        sg_init_table(&sg, 1);
 272        crypto_hash_init(&desc);
 273
 274        while ((tmp = page_chain_next(page))) {
 275                /* all but the last page will be fully used */
 276                sg_set_page(&sg, page, PAGE_SIZE, 0);
 277                crypto_hash_update(&desc, &sg, sg.length);
 278                page = tmp;
 279        }
 280        /* and now the last, possibly only partially used page */
 281        len = e->size & (PAGE_SIZE - 1);
 282        sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
 283        crypto_hash_update(&desc, &sg, sg.length);
 284        crypto_hash_final(&desc, digest);
 285}
 286
 287void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
 288{
 289        struct hash_desc desc;
 290        struct scatterlist sg;
 291        struct bio_vec *bvec;
 292        int i;
 293
 294        desc.tfm = tfm;
 295        desc.flags = 0;
 296
 297        sg_init_table(&sg, 1);
 298        crypto_hash_init(&desc);
 299
 300        __bio_for_each_segment(bvec, bio, i, 0) {
 301                sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
 302                crypto_hash_update(&desc, &sg, sg.length);
 303        }
 304        crypto_hash_final(&desc, digest);
 305}
 306
 307static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 308{
 309        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 310        int digest_size;
 311        void *digest;
 312        int ok;
 313
 314        D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
 315
 316        if (unlikely(cancel)) {
 317                drbd_free_ee(mdev, e);
 318                return 1;
 319        }
 320
 321        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 322                digest_size = crypto_hash_digestsize(mdev->csums_tfm);
 323                digest = kmalloc(digest_size, GFP_NOIO);
 324                if (digest) {
 325                        drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
 326
 327                        inc_rs_pending(mdev);
 328                        ok = drbd_send_drequest_csum(mdev,
 329                                                     e->sector,
 330                                                     e->size,
 331                                                     digest,
 332                                                     digest_size,
 333                                                     P_CSUM_RS_REQUEST);
 334                        kfree(digest);
 335                } else {
 336                        dev_err(DEV, "kmalloc() of digest failed.\n");
 337                        ok = 0;
 338                }
 339        } else
 340                ok = 1;
 341
 342        drbd_free_ee(mdev, e);
 343
 344        if (unlikely(!ok))
 345                dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
 346        return ok;
 347}
 348
 349#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
 350
 351static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
 352{
 353        struct drbd_epoch_entry *e;
 354
 355        if (!get_ldev(mdev))
 356                return -EIO;
 357
 358        if (drbd_rs_should_slow_down(mdev))
 359                goto defer;
 360
 361        /* GFP_TRY, because if there is no memory available right now, this may
 362         * be rescheduled for later. It is "only" background resync, after all. */
 363        e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
 364        if (!e)
 365                goto defer;
 366
 367        e->w.cb = w_e_send_csum;
 368        spin_lock_irq(&mdev->req_lock);
 369        list_add(&e->w.list, &mdev->read_ee);
 370        spin_unlock_irq(&mdev->req_lock);
 371
 372        atomic_add(size >> 9, &mdev->rs_sect_ev);
 373        if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
 374                return 0;
 375
 376        /* drbd_submit_ee currently fails for one reason only:
 377         * not being able to allocate enough bios.
 378         * Is dropping the connection going to help? */
 379        spin_lock_irq(&mdev->req_lock);
 380        list_del(&e->w.list);
 381        spin_unlock_irq(&mdev->req_lock);
 382
 383        drbd_free_ee(mdev, e);
 384defer:
 385        put_ldev(mdev);
 386        return -EAGAIN;
 387}
 388
 389void resync_timer_fn(unsigned long data)
 390{
 391        struct drbd_conf *mdev = (struct drbd_conf *) data;
 392        int queue;
 393
 394        queue = 1;
 395        switch (mdev->state.conn) {
 396        case C_VERIFY_S:
 397                mdev->resync_work.cb = w_make_ov_request;
 398                break;
 399        case C_SYNC_TARGET:
 400                mdev->resync_work.cb = w_make_resync_request;
 401                break;
 402        default:
 403                queue = 0;
 404                mdev->resync_work.cb = w_resync_inactive;
 405        }
 406
 407        /* harmless race: list_empty outside data.work.q_lock */
 408        if (list_empty(&mdev->resync_work.list) && queue)
 409                drbd_queue_work(&mdev->data.work, &mdev->resync_work);
 410}
 411
 412static void fifo_set(struct fifo_buffer *fb, int value)
 413{
 414        int i;
 415
 416        for (i = 0; i < fb->size; i++)
 417                fb->values[i] = value;
 418}
 419
 420static int fifo_push(struct fifo_buffer *fb, int value)
 421{
 422        int ov;
 423
 424        ov = fb->values[fb->head_index];
 425        fb->values[fb->head_index++] = value;
 426
 427        if (fb->head_index >= fb->size)
 428                fb->head_index = 0;
 429
 430        return ov;
 431}
 432
 433static void fifo_add_val(struct fifo_buffer *fb, int value)
 434{
 435        int i;
 436
 437        for (i = 0; i < fb->size; i++)
 438                fb->values[i] += value;
 439}
 440
 441int drbd_rs_controller(struct drbd_conf *mdev)
 442{
 443        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 444        unsigned int want;     /* The number of sectors we want in the proxy */
 445        int req_sect; /* Number of sectors to request in this turn */
 446        int correction; /* Number of sectors more we need in the proxy*/
 447        int cps; /* correction per invocation of drbd_rs_controller() */
 448        int steps; /* Number of time steps to plan ahead */
 449        int curr_corr;
 450        int max_sect;
 451
 452        sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
 453        mdev->rs_in_flight -= sect_in;
 454
 455        spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
 456
 457        steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 458
 459        if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
 460                want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
 461        } else { /* normal path */
 462                want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
 463                        sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
 464        }
 465
 466        correction = want - mdev->rs_in_flight - mdev->rs_planed;
 467
 468        /* Plan ahead */
 469        cps = correction / steps;
 470        fifo_add_val(&mdev->rs_plan_s, cps);
 471        mdev->rs_planed += cps * steps;
 472
 473        /* What we do in this step */
 474        curr_corr = fifo_push(&mdev->rs_plan_s, 0);
 475        spin_unlock(&mdev->peer_seq_lock);
 476        mdev->rs_planed -= curr_corr;
 477
 478        req_sect = sect_in + curr_corr;
 479        if (req_sect < 0)
 480                req_sect = 0;
 481
 482        max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
 483        if (req_sect > max_sect)
 484                req_sect = max_sect;
 485
 486        /*
 487        dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 488                 sect_in, mdev->rs_in_flight, want, correction,
 489                 steps, cps, mdev->rs_planed, curr_corr, req_sect);
 490        */
 491
 492        return req_sect;
 493}
 494
 495int w_make_resync_request(struct drbd_conf *mdev,
 496                struct drbd_work *w, int cancel)
 497{
 498        unsigned long bit;
 499        sector_t sector;
 500        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 501        int max_segment_size;
 502        int number, rollback_i, size, pe, mx;
 503        int align, queued, sndbuf;
 504        int i = 0;
 505
 506        if (unlikely(cancel))
 507                return 1;
 508
 509        if (unlikely(mdev->state.conn < C_CONNECTED)) {
 510                dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected");
 511                return 0;
 512        }
 513
 514        if (mdev->state.conn != C_SYNC_TARGET)
 515                dev_err(DEV, "%s in w_make_resync_request\n",
 516                        drbd_conn_str(mdev->state.conn));
 517
 518        if (mdev->rs_total == 0) {
 519                /* empty resync? */
 520                drbd_resync_finished(mdev);
 521                return 1;
 522        }
 523
 524        if (!get_ldev(mdev)) {
 525                /* Since we only need to access mdev->rsync a
 526                   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
 527                   to continue resync with a broken disk makes no sense at
 528                   all */
 529                dev_err(DEV, "Disk broke down during resync!\n");
 530                mdev->resync_work.cb = w_resync_inactive;
 531                return 1;
 532        }
 533
 534        /* starting with drbd 8.3.8, we can handle multi-bio EEs,
 535         * if it should be necessary */
 536        max_segment_size =
 537                mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) :
 538                mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE;
 539
 540        if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
 541                number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
 542                mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 543        } else {
 544                mdev->c_sync_rate = mdev->sync_conf.rate;
 545                number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 546        }
 547
 548        /* Throttle resync on lower level disk activity, which may also be
 549         * caused by application IO on Primary/SyncTarget.
 550         * Keep this after the call to drbd_rs_controller, as that assumes
 551         * to be called as precisely as possible every SLEEP_TIME,
 552         * and would be confused otherwise. */
 553        if (drbd_rs_should_slow_down(mdev))
 554                goto requeue;
 555
 556        mutex_lock(&mdev->data.mutex);
 557        if (mdev->data.socket)
 558                mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req);
 559        else
 560                mx = 1;
 561        mutex_unlock(&mdev->data.mutex);
 562
 563        /* For resync rates >160MB/sec, allow more pending RS requests */
 564        if (number > mx)
 565                mx = number;
 566
 567        /* Limit the number of pending RS requests to no more than the peer's receive buffer */
 568        pe = atomic_read(&mdev->rs_pending_cnt);
 569        if ((pe + number) > mx) {
 570                number = mx - pe;
 571        }
 572
 573        for (i = 0; i < number; i++) {
 574                /* Stop generating RS requests, when half of the send buffer is filled */
 575                mutex_lock(&mdev->data.mutex);
 576                if (mdev->data.socket) {
 577                        queued = mdev->data.socket->sk->sk_wmem_queued;
 578                        sndbuf = mdev->data.socket->sk->sk_sndbuf;
 579                } else {
 580                        queued = 1;
 581                        sndbuf = 0;
 582                }
 583                mutex_unlock(&mdev->data.mutex);
 584                if (queued > sndbuf / 2)
 585                        goto requeue;
 586
 587next_sector:
 588                size = BM_BLOCK_SIZE;
 589                bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
 590
 591                if (bit == -1UL) {
 592                        mdev->bm_resync_fo = drbd_bm_bits(mdev);
 593                        mdev->resync_work.cb = w_resync_inactive;
 594                        put_ldev(mdev);
 595                        return 1;
 596                }
 597
 598                sector = BM_BIT_TO_SECT(bit);
 599
 600                if (drbd_try_rs_begin_io(mdev, sector)) {
 601                        mdev->bm_resync_fo = bit;
 602                        goto requeue;
 603                }
 604                mdev->bm_resync_fo = bit + 1;
 605
 606                if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
 607                        drbd_rs_complete_io(mdev, sector);
 608                        goto next_sector;
 609                }
 610
 611#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE
 612                /* try to find some adjacent bits.
 613                 * we stop if we have already the maximum req size.
 614                 *
 615                 * Additionally always align bigger requests, in order to
 616                 * be prepared for all stripe sizes of software RAIDs.
 617                 */
 618                align = 1;
 619                rollback_i = i;
 620                for (;;) {
 621                        if (size + BM_BLOCK_SIZE > max_segment_size)
 622                                break;
 623
 624                        /* Be always aligned */
 625                        if (sector & ((1<<(align+3))-1))
 626                                break;
 627
 628                        /* do not cross extent boundaries */
 629                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 630                                break;
 631                        /* now, is it actually dirty, after all?
 632                         * caution, drbd_bm_test_bit is tri-state for some
 633                         * obscure reason; ( b == 0 ) would get the out-of-band
 634                         * only accidentally right because of the "oddly sized"
 635                         * adjustment below */
 636                        if (drbd_bm_test_bit(mdev, bit+1) != 1)
 637                                break;
 638                        bit++;
 639                        size += BM_BLOCK_SIZE;
 640                        if ((BM_BLOCK_SIZE << align) <= size)
 641                                align++;
 642                        i++;
 643                }
 644                /* if we merged some,
 645                 * reset the offset to start the next drbd_bm_find_next from */
 646                if (size > BM_BLOCK_SIZE)
 647                        mdev->bm_resync_fo = bit + 1;
 648#endif
 649
 650                /* adjust very last sectors, in case we are oddly sized */
 651                if (sector + (size>>9) > capacity)
 652                        size = (capacity-sector)<<9;
 653                if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
 654                        switch (read_for_csum(mdev, sector, size)) {
 655                        case -EIO: /* Disk failure */
 656                                put_ldev(mdev);
 657                                return 0;
 658                        case -EAGAIN: /* allocation failed, or ldev busy */
 659                                drbd_rs_complete_io(mdev, sector);
 660                                mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
 661                                i = rollback_i;
 662                                goto requeue;
 663                        case 0:
 664                                /* everything ok */
 665                                break;
 666                        default:
 667                                BUG();
 668                        }
 669                } else {
 670                        inc_rs_pending(mdev);
 671                        if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
 672                                               sector, size, ID_SYNCER)) {
 673                                dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
 674                                dec_rs_pending(mdev);
 675                                put_ldev(mdev);
 676                                return 0;
 677                        }
 678                }
 679        }
 680
 681        if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
 682                /* last syncer _request_ was sent,
 683                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
 684                 * next sync group will resume), as soon as we receive the last
 685                 * resync data block, and the last bit is cleared.
 686                 * until then resync "work" is "inactive" ...
 687                 */
 688                mdev->resync_work.cb = w_resync_inactive;
 689                put_ldev(mdev);
 690                return 1;
 691        }
 692
 693 requeue:
 694        mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 695        mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
 696        put_ldev(mdev);
 697        return 1;
 698}
 699
 700static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 701{
 702        int number, i, size;
 703        sector_t sector;
 704        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 705
 706        if (unlikely(cancel))
 707                return 1;
 708
 709        if (unlikely(mdev->state.conn < C_CONNECTED)) {
 710                dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected");
 711                return 0;
 712        }
 713
 714        number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ);
 715        if (atomic_read(&mdev->rs_pending_cnt) > number)
 716                goto requeue;
 717
 718        number -= atomic_read(&mdev->rs_pending_cnt);
 719
 720        sector = mdev->ov_position;
 721        for (i = 0; i < number; i++) {
 722                if (sector >= capacity) {
 723                        mdev->resync_work.cb = w_resync_inactive;
 724                        return 1;
 725                }
 726
 727                size = BM_BLOCK_SIZE;
 728
 729                if (drbd_try_rs_begin_io(mdev, sector)) {
 730                        mdev->ov_position = sector;
 731                        goto requeue;
 732                }
 733
 734                if (sector + (size>>9) > capacity)
 735                        size = (capacity-sector)<<9;
 736
 737                inc_rs_pending(mdev);
 738                if (!drbd_send_ov_request(mdev, sector, size)) {
 739                        dec_rs_pending(mdev);
 740                        return 0;
 741                }
 742                sector += BM_SECT_PER_BIT;
 743        }
 744        mdev->ov_position = sector;
 745
 746 requeue:
 747        mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
 748        return 1;
 749}
 750
 751
 752int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 753{
 754        kfree(w);
 755        ov_oos_print(mdev);
 756        drbd_resync_finished(mdev);
 757
 758        return 1;
 759}
 760
 761static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 762{
 763        kfree(w);
 764
 765        drbd_resync_finished(mdev);
 766
 767        return 1;
 768}
 769
 770static void ping_peer(struct drbd_conf *mdev)
 771{
 772        clear_bit(GOT_PING_ACK, &mdev->flags);
 773        request_ping(mdev);
 774        wait_event(mdev->misc_wait,
 775                   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
 776}
 777
 778int drbd_resync_finished(struct drbd_conf *mdev)
 779{
 780        unsigned long db, dt, dbdt;
 781        unsigned long n_oos;
 782        union drbd_state os, ns;
 783        struct drbd_work *w;
 784        char *khelper_cmd = NULL;
 785
 786        /* Remove all elements from the resync LRU. Since future actions
 787         * might set bits in the (main) bitmap, then the entries in the
 788         * resync LRU would be wrong. */
 789        if (drbd_rs_del_all(mdev)) {
 790                /* In case this is not possible now, most probably because
 791                 * there are P_RS_DATA_REPLY Packets lingering on the worker's
 792                 * queue (or even the read operations for those packets
 793                 * is not finished by now).   Retry in 100ms. */
 794
 795                drbd_kick_lo(mdev);
 796                __set_current_state(TASK_INTERRUPTIBLE);
 797                schedule_timeout(HZ / 10);
 798                w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
 799                if (w) {
 800                        w->cb = w_resync_finished;
 801                        drbd_queue_work(&mdev->data.work, w);
 802                        return 1;
 803                }
 804                dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
 805        }
 806
 807        dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
 808        if (dt <= 0)
 809                dt = 1;
 810        db = mdev->rs_total;
 811        dbdt = Bit2KB(db/dt);
 812        mdev->rs_paused /= HZ;
 813
 814        if (!get_ldev(mdev))
 815                goto out;
 816
 817        ping_peer(mdev);
 818
 819        spin_lock_irq(&mdev->req_lock);
 820        os = mdev->state;
 821
 822        /* This protects us against multiple calls (that can happen in the presence
 823           of application IO), and against connectivity loss just before we arrive here. */
 824        if (os.conn <= C_CONNECTED)
 825                goto out_unlock;
 826
 827        ns = os;
 828        ns.conn = C_CONNECTED;
 829
 830        dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 831             (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ?
 832             "Online verify " : "Resync",
 833             dt + mdev->rs_paused, mdev->rs_paused, dbdt);
 834
 835        n_oos = drbd_bm_total_weight(mdev);
 836
 837        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 838                if (n_oos) {
 839                        dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
 840                              n_oos, Bit2KB(1));
 841                        khelper_cmd = "out-of-sync";
 842                }
 843        } else {
 844                D_ASSERT((n_oos - mdev->rs_failed) == 0);
 845
 846                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 847                        khelper_cmd = "after-resync-target";
 848
 849                if (mdev->csums_tfm && mdev->rs_total) {
 850                        const unsigned long s = mdev->rs_same_csum;
 851                        const unsigned long t = mdev->rs_total;
 852                        const int ratio =
 853                                (t == 0)     ? 0 :
 854                        (t < 100000) ? ((s*100)/t) : (s/(t/100));
 855                        dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; "
 856                             "transferred %luK total %luK\n",
 857                             ratio,
 858                             Bit2KB(mdev->rs_same_csum),
 859                             Bit2KB(mdev->rs_total - mdev->rs_same_csum),
 860                             Bit2KB(mdev->rs_total));
 861                }
 862        }
 863
 864        if (mdev->rs_failed) {
 865                dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
 866
 867                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 868                        ns.disk = D_INCONSISTENT;
 869                        ns.pdsk = D_UP_TO_DATE;
 870                } else {
 871                        ns.disk = D_UP_TO_DATE;
 872                        ns.pdsk = D_INCONSISTENT;
 873                }
 874        } else {
 875                ns.disk = D_UP_TO_DATE;
 876                ns.pdsk = D_UP_TO_DATE;
 877
 878                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 879                        if (mdev->p_uuid) {
 880                                int i;
 881                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 882                                        _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
 883                                drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
 884                                _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
 885                        } else {
 886                                dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
 887                        }
 888                }
 889
 890                drbd_uuid_set_bm(mdev, 0UL);
 891
 892                if (mdev->p_uuid) {
 893                        /* Now the two UUID sets are equal, update what we
 894                         * know of the peer. */
 895                        int i;
 896                        for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 897                                mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
 898                }
 899        }
 900
 901        _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
 902out_unlock:
 903        spin_unlock_irq(&mdev->req_lock);
 904        put_ldev(mdev);
 905out:
 906        mdev->rs_total  = 0;
 907        mdev->rs_failed = 0;
 908        mdev->rs_paused = 0;
 909        mdev->ov_start_sector = 0;
 910
 911        drbd_md_sync(mdev);
 912
 913        if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) {
 914                dev_info(DEV, "Writing the whole bitmap\n");
 915                drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished");
 916        }
 917
 918        if (khelper_cmd)
 919                drbd_khelper(mdev, khelper_cmd);
 920
 921        return 1;
 922}
 923
 924/* helper */
 925static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 926{
 927        if (drbd_ee_has_active_page(e)) {
 928                /* This might happen if sendpage() has not finished */
 929                int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
 930                atomic_add(i, &mdev->pp_in_use_by_net);
 931                atomic_sub(i, &mdev->pp_in_use);
 932                spin_lock_irq(&mdev->req_lock);
 933                list_add_tail(&e->w.list, &mdev->net_ee);
 934                spin_unlock_irq(&mdev->req_lock);
 935                wake_up(&drbd_pp_wait);
 936        } else
 937                drbd_free_ee(mdev, e);
 938}
 939
 940/**
 941 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 942 * @mdev:       DRBD device.
 943 * @w:          work object.
 944 * @cancel:     The connection will be closed anyways
 945 */
 946int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 947{
 948        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 949        int ok;
 950
 951        if (unlikely(cancel)) {
 952                drbd_free_ee(mdev, e);
 953                dec_unacked(mdev);
 954                return 1;
 955        }
 956
 957        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 958                ok = drbd_send_block(mdev, P_DATA_REPLY, e);
 959        } else {
 960                if (__ratelimit(&drbd_ratelimit_state))
 961                        dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
 962                            (unsigned long long)e->sector);
 963
 964                ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
 965        }
 966
 967        dec_unacked(mdev);
 968
 969        move_to_net_ee_or_free(mdev, e);
 970
 971        if (unlikely(!ok))
 972                dev_err(DEV, "drbd_send_block() failed\n");
 973        return ok;
 974}
 975
 976/**
 977 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
 978 * @mdev:       DRBD device.
 979 * @w:          work object.
 980 * @cancel:     The connection will be closed anyways
 981 */
 982int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 983{
 984        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 985        int ok;
 986
 987        if (unlikely(cancel)) {
 988                drbd_free_ee(mdev, e);
 989                dec_unacked(mdev);
 990                return 1;
 991        }
 992
 993        if (get_ldev_if_state(mdev, D_FAILED)) {
 994                drbd_rs_complete_io(mdev, e->sector);
 995                put_ldev(mdev);
 996        }
 997
 998        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 999                if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1000                        inc_rs_pending(mdev);
1001                        ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1002                } else {
1003                        if (__ratelimit(&drbd_ratelimit_state))
1004                                dev_err(DEV, "Not sending RSDataReply, "
1005                                    "partner DISKLESS!\n");
1006                        ok = 1;
1007                }
1008        } else {
1009                if (__ratelimit(&drbd_ratelimit_state))
1010                        dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1011                            (unsigned long long)e->sector);
1012
1013                ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1014
1015                /* update resync data with failure */
1016                drbd_rs_failed_io(mdev, e->sector, e->size);
1017        }
1018
1019        dec_unacked(mdev);
1020
1021        move_to_net_ee_or_free(mdev, e);
1022
1023        if (unlikely(!ok))
1024                dev_err(DEV, "drbd_send_block() failed\n");
1025        return ok;
1026}
1027
1028int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1029{
1030        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1031        struct digest_info *di;
1032        int digest_size;
1033        void *digest = NULL;
1034        int ok, eq = 0;
1035
1036        if (unlikely(cancel)) {
1037                drbd_free_ee(mdev, e);
1038                dec_unacked(mdev);
1039                return 1;
1040        }
1041
1042        if (get_ldev(mdev)) {
1043                drbd_rs_complete_io(mdev, e->sector);
1044                put_ldev(mdev);
1045        }
1046
1047        di = e->digest;
1048
1049        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1050                /* quick hack to try to avoid a race against reconfiguration.
1051                 * a real fix would be much more involved,
1052                 * introducing more locking mechanisms */
1053                if (mdev->csums_tfm) {
1054                        digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1055                        D_ASSERT(digest_size == di->digest_size);
1056                        digest = kmalloc(digest_size, GFP_NOIO);
1057                }
1058                if (digest) {
1059                        drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1060                        eq = !memcmp(digest, di->digest, digest_size);
1061                        kfree(digest);
1062                }
1063
1064                if (eq) {
1065                        drbd_set_in_sync(mdev, e->sector, e->size);
1066                        /* rs_same_csums unit is BM_BLOCK_SIZE */
1067                        mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1068                        ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1069                } else {
1070                        inc_rs_pending(mdev);
1071                        e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1072                        e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1073                        kfree(di);
1074                        ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1075                }
1076        } else {
1077                ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1078                if (__ratelimit(&drbd_ratelimit_state))
1079                        dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1080        }
1081
1082        dec_unacked(mdev);
1083        move_to_net_ee_or_free(mdev, e);
1084
1085        if (unlikely(!ok))
1086                dev_err(DEV, "drbd_send_block/ack() failed\n");
1087        return ok;
1088}
1089
1090int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1091{
1092        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1093        int digest_size;
1094        void *digest;
1095        int ok = 1;
1096
1097        if (unlikely(cancel))
1098                goto out;
1099
1100        if (unlikely((e->flags & EE_WAS_ERROR) != 0))
1101                goto out;
1102
1103        digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1104        /* FIXME if this allocation fails, online verify will not terminate! */
1105        digest = kmalloc(digest_size, GFP_NOIO);
1106        if (digest) {
1107                drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1108                inc_rs_pending(mdev);
1109                ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
1110                                             digest, digest_size, P_OV_REPLY);
1111                if (!ok)
1112                        dec_rs_pending(mdev);
1113                kfree(digest);
1114        }
1115
1116out:
1117        drbd_free_ee(mdev, e);
1118
1119        dec_unacked(mdev);
1120
1121        return ok;
1122}
1123
1124void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1125{
1126        if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1127                mdev->ov_last_oos_size += size>>9;
1128        } else {
1129                mdev->ov_last_oos_start = sector;
1130                mdev->ov_last_oos_size = size>>9;
1131        }
1132        drbd_set_out_of_sync(mdev, sector, size);
1133        set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
1134}
1135
1136int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1137{
1138        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1139        struct digest_info *di;
1140        int digest_size;
1141        void *digest;
1142        int ok, eq = 0;
1143
1144        if (unlikely(cancel)) {
1145                drbd_free_ee(mdev, e);
1146                dec_unacked(mdev);
1147                return 1;
1148        }
1149
1150        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1151         * the resync lru has been cleaned up already */
1152        if (get_ldev(mdev)) {
1153                drbd_rs_complete_io(mdev, e->sector);
1154                put_ldev(mdev);
1155        }
1156
1157        di = e->digest;
1158
1159        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1160                digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1161                digest = kmalloc(digest_size, GFP_NOIO);
1162                if (digest) {
1163                        drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1164
1165                        D_ASSERT(digest_size == di->digest_size);
1166                        eq = !memcmp(digest, di->digest, digest_size);
1167                        kfree(digest);
1168                }
1169        } else {
1170                ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1171                if (__ratelimit(&drbd_ratelimit_state))
1172                        dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1173        }
1174
1175        dec_unacked(mdev);
1176        if (!eq)
1177                drbd_ov_oos_found(mdev, e->sector, e->size);
1178        else
1179                ov_oos_print(mdev);
1180
1181        ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size,
1182                              eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1183
1184        drbd_free_ee(mdev, e);
1185
1186        if (--mdev->ov_left == 0) {
1187                ov_oos_print(mdev);
1188                drbd_resync_finished(mdev);
1189        }
1190
1191        return ok;
1192}
1193
1194int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1195{
1196        struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1197        complete(&b->done);
1198        return 1;
1199}
1200
1201int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1202{
1203        struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1204        struct p_barrier *p = &mdev->data.sbuf.barrier;
1205        int ok = 1;
1206
1207        /* really avoid racing with tl_clear.  w.cb may have been referenced
1208         * just before it was reassigned and re-queued, so double check that.
1209         * actually, this race was harmless, since we only try to send the
1210         * barrier packet here, and otherwise do nothing with the object.
1211         * but compare with the head of w_clear_epoch */
1212        spin_lock_irq(&mdev->req_lock);
1213        if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1214                cancel = 1;
1215        spin_unlock_irq(&mdev->req_lock);
1216        if (cancel)
1217                return 1;
1218
1219        if (!drbd_get_data_sock(mdev))
1220                return 0;
1221        p->barrier = b->br_number;
1222        /* inc_ap_pending was done where this was queued.
1223         * dec_ap_pending will be done in got_BarrierAck
1224         * or (on connection loss) in w_clear_epoch.  */
1225        ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1226                                (struct p_header80 *)p, sizeof(*p), 0);
1227        drbd_put_data_sock(mdev);
1228
1229        return ok;
1230}
1231
1232int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1233{
1234        if (cancel)
1235                return 1;
1236        return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1237}
1238
1239/**
1240 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1241 * @mdev:       DRBD device.
1242 * @w:          work object.
1243 * @cancel:     The connection will be closed anyways
1244 */
1245int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1246{
1247        struct drbd_request *req = container_of(w, struct drbd_request, w);
1248        int ok;
1249
1250        if (unlikely(cancel)) {
1251                req_mod(req, send_canceled);
1252                return 1;
1253        }
1254
1255        ok = drbd_send_dblock(mdev, req);
1256        req_mod(req, ok ? handed_over_to_network : send_failed);
1257
1258        return ok;
1259}
1260
1261/**
1262 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1263 * @mdev:       DRBD device.
1264 * @w:          work object.
1265 * @cancel:     The connection will be closed anyways
1266 */
1267int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1268{
1269        struct drbd_request *req = container_of(w, struct drbd_request, w);
1270        int ok;
1271
1272        if (unlikely(cancel)) {
1273                req_mod(req, send_canceled);
1274                return 1;
1275        }
1276
1277        ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1278                                (unsigned long)req);
1279
1280        if (!ok) {
1281                /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1282                 * so this is probably redundant */
1283                if (mdev->state.conn >= C_CONNECTED)
1284                        drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1285        }
1286        req_mod(req, ok ? handed_over_to_network : send_failed);
1287
1288        return ok;
1289}
1290
1291int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1292{
1293        struct drbd_request *req = container_of(w, struct drbd_request, w);
1294
1295        if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1296                drbd_al_begin_io(mdev, req->sector);
1297        /* Calling drbd_al_begin_io() out of the worker might deadlocks
1298           theoretically. Practically it can not deadlock, since this is
1299           only used when unfreezing IOs. All the extents of the requests
1300           that made it into the TL are already active */
1301
1302        drbd_req_make_private_bio(req, req->master_bio);
1303        req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1304        generic_make_request(req->private_bio);
1305
1306        return 1;
1307}
1308
1309static int _drbd_may_sync_now(struct drbd_conf *mdev)
1310{
1311        struct drbd_conf *odev = mdev;
1312
1313        while (1) {
1314                if (odev->sync_conf.after == -1)
1315                        return 1;
1316                odev = minor_to_mdev(odev->sync_conf.after);
1317                ERR_IF(!odev) return 1;
1318                if ((odev->state.conn >= C_SYNC_SOURCE &&
1319                     odev->state.conn <= C_PAUSED_SYNC_T) ||
1320                    odev->state.aftr_isp || odev->state.peer_isp ||
1321                    odev->state.user_isp)
1322                        return 0;
1323        }
1324}
1325
1326/**
1327 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1328 * @mdev:       DRBD device.
1329 *
1330 * Called from process context only (admin command and after_state_ch).
1331 */
1332static int _drbd_pause_after(struct drbd_conf *mdev)
1333{
1334        struct drbd_conf *odev;
1335        int i, rv = 0;
1336
1337        for (i = 0; i < minor_count; i++) {
1338                odev = minor_to_mdev(i);
1339                if (!odev)
1340                        continue;
1341                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1342                        continue;
1343                if (!_drbd_may_sync_now(odev))
1344                        rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1345                               != SS_NOTHING_TO_DO);
1346        }
1347
1348        return rv;
1349}
1350
1351/**
1352 * _drbd_resume_next() - Resume resync on all devices that may resync now
1353 * @mdev:       DRBD device.
1354 *
1355 * Called from process context only (admin command and worker).
1356 */
1357static int _drbd_resume_next(struct drbd_conf *mdev)
1358{
1359        struct drbd_conf *odev;
1360        int i, rv = 0;
1361
1362        for (i = 0; i < minor_count; i++) {
1363                odev = minor_to_mdev(i);
1364                if (!odev)
1365                        continue;
1366                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1367                        continue;
1368                if (odev->state.aftr_isp) {
1369                        if (_drbd_may_sync_now(odev))
1370                                rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1371                                                        CS_HARD, NULL)
1372                                       != SS_NOTHING_TO_DO) ;
1373                }
1374        }
1375        return rv;
1376}
1377
1378void resume_next_sg(struct drbd_conf *mdev)
1379{
1380        write_lock_irq(&global_state_lock);
1381        _drbd_resume_next(mdev);
1382        write_unlock_irq(&global_state_lock);
1383}
1384
1385void suspend_other_sg(struct drbd_conf *mdev)
1386{
1387        write_lock_irq(&global_state_lock);
1388        _drbd_pause_after(mdev);
1389        write_unlock_irq(&global_state_lock);
1390}
1391
1392static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1393{
1394        struct drbd_conf *odev;
1395
1396        if (o_minor == -1)
1397                return NO_ERROR;
1398        if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1399                return ERR_SYNC_AFTER;
1400
1401        /* check for loops */
1402        odev = minor_to_mdev(o_minor);
1403        while (1) {
1404                if (odev == mdev)
1405                        return ERR_SYNC_AFTER_CYCLE;
1406
1407                /* dependency chain ends here, no cycles. */
1408                if (odev->sync_conf.after == -1)
1409                        return NO_ERROR;
1410
1411                /* follow the dependency chain */
1412                odev = minor_to_mdev(odev->sync_conf.after);
1413        }
1414}
1415
1416int drbd_alter_sa(struct drbd_conf *mdev, int na)
1417{
1418        int changes;
1419        int retcode;
1420
1421        write_lock_irq(&global_state_lock);
1422        retcode = sync_after_error(mdev, na);
1423        if (retcode == NO_ERROR) {
1424                mdev->sync_conf.after = na;
1425                do {
1426                        changes  = _drbd_pause_after(mdev);
1427                        changes |= _drbd_resume_next(mdev);
1428                } while (changes);
1429        }
1430        write_unlock_irq(&global_state_lock);
1431        return retcode;
1432}
1433
1434/**
1435 * drbd_start_resync() - Start the resync process
1436 * @mdev:       DRBD device.
1437 * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1438 *
1439 * This function might bring you directly into one of the
1440 * C_PAUSED_SYNC_* states.
1441 */
1442void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1443{
1444        union drbd_state ns;
1445        int r;
1446
1447        if (mdev->state.conn >= C_SYNC_SOURCE) {
1448                dev_err(DEV, "Resync already running!\n");
1449                return;
1450        }
1451
1452        /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1453        drbd_rs_cancel_all(mdev);
1454
1455        if (side == C_SYNC_TARGET) {
1456                /* Since application IO was locked out during C_WF_BITMAP_T and
1457                   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1458                   we check that we might make the data inconsistent. */
1459                r = drbd_khelper(mdev, "before-resync-target");
1460                r = (r >> 8) & 0xff;
1461                if (r > 0) {
1462                        dev_info(DEV, "before-resync-target handler returned %d, "
1463                             "dropping connection.\n", r);
1464                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1465                        return;
1466                }
1467        }
1468
1469        drbd_state_lock(mdev);
1470
1471        if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1472                drbd_state_unlock(mdev);
1473                return;
1474        }
1475
1476        if (side == C_SYNC_TARGET) {
1477                mdev->bm_resync_fo = 0;
1478        } else /* side == C_SYNC_SOURCE */ {
1479                u64 uuid;
1480
1481                get_random_bytes(&uuid, sizeof(u64));
1482                drbd_uuid_set(mdev, UI_BITMAP, uuid);
1483                drbd_send_sync_uuid(mdev, uuid);
1484
1485                D_ASSERT(mdev->state.disk == D_UP_TO_DATE);
1486        }
1487
1488        write_lock_irq(&global_state_lock);
1489        ns = mdev->state;
1490
1491        ns.aftr_isp = !_drbd_may_sync_now(mdev);
1492
1493        ns.conn = side;
1494
1495        if (side == C_SYNC_TARGET)
1496                ns.disk = D_INCONSISTENT;
1497        else /* side == C_SYNC_SOURCE */
1498                ns.pdsk = D_INCONSISTENT;
1499
1500        r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1501        ns = mdev->state;
1502
1503        if (ns.conn < C_CONNECTED)
1504                r = SS_UNKNOWN_ERROR;
1505
1506        if (r == SS_SUCCESS) {
1507                unsigned long tw = drbd_bm_total_weight(mdev);
1508                unsigned long now = jiffies;
1509                int i;
1510
1511                mdev->rs_failed    = 0;
1512                mdev->rs_paused    = 0;
1513                mdev->rs_same_csum = 0;
1514                mdev->rs_last_events = 0;
1515                mdev->rs_last_sect_ev = 0;
1516                mdev->rs_total     = tw;
1517                mdev->rs_start     = now;
1518                for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1519                        mdev->rs_mark_left[i] = tw;
1520                        mdev->rs_mark_time[i] = now;
1521                }
1522                _drbd_pause_after(mdev);
1523        }
1524        write_unlock_irq(&global_state_lock);
1525        put_ldev(mdev);
1526
1527        if (r == SS_SUCCESS) {
1528                dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1529                     drbd_conn_str(ns.conn),
1530                     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1531                     (unsigned long) mdev->rs_total);
1532
1533                if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1534                        /* This still has a race (about when exactly the peers
1535                         * detect connection loss) that can lead to a full sync
1536                         * on next handshake. In 8.3.9 we fixed this with explicit
1537                         * resync-finished notifications, but the fix
1538                         * introduces a protocol change.  Sleeping for some
1539                         * time longer than the ping interval + timeout on the
1540                         * SyncSource, to give the SyncTarget the chance to
1541                         * detect connection loss, then waiting for a ping
1542                         * response (implicit in drbd_resync_finished) reduces
1543                         * the race considerably, but does not solve it. */
1544                        if (side == C_SYNC_SOURCE)
1545                                schedule_timeout_interruptible(
1546                                        mdev->net_conf->ping_int * HZ +
1547                                        mdev->net_conf->ping_timeo*HZ/9);
1548                        drbd_resync_finished(mdev);
1549                }
1550
1551                atomic_set(&mdev->rs_sect_in, 0);
1552                atomic_set(&mdev->rs_sect_ev, 0);
1553                mdev->rs_in_flight = 0;
1554                mdev->rs_planed = 0;
1555                spin_lock(&mdev->peer_seq_lock);
1556                fifo_set(&mdev->rs_plan_s, 0);
1557                spin_unlock(&mdev->peer_seq_lock);
1558                /* ns.conn may already be != mdev->state.conn,
1559                 * we may have been paused in between, or become paused until
1560                 * the timer triggers.
1561                 * No matter, that is handled in resync_timer_fn() */
1562                if (ns.conn == C_SYNC_TARGET)
1563                        mod_timer(&mdev->resync_timer, jiffies);
1564
1565                drbd_md_sync(mdev);
1566        }
1567        drbd_state_unlock(mdev);
1568}
1569
1570int drbd_worker(struct drbd_thread *thi)
1571{
1572        struct drbd_conf *mdev = thi->mdev;
1573        struct drbd_work *w = NULL;
1574        LIST_HEAD(work_list);
1575        int intr = 0, i;
1576
1577        sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1578
1579        while (get_t_state(thi) == Running) {
1580                drbd_thread_current_set_cpu(mdev);
1581
1582                if (down_trylock(&mdev->data.work.s)) {
1583                        mutex_lock(&mdev->data.mutex);
1584                        if (mdev->data.socket && !mdev->net_conf->no_cork)
1585                                drbd_tcp_uncork(mdev->data.socket);
1586                        mutex_unlock(&mdev->data.mutex);
1587
1588                        intr = down_interruptible(&mdev->data.work.s);
1589
1590                        mutex_lock(&mdev->data.mutex);
1591                        if (mdev->data.socket  && !mdev->net_conf->no_cork)
1592                                drbd_tcp_cork(mdev->data.socket);
1593                        mutex_unlock(&mdev->data.mutex);
1594                }
1595
1596                if (intr) {
1597                        D_ASSERT(intr == -EINTR);
1598                        flush_signals(current);
1599                        ERR_IF (get_t_state(thi) == Running)
1600                                continue;
1601                        break;
1602                }
1603
1604                if (get_t_state(thi) != Running)
1605                        break;
1606                /* With this break, we have done a down() but not consumed
1607                   the entry from the list. The cleanup code takes care of
1608                   this...   */
1609
1610                w = NULL;
1611                spin_lock_irq(&mdev->data.work.q_lock);
1612                ERR_IF(list_empty(&mdev->data.work.q)) {
1613                        /* something terribly wrong in our logic.
1614                         * we were able to down() the semaphore,
1615                         * but the list is empty... doh.
1616                         *
1617                         * what is the best thing to do now?
1618                         * try again from scratch, restarting the receiver,
1619                         * asender, whatnot? could break even more ugly,
1620                         * e.g. when we are primary, but no good local data.
1621                         *
1622                         * I'll try to get away just starting over this loop.
1623                         */
1624                        spin_unlock_irq(&mdev->data.work.q_lock);
1625                        continue;
1626                }
1627                w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1628                list_del_init(&w->list);
1629                spin_unlock_irq(&mdev->data.work.q_lock);
1630
1631                if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1632                        /* dev_warn(DEV, "worker: a callback failed! \n"); */
1633                        if (mdev->state.conn >= C_CONNECTED)
1634                                drbd_force_state(mdev,
1635                                                NS(conn, C_NETWORK_FAILURE));
1636                }
1637        }
1638        D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1639        D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1640
1641        spin_lock_irq(&mdev->data.work.q_lock);
1642        i = 0;
1643        while (!list_empty(&mdev->data.work.q)) {
1644                list_splice_init(&mdev->data.work.q, &work_list);
1645                spin_unlock_irq(&mdev->data.work.q_lock);
1646
1647                while (!list_empty(&work_list)) {
1648                        w = list_entry(work_list.next, struct drbd_work, list);
1649                        list_del_init(&w->list);
1650                        w->cb(mdev, w, 1);
1651                        i++; /* dead debugging code */
1652                }
1653
1654                spin_lock_irq(&mdev->data.work.q_lock);
1655        }
1656        sema_init(&mdev->data.work.s, 0);
1657        /* DANGEROUS race: if someone did queue his work within the spinlock,
1658         * but up() ed outside the spinlock, we could get an up() on the
1659         * semaphore without corresponding list entry.
1660         * So don't do that.
1661         */
1662        spin_unlock_irq(&mdev->data.work.q_lock);
1663
1664        D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1665        /* _drbd_set_state only uses stop_nowait.
1666         * wait here for the Exiting receiver. */
1667        drbd_thread_stop(&mdev->receiver);
1668        drbd_mdev_cleanup(mdev);
1669
1670        dev_info(DEV, "worker terminated\n");
1671
1672        clear_bit(DEVICE_DYING, &mdev->flags);
1673        clear_bit(CONFIG_PENDING, &mdev->flags);
1674        wake_up(&mdev->state_wait);
1675
1676        return 0;
1677}
1678