linux/drivers/block/drbd/drbd_worker.c
<<
>>
Prefs
   1/*
   2   drbd_worker.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24 */
  25
  26#include <linux/module.h>
  27#include <linux/drbd.h>
  28#include <linux/sched.h>
  29#include <linux/wait.h>
  30#include <linux/mm.h>
  31#include <linux/memcontrol.h>
  32#include <linux/mm_inline.h>
  33#include <linux/slab.h>
  34#include <linux/random.h>
  35#include <linux/string.h>
  36#include <linux/scatterlist.h>
  37
  38#include "drbd_int.h"
  39#include "drbd_req.h"
  40
  41static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
  42static int w_make_resync_request(struct drbd_conf *mdev,
  43                                 struct drbd_work *w, int cancel);
  44
  45
  46
  47/* endio handlers:
  48 *   drbd_md_io_complete (defined here)
  49 *   drbd_endio_pri (defined here)
  50 *   drbd_endio_sec (defined here)
  51 *   bm_async_io_complete (defined in drbd_bitmap.c)
  52 *
  53 * For all these callbacks, note the following:
  54 * The callbacks will be called in irq context by the IDE drivers,
  55 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  56 * Try to get the locking right :)
  57 *
  58 */
  59
  60
/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
  65rwlock_t global_state_lock;
  66
  67/* used for synchronous meta data and bitmap IO
  68 * submitted by drbd_md_sync_page_io()
  69 */
  70void drbd_md_io_complete(struct bio *bio, int error)
  71{
  72        struct drbd_md_io *md_io;
  73
  74        md_io = (struct drbd_md_io *)bio->bi_private;
  75        md_io->error = error;
  76
  77        complete(&md_io->event);
  78}
  79
  80/* reads on behalf of the partner,
  81 * "submitted" by the receiver
  82 */
  83void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local)
  84{
  85        unsigned long flags = 0;
  86        struct drbd_conf *mdev = e->mdev;
  87
  88        D_ASSERT(e->block_id != ID_VACANT);
  89
  90        spin_lock_irqsave(&mdev->req_lock, flags);
  91        mdev->read_cnt += e->size >> 9;
  92        list_del(&e->w.list);
  93        if (list_empty(&mdev->read_ee))
  94                wake_up(&mdev->ee_wait);
  95        if (test_bit(__EE_WAS_ERROR, &e->flags))
  96                __drbd_chk_io_error(mdev, false);
  97        spin_unlock_irqrestore(&mdev->req_lock, flags);
  98
  99        drbd_queue_work(&mdev->data.work, &e->w);
 100        put_ldev(mdev);
 101}
 102
 103/* writes on behalf of the partner, or resync writes,
 104 * "submitted" by the receiver, final stage.  */
 105static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local)
 106{
 107        unsigned long flags = 0;
 108        struct drbd_conf *mdev = e->mdev;
 109        sector_t e_sector;
 110        int do_wake;
 111        int is_syncer_req;
 112        int do_al_complete_io;
 113
 114        D_ASSERT(e->block_id != ID_VACANT);
 115
 116        /* after we moved e to done_ee,
 117         * we may no longer access it,
 118         * it may be freed/reused already!
 119         * (as soon as we release the req_lock) */
 120        e_sector = e->sector;
 121        do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO;
 122        is_syncer_req = is_syncer_block_id(e->block_id);
 123
 124        spin_lock_irqsave(&mdev->req_lock, flags);
 125        mdev->writ_cnt += e->size >> 9;
 126        list_del(&e->w.list); /* has been on active_ee or sync_ee */
 127        list_add_tail(&e->w.list, &mdev->done_ee);
 128
 129        /* No hlist_del_init(&e->collision) here, we did not send the Ack yet,
 130         * neither did we wake possibly waiting conflicting requests.
 131         * done from "drbd_process_done_ee" within the appropriate w.cb
 132         * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */
 133
 134        do_wake = is_syncer_req
 135                ? list_empty(&mdev->sync_ee)
 136                : list_empty(&mdev->active_ee);
 137
 138        if (test_bit(__EE_WAS_ERROR, &e->flags))
 139                __drbd_chk_io_error(mdev, false);
 140        spin_unlock_irqrestore(&mdev->req_lock, flags);
 141
 142        if (is_syncer_req)
 143                drbd_rs_complete_io(mdev, e_sector);
 144
 145        if (do_wake)
 146                wake_up(&mdev->ee_wait);
 147
 148        if (do_al_complete_io)
 149                drbd_al_complete_io(mdev, e_sector);
 150
 151        wake_asender(mdev);
 152        put_ldev(mdev);
 153}
 154
 155/* writes on behalf of the partner, or resync writes,
 156 * "submitted" by the receiver.
 157 */
 158void drbd_endio_sec(struct bio *bio, int error)
 159{
 160        struct drbd_epoch_entry *e = bio->bi_private;
 161        struct drbd_conf *mdev = e->mdev;
 162        int uptodate = bio_flagged(bio, BIO_UPTODATE);
 163        int is_write = bio_data_dir(bio) == WRITE;
 164
 165        if (error && __ratelimit(&drbd_ratelimit_state))
 166                dev_warn(DEV, "%s: error=%d s=%llus\n",
 167                                is_write ? "write" : "read", error,
 168                                (unsigned long long)e->sector);
 169        if (!error && !uptodate) {
 170                if (__ratelimit(&drbd_ratelimit_state))
 171                        dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
 172                                        is_write ? "write" : "read",
 173                                        (unsigned long long)e->sector);
 174                /* strange behavior of some lower level drivers...
 175                 * fail the request by clearing the uptodate flag,
 176                 * but do not return any error?! */
 177                error = -EIO;
 178        }
 179
 180        if (error)
 181                set_bit(__EE_WAS_ERROR, &e->flags);
 182
 183        bio_put(bio); /* no need for the bio anymore */
 184        if (atomic_dec_and_test(&e->pending_bios)) {
 185                if (is_write)
 186                        drbd_endio_write_sec_final(e);
 187                else
 188                        drbd_endio_read_sec_final(e);
 189        }
 190}
 191
 192/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 193 */
 194void drbd_endio_pri(struct bio *bio, int error)
 195{
 196        unsigned long flags;
 197        struct drbd_request *req = bio->bi_private;
 198        struct drbd_conf *mdev = req->mdev;
 199        struct bio_and_error m;
 200        enum drbd_req_event what;
 201        int uptodate = bio_flagged(bio, BIO_UPTODATE);
 202
 203        if (!error && !uptodate) {
 204                dev_warn(DEV, "p %s: setting error to -EIO\n",
 205                         bio_data_dir(bio) == WRITE ? "write" : "read");
 206                /* strange behavior of some lower level drivers...
 207                 * fail the request by clearing the uptodate flag,
 208                 * but do not return any error?! */
 209                error = -EIO;
 210        }
 211
 212        /* to avoid recursion in __req_mod */
 213        if (unlikely(error)) {
 214                what = (bio_data_dir(bio) == WRITE)
 215                        ? write_completed_with_error
 216                        : (bio_rw(bio) == READ)
 217                          ? read_completed_with_error
 218                          : read_ahead_completed_with_error;
 219        } else
 220                what = completed_ok;
 221
 222        bio_put(req->private_bio);
 223        req->private_bio = ERR_PTR(error);
 224
 225        /* not req_mod(), we need irqsave here! */
 226        spin_lock_irqsave(&mdev->req_lock, flags);
 227        __req_mod(req, what, &m);
 228        spin_unlock_irqrestore(&mdev->req_lock, flags);
 229
 230        if (m.bio)
 231                complete_master_bio(mdev, &m);
 232}
 233
 234int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 235{
 236        struct drbd_request *req = container_of(w, struct drbd_request, w);
 237
 238        /* We should not detach for read io-error,
 239         * but try to WRITE the P_DATA_REPLY to the failed location,
 240         * to give the disk the chance to relocate that block */
 241
 242        spin_lock_irq(&mdev->req_lock);
 243        if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
 244                _req_mod(req, read_retry_remote_canceled);
 245                spin_unlock_irq(&mdev->req_lock);
 246                return 1;
 247        }
 248        spin_unlock_irq(&mdev->req_lock);
 249
 250        return w_send_read_req(mdev, w, 0);
 251}
 252
 253void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
 254{
 255        struct hash_desc desc;
 256        struct scatterlist sg;
 257        struct page *page = e->pages;
 258        struct page *tmp;
 259        unsigned len;
 260
 261        desc.tfm = tfm;
 262        desc.flags = 0;
 263
 264        sg_init_table(&sg, 1);
 265        crypto_hash_init(&desc);
 266
 267        while ((tmp = page_chain_next(page))) {
 268                /* all but the last page will be fully used */
 269                sg_set_page(&sg, page, PAGE_SIZE, 0);
 270                crypto_hash_update(&desc, &sg, sg.length);
 271                page = tmp;
 272        }
 273        /* and now the last, possibly only partially used page */
 274        len = e->size & (PAGE_SIZE - 1);
 275        sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
 276        crypto_hash_update(&desc, &sg, sg.length);
 277        crypto_hash_final(&desc, digest);
 278}
 279
 280void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
 281{
 282        struct hash_desc desc;
 283        struct scatterlist sg;
 284        struct bio_vec *bvec;
 285        int i;
 286
 287        desc.tfm = tfm;
 288        desc.flags = 0;
 289
 290        sg_init_table(&sg, 1);
 291        crypto_hash_init(&desc);
 292
 293        __bio_for_each_segment(bvec, bio, i, 0) {
 294                sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
 295                crypto_hash_update(&desc, &sg, sg.length);
 296        }
 297        crypto_hash_final(&desc, digest);
 298}
 299
 300/* TODO merge common code with w_e_end_ov_req */
 301int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 302{
 303        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 304        int digest_size;
 305        void *digest;
 306        int ok = 1;
 307
 308        D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);
 309
 310        if (unlikely(cancel))
 311                goto out;
 312
 313        if (likely((e->flags & EE_WAS_ERROR) != 0))
 314                goto out;
 315
 316        digest_size = crypto_hash_digestsize(mdev->csums_tfm);
 317        digest = kmalloc(digest_size, GFP_NOIO);
 318        if (digest) {
 319                sector_t sector = e->sector;
 320                unsigned int size = e->size;
 321                drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
 322                /* Free e and pages before send.
 323                 * In case we block on congestion, we could otherwise run into
 324                 * some distributed deadlock, if the other side blocks on
 325                 * congestion as well, because our receiver blocks in
 326                 * drbd_pp_alloc due to pp_in_use > max_buffers. */
 327                drbd_free_ee(mdev, e);
 328                e = NULL;
 329                inc_rs_pending(mdev);
 330                ok = drbd_send_drequest_csum(mdev, sector, size,
 331                                             digest, digest_size,
 332                                             P_CSUM_RS_REQUEST);
 333                kfree(digest);
 334        } else {
 335                dev_err(DEV, "kmalloc() of digest failed.\n");
 336                ok = 0;
 337        }
 338
 339out:
 340        if (e)
 341                drbd_free_ee(mdev, e);
 342
 343        if (unlikely(!ok))
 344                dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
 345        return ok;
 346}
 347
 348#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
 349
 350static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
 351{
 352        struct drbd_epoch_entry *e;
 353
 354        if (!get_ldev(mdev))
 355                return -EIO;
 356
 357        if (drbd_rs_should_slow_down(mdev, sector))
 358                goto defer;
 359
 360        /* GFP_TRY, because if there is no memory available right now, this may
 361         * be rescheduled for later. It is "only" background resync, after all. */
 362        e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY);
 363        if (!e)
 364                goto defer;
 365
 366        e->w.cb = w_e_send_csum;
 367        spin_lock_irq(&mdev->req_lock);
 368        list_add(&e->w.list, &mdev->read_ee);
 369        spin_unlock_irq(&mdev->req_lock);
 370
 371        atomic_add(size >> 9, &mdev->rs_sect_ev);
 372        if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0)
 373                return 0;
 374
 375        /* If it failed because of ENOMEM, retry should help.  If it failed
 376         * because bio_add_page failed (probably broken lower level driver),
 377         * retry may or may not help.
 378         * If it does not, you may need to force disconnect. */
 379        spin_lock_irq(&mdev->req_lock);
 380        list_del(&e->w.list);
 381        spin_unlock_irq(&mdev->req_lock);
 382
 383        drbd_free_ee(mdev, e);
 384defer:
 385        put_ldev(mdev);
 386        return -EAGAIN;
 387}
 388
 389int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 390{
 391        switch (mdev->state.conn) {
 392        case C_VERIFY_S:
 393                w_make_ov_request(mdev, w, cancel);
 394                break;
 395        case C_SYNC_TARGET:
 396                w_make_resync_request(mdev, w, cancel);
 397                break;
 398        }
 399
 400        return 1;
 401}
 402
 403void resync_timer_fn(unsigned long data)
 404{
 405        struct drbd_conf *mdev = (struct drbd_conf *) data;
 406
 407        if (list_empty(&mdev->resync_work.list))
 408                drbd_queue_work(&mdev->data.work, &mdev->resync_work);
 409}
 410
 411static void fifo_set(struct fifo_buffer *fb, int value)
 412{
 413        int i;
 414
 415        for (i = 0; i < fb->size; i++)
 416                fb->values[i] = value;
 417}
 418
 419static int fifo_push(struct fifo_buffer *fb, int value)
 420{
 421        int ov;
 422
 423        ov = fb->values[fb->head_index];
 424        fb->values[fb->head_index++] = value;
 425
 426        if (fb->head_index >= fb->size)
 427                fb->head_index = 0;
 428
 429        return ov;
 430}
 431
 432static void fifo_add_val(struct fifo_buffer *fb, int value)
 433{
 434        int i;
 435
 436        for (i = 0; i < fb->size; i++)
 437                fb->values[i] += value;
 438}
 439
 440static int drbd_rs_controller(struct drbd_conf *mdev)
 441{
 442        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 443        unsigned int want;     /* The number of sectors we want in the proxy */
 444        int req_sect; /* Number of sectors to request in this turn */
 445        int correction; /* Number of sectors more we need in the proxy*/
 446        int cps; /* correction per invocation of drbd_rs_controller() */
 447        int steps; /* Number of time steps to plan ahead */
 448        int curr_corr;
 449        int max_sect;
 450
 451        sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
 452        mdev->rs_in_flight -= sect_in;
 453
 454        spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */
 455
 456        steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 457
 458        if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
 459                want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
 460        } else { /* normal path */
 461                want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
 462                        sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
 463        }
 464
 465        correction = want - mdev->rs_in_flight - mdev->rs_planed;
 466
 467        /* Plan ahead */
 468        cps = correction / steps;
 469        fifo_add_val(&mdev->rs_plan_s, cps);
 470        mdev->rs_planed += cps * steps;
 471
 472        /* What we do in this step */
 473        curr_corr = fifo_push(&mdev->rs_plan_s, 0);
 474        spin_unlock(&mdev->peer_seq_lock);
 475        mdev->rs_planed -= curr_corr;
 476
 477        req_sect = sect_in + curr_corr;
 478        if (req_sect < 0)
 479                req_sect = 0;
 480
 481        max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
 482        if (req_sect > max_sect)
 483                req_sect = max_sect;
 484
 485        /*
 486        dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 487                 sect_in, mdev->rs_in_flight, want, correction,
 488                 steps, cps, mdev->rs_planed, curr_corr, req_sect);
 489        */
 490
 491        return req_sect;
 492}
 493
 494static int drbd_rs_number_requests(struct drbd_conf *mdev)
 495{
 496        int number;
 497        if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
 498                number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
 499                mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 500        } else {
 501                mdev->c_sync_rate = mdev->sync_conf.rate;
 502                number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 503        }
 504
 505        /* ignore the amount of pending requests, the resync controller should
 506         * throttle down to incoming reply rate soon enough anyways. */
 507        return number;
 508}
 509
 510static int w_make_resync_request(struct drbd_conf *mdev,
 511                                 struct drbd_work *w, int cancel)
 512{
 513        unsigned long bit;
 514        sector_t sector;
 515        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 516        int max_bio_size;
 517        int number, rollback_i, size;
 518        int align, queued, sndbuf;
 519        int i = 0;
 520
 521        if (unlikely(cancel))
 522                return 1;
 523
 524        if (mdev->rs_total == 0) {
 525                /* empty resync? */
 526                drbd_resync_finished(mdev);
 527                return 1;
 528        }
 529
 530        if (!get_ldev(mdev)) {
 531                /* Since we only need to access mdev->rsync a
 532                   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
 533                   to continue resync with a broken disk makes no sense at
 534                   all */
 535                dev_err(DEV, "Disk broke down during resync!\n");
 536                return 1;
 537        }
 538
 539        max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
 540        number = drbd_rs_number_requests(mdev);
 541        if (number == 0)
 542                goto requeue;
 543
 544        for (i = 0; i < number; i++) {
 545                /* Stop generating RS requests, when half of the send buffer is filled */
 546                mutex_lock(&mdev->data.mutex);
 547                if (mdev->data.socket) {
 548                        queued = mdev->data.socket->sk->sk_wmem_queued;
 549                        sndbuf = mdev->data.socket->sk->sk_sndbuf;
 550                } else {
 551                        queued = 1;
 552                        sndbuf = 0;
 553                }
 554                mutex_unlock(&mdev->data.mutex);
 555                if (queued > sndbuf / 2)
 556                        goto requeue;
 557
 558next_sector:
 559                size = BM_BLOCK_SIZE;
 560                bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
 561
 562                if (bit == DRBD_END_OF_BITMAP) {
 563                        mdev->bm_resync_fo = drbd_bm_bits(mdev);
 564                        put_ldev(mdev);
 565                        return 1;
 566                }
 567
 568                sector = BM_BIT_TO_SECT(bit);
 569
 570                if (drbd_rs_should_slow_down(mdev, sector) ||
 571                    drbd_try_rs_begin_io(mdev, sector)) {
 572                        mdev->bm_resync_fo = bit;
 573                        goto requeue;
 574                }
 575                mdev->bm_resync_fo = bit + 1;
 576
 577                if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
 578                        drbd_rs_complete_io(mdev, sector);
 579                        goto next_sector;
 580                }
 581
 582#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 583                /* try to find some adjacent bits.
 584                 * we stop if we have already the maximum req size.
 585                 *
 586                 * Additionally always align bigger requests, in order to
 587                 * be prepared for all stripe sizes of software RAIDs.
 588                 */
 589                align = 1;
 590                rollback_i = i;
 591                for (;;) {
 592                        if (size + BM_BLOCK_SIZE > max_bio_size)
 593                                break;
 594
 595                        /* Be always aligned */
 596                        if (sector & ((1<<(align+3))-1))
 597                                break;
 598
 599                        /* do not cross extent boundaries */
 600                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 601                                break;
 602                        /* now, is it actually dirty, after all?
 603                         * caution, drbd_bm_test_bit is tri-state for some
 604                         * obscure reason; ( b == 0 ) would get the out-of-band
 605                         * only accidentally right because of the "oddly sized"
 606                         * adjustment below */
 607                        if (drbd_bm_test_bit(mdev, bit+1) != 1)
 608                                break;
 609                        bit++;
 610                        size += BM_BLOCK_SIZE;
 611                        if ((BM_BLOCK_SIZE << align) <= size)
 612                                align++;
 613                        i++;
 614                }
 615                /* if we merged some,
 616                 * reset the offset to start the next drbd_bm_find_next from */
 617                if (size > BM_BLOCK_SIZE)
 618                        mdev->bm_resync_fo = bit + 1;
 619#endif
 620
 621                /* adjust very last sectors, in case we are oddly sized */
 622                if (sector + (size>>9) > capacity)
 623                        size = (capacity-sector)<<9;
 624                if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) {
 625                        switch (read_for_csum(mdev, sector, size)) {
 626                        case -EIO: /* Disk failure */
 627                                put_ldev(mdev);
 628                                return 0;
 629                        case -EAGAIN: /* allocation failed, or ldev busy */
 630                                drbd_rs_complete_io(mdev, sector);
 631                                mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
 632                                i = rollback_i;
 633                                goto requeue;
 634                        case 0:
 635                                /* everything ok */
 636                                break;
 637                        default:
 638                                BUG();
 639                        }
 640                } else {
 641                        inc_rs_pending(mdev);
 642                        if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
 643                                               sector, size, ID_SYNCER)) {
 644                                dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
 645                                dec_rs_pending(mdev);
 646                                put_ldev(mdev);
 647                                return 0;
 648                        }
 649                }
 650        }
 651
 652        if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
 653                /* last syncer _request_ was sent,
 654                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
 655                 * next sync group will resume), as soon as we receive the last
 656                 * resync data block, and the last bit is cleared.
 657                 * until then resync "work" is "inactive" ...
 658                 */
 659                put_ldev(mdev);
 660                return 1;
 661        }
 662
 663 requeue:
 664        mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 665        mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
 666        put_ldev(mdev);
 667        return 1;
 668}
 669
 670static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 671{
 672        int number, i, size;
 673        sector_t sector;
 674        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 675
 676        if (unlikely(cancel))
 677                return 1;
 678
 679        number = drbd_rs_number_requests(mdev);
 680
 681        sector = mdev->ov_position;
 682        for (i = 0; i < number; i++) {
 683                if (sector >= capacity) {
 684                        return 1;
 685                }
 686
 687                size = BM_BLOCK_SIZE;
 688
 689                if (drbd_rs_should_slow_down(mdev, sector) ||
 690                    drbd_try_rs_begin_io(mdev, sector)) {
 691                        mdev->ov_position = sector;
 692                        goto requeue;
 693                }
 694
 695                if (sector + (size>>9) > capacity)
 696                        size = (capacity-sector)<<9;
 697
 698                inc_rs_pending(mdev);
 699                if (!drbd_send_ov_request(mdev, sector, size)) {
 700                        dec_rs_pending(mdev);
 701                        return 0;
 702                }
 703                sector += BM_SECT_PER_BIT;
 704        }
 705        mdev->ov_position = sector;
 706
 707 requeue:
 708        mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 709        mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
 710        return 1;
 711}
 712
 713
 714void start_resync_timer_fn(unsigned long data)
 715{
 716        struct drbd_conf *mdev = (struct drbd_conf *) data;
 717
 718        drbd_queue_work(&mdev->data.work, &mdev->start_resync_work);
 719}
 720
 721int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 722{
 723        if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
 724                dev_warn(DEV, "w_start_resync later...\n");
 725                mdev->start_resync_timer.expires = jiffies + HZ/10;
 726                add_timer(&mdev->start_resync_timer);
 727                return 1;
 728        }
 729
 730        drbd_start_resync(mdev, C_SYNC_SOURCE);
 731        clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
 732        return 1;
 733}
 734
 735int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 736{
 737        kfree(w);
 738        ov_oos_print(mdev);
 739        drbd_resync_finished(mdev);
 740
 741        return 1;
 742}
 743
 744static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 745{
 746        kfree(w);
 747
 748        drbd_resync_finished(mdev);
 749
 750        return 1;
 751}
 752
 753static void ping_peer(struct drbd_conf *mdev)
 754{
 755        clear_bit(GOT_PING_ACK, &mdev->flags);
 756        request_ping(mdev);
 757        wait_event(mdev->misc_wait,
 758                   test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED);
 759}
 760
 761int drbd_resync_finished(struct drbd_conf *mdev)
 762{
 763        unsigned long db, dt, dbdt;
 764        unsigned long n_oos;
 765        union drbd_state os, ns;
 766        struct drbd_work *w;
 767        char *khelper_cmd = NULL;
 768        int verify_done = 0;
 769
 770        /* Remove all elements from the resync LRU. Since future actions
 771         * might set bits in the (main) bitmap, then the entries in the
 772         * resync LRU would be wrong. */
 773        if (drbd_rs_del_all(mdev)) {
 774                /* In case this is not possible now, most probably because
 775                 * there are P_RS_DATA_REPLY Packets lingering on the worker's
 776                 * queue (or even the read operations for those packets
 777                 * is not finished by now).   Retry in 100ms. */
 778
 779                schedule_timeout_interruptible(HZ / 10);
 780                w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
 781                if (w) {
 782                        w->cb = w_resync_finished;
 783                        drbd_queue_work(&mdev->data.work, w);
 784                        return 1;
 785                }
 786                dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n");
 787        }
 788
 789        dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
 790        if (dt <= 0)
 791                dt = 1;
 792        db = mdev->rs_total;
 793        dbdt = Bit2KB(db/dt);
 794        mdev->rs_paused /= HZ;
 795
 796        if (!get_ldev(mdev))
 797                goto out;
 798
 799        ping_peer(mdev);
 800
 801        spin_lock_irq(&mdev->req_lock);
 802        os = mdev->state;
 803
 804        verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 805
 806        /* This protects us against multiple calls (that can happen in the presence
 807           of application IO), and against connectivity loss just before we arrive here. */
 808        if (os.conn <= C_CONNECTED)
 809                goto out_unlock;
 810
 811        ns = os;
 812        ns.conn = C_CONNECTED;
 813
 814        dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 815             verify_done ? "Online verify " : "Resync",
 816             dt + mdev->rs_paused, mdev->rs_paused, dbdt);
 817
 818        n_oos = drbd_bm_total_weight(mdev);
 819
 820        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 821                if (n_oos) {
 822                        dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n",
 823                              n_oos, Bit2KB(1));
 824                        khelper_cmd = "out-of-sync";
 825                }
 826        } else {
 827                D_ASSERT((n_oos - mdev->rs_failed) == 0);
 828
 829                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 830                        khelper_cmd = "after-resync-target";
 831
 832                if (mdev->csums_tfm && mdev->rs_total) {
 833                        const unsigned long s = mdev->rs_same_csum;
 834                        const unsigned long t = mdev->rs_total;
 835                        const int ratio =
 836                                (t == 0)     ? 0 :
 837                        (t < 100000) ? ((s*100)/t) : (s/(t/100));
 838                        dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
 839                             "transferred %luK total %luK\n",
 840                             ratio,
 841                             Bit2KB(mdev->rs_same_csum),
 842                             Bit2KB(mdev->rs_total - mdev->rs_same_csum),
 843                             Bit2KB(mdev->rs_total));
 844                }
 845        }
 846
 847        if (mdev->rs_failed) {
 848                dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
 849
 850                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 851                        ns.disk = D_INCONSISTENT;
 852                        ns.pdsk = D_UP_TO_DATE;
 853                } else {
 854                        ns.disk = D_UP_TO_DATE;
 855                        ns.pdsk = D_INCONSISTENT;
 856                }
 857        } else {
 858                ns.disk = D_UP_TO_DATE;
 859                ns.pdsk = D_UP_TO_DATE;
 860
 861                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 862                        if (mdev->p_uuid) {
 863                                int i;
 864                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 865                                        _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
 866                                drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
 867                                _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
 868                        } else {
 869                                dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
 870                        }
 871                }
 872
 873                if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 874                        /* for verify runs, we don't update uuids here,
 875                         * so there would be nothing to report. */
 876                        drbd_uuid_set_bm(mdev, 0UL);
 877                        drbd_print_uuids(mdev, "updated UUIDs");
 878                        if (mdev->p_uuid) {
 879                                /* Now the two UUID sets are equal, update what we
 880                                 * know of the peer. */
 881                                int i;
 882                                for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 883                                        mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
 884                        }
 885                }
 886        }
 887
 888        _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
 889out_unlock:
 890        spin_unlock_irq(&mdev->req_lock);
 891        put_ldev(mdev);
 892out:
 893        mdev->rs_total  = 0;
 894        mdev->rs_failed = 0;
 895        mdev->rs_paused = 0;
 896        if (verify_done)
 897                mdev->ov_start_sector = 0;
 898
 899        drbd_md_sync(mdev);
 900
 901        if (khelper_cmd)
 902                drbd_khelper(mdev, khelper_cmd);
 903
 904        return 1;
 905}
 906
 907/* helper */
 908static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e)
 909{
 910        if (drbd_ee_has_active_page(e)) {
 911                /* This might happen if sendpage() has not finished */
 912                int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT;
 913                atomic_add(i, &mdev->pp_in_use_by_net);
 914                atomic_sub(i, &mdev->pp_in_use);
 915                spin_lock_irq(&mdev->req_lock);
 916                list_add_tail(&e->w.list, &mdev->net_ee);
 917                spin_unlock_irq(&mdev->req_lock);
 918                wake_up(&drbd_pp_wait);
 919        } else
 920                drbd_free_ee(mdev, e);
 921}
 922
 923/**
 924 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 925 * @mdev:       DRBD device.
 926 * @w:          work object.
 927 * @cancel:     The connection will be closed anyways
 928 */
 929int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 930{
 931        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 932        int ok;
 933
 934        if (unlikely(cancel)) {
 935                drbd_free_ee(mdev, e);
 936                dec_unacked(mdev);
 937                return 1;
 938        }
 939
 940        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 941                ok = drbd_send_block(mdev, P_DATA_REPLY, e);
 942        } else {
 943                if (__ratelimit(&drbd_ratelimit_state))
 944                        dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
 945                            (unsigned long long)e->sector);
 946
 947                ok = drbd_send_ack(mdev, P_NEG_DREPLY, e);
 948        }
 949
 950        dec_unacked(mdev);
 951
 952        move_to_net_ee_or_free(mdev, e);
 953
 954        if (unlikely(!ok))
 955                dev_err(DEV, "drbd_send_block() failed\n");
 956        return ok;
 957}
 958
 959/**
 960 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS
 961 * @mdev:       DRBD device.
 962 * @w:          work object.
 963 * @cancel:     The connection will be closed anyways
 964 */
 965int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
 966{
 967        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
 968        int ok;
 969
 970        if (unlikely(cancel)) {
 971                drbd_free_ee(mdev, e);
 972                dec_unacked(mdev);
 973                return 1;
 974        }
 975
 976        if (get_ldev_if_state(mdev, D_FAILED)) {
 977                drbd_rs_complete_io(mdev, e->sector);
 978                put_ldev(mdev);
 979        }
 980
 981        if (mdev->state.conn == C_AHEAD) {
 982                ok = drbd_send_ack(mdev, P_RS_CANCEL, e);
 983        } else if (likely((e->flags & EE_WAS_ERROR) == 0)) {
 984                if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
 985                        inc_rs_pending(mdev);
 986                        ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
 987                } else {
 988                        if (__ratelimit(&drbd_ratelimit_state))
 989                                dev_err(DEV, "Not sending RSDataReply, "
 990                                    "partner DISKLESS!\n");
 991                        ok = 1;
 992                }
 993        } else {
 994                if (__ratelimit(&drbd_ratelimit_state))
 995                        dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
 996                            (unsigned long long)e->sector);
 997
 998                ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
 999
1000                /* update resync data with failure */
1001                drbd_rs_failed_io(mdev, e->sector, e->size);
1002        }
1003
1004        dec_unacked(mdev);
1005
1006        move_to_net_ee_or_free(mdev, e);
1007
1008        if (unlikely(!ok))
1009                dev_err(DEV, "drbd_send_block() failed\n");
1010        return ok;
1011}
1012
1013int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1014{
1015        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1016        struct digest_info *di;
1017        int digest_size;
1018        void *digest = NULL;
1019        int ok, eq = 0;
1020
1021        if (unlikely(cancel)) {
1022                drbd_free_ee(mdev, e);
1023                dec_unacked(mdev);
1024                return 1;
1025        }
1026
1027        if (get_ldev(mdev)) {
1028                drbd_rs_complete_io(mdev, e->sector);
1029                put_ldev(mdev);
1030        }
1031
1032        di = e->digest;
1033
1034        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1035                /* quick hack to try to avoid a race against reconfiguration.
1036                 * a real fix would be much more involved,
1037                 * introducing more locking mechanisms */
1038                if (mdev->csums_tfm) {
1039                        digest_size = crypto_hash_digestsize(mdev->csums_tfm);
1040                        D_ASSERT(digest_size == di->digest_size);
1041                        digest = kmalloc(digest_size, GFP_NOIO);
1042                }
1043                if (digest) {
1044                        drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);
1045                        eq = !memcmp(digest, di->digest, digest_size);
1046                        kfree(digest);
1047                }
1048
1049                if (eq) {
1050                        drbd_set_in_sync(mdev, e->sector, e->size);
1051                        /* rs_same_csums unit is BM_BLOCK_SIZE */
1052                        mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT;
1053                        ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e);
1054                } else {
1055                        inc_rs_pending(mdev);
1056                        e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1057                        e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
1058                        kfree(di);
1059                        ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
1060                }
1061        } else {
1062                ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
1063                if (__ratelimit(&drbd_ratelimit_state))
1064                        dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1065        }
1066
1067        dec_unacked(mdev);
1068        move_to_net_ee_or_free(mdev, e);
1069
1070        if (unlikely(!ok))
1071                dev_err(DEV, "drbd_send_block/ack() failed\n");
1072        return ok;
1073}
1074
1075/* TODO merge common code with w_e_send_csum */
1076int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1077{
1078        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1079        sector_t sector = e->sector;
1080        unsigned int size = e->size;
1081        int digest_size;
1082        void *digest;
1083        int ok = 1;
1084
1085        if (unlikely(cancel))
1086                goto out;
1087
1088        digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1089        digest = kmalloc(digest_size, GFP_NOIO);
1090        if (!digest) {
1091                ok = 0; /* terminate the connection in case the allocation failed */
1092                goto out;
1093        }
1094
1095        if (likely(!(e->flags & EE_WAS_ERROR)))
1096                drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1097        else
1098                memset(digest, 0, digest_size);
1099
1100        /* Free e and pages before send.
1101         * In case we block on congestion, we could otherwise run into
1102         * some distributed deadlock, if the other side blocks on
1103         * congestion as well, because our receiver blocks in
1104         * drbd_pp_alloc due to pp_in_use > max_buffers. */
1105        drbd_free_ee(mdev, e);
1106        e = NULL;
1107        inc_rs_pending(mdev);
1108        ok = drbd_send_drequest_csum(mdev, sector, size,
1109                                     digest, digest_size,
1110                                     P_OV_REPLY);
1111        if (!ok)
1112                dec_rs_pending(mdev);
1113        kfree(digest);
1114
1115out:
1116        if (e)
1117                drbd_free_ee(mdev, e);
1118        dec_unacked(mdev);
1119        return ok;
1120}
1121
1122void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
1123{
1124        if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1125                mdev->ov_last_oos_size += size>>9;
1126        } else {
1127                mdev->ov_last_oos_start = sector;
1128                mdev->ov_last_oos_size = size>>9;
1129        }
1130        drbd_set_out_of_sync(mdev, sector, size);
1131}
1132
1133int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1134{
1135        struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
1136        struct digest_info *di;
1137        void *digest;
1138        sector_t sector = e->sector;
1139        unsigned int size = e->size;
1140        int digest_size;
1141        int ok, eq = 0;
1142
1143        if (unlikely(cancel)) {
1144                drbd_free_ee(mdev, e);
1145                dec_unacked(mdev);
1146                return 1;
1147        }
1148
1149        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1150         * the resync lru has been cleaned up already */
1151        if (get_ldev(mdev)) {
1152                drbd_rs_complete_io(mdev, e->sector);
1153                put_ldev(mdev);
1154        }
1155
1156        di = e->digest;
1157
1158        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1159                digest_size = crypto_hash_digestsize(mdev->verify_tfm);
1160                digest = kmalloc(digest_size, GFP_NOIO);
1161                if (digest) {
1162                        drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
1163
1164                        D_ASSERT(digest_size == di->digest_size);
1165                        eq = !memcmp(digest, di->digest, digest_size);
1166                        kfree(digest);
1167                }
1168        }
1169
1170                /* Free e and pages before send.
1171                 * In case we block on congestion, we could otherwise run into
1172                 * some distributed deadlock, if the other side blocks on
1173                 * congestion as well, because our receiver blocks in
1174                 * drbd_pp_alloc due to pp_in_use > max_buffers. */
1175        drbd_free_ee(mdev, e);
1176        if (!eq)
1177                drbd_ov_oos_found(mdev, sector, size);
1178        else
1179                ov_oos_print(mdev);
1180
1181        ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1182                              eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1183
1184        dec_unacked(mdev);
1185
1186        --mdev->ov_left;
1187
1188        /* let's advance progress step marks only for every other megabyte */
1189        if ((mdev->ov_left & 0x200) == 0x200)
1190                drbd_advance_rs_marks(mdev, mdev->ov_left);
1191
1192        if (mdev->ov_left == 0) {
1193                ov_oos_print(mdev);
1194                drbd_resync_finished(mdev);
1195        }
1196
1197        return ok;
1198}
1199
1200int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1201{
1202        struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1203        complete(&b->done);
1204        return 1;
1205}
1206
1207int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1208{
1209        struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
1210        struct p_barrier *p = &mdev->data.sbuf.barrier;
1211        int ok = 1;
1212
1213        /* really avoid racing with tl_clear.  w.cb may have been referenced
1214         * just before it was reassigned and re-queued, so double check that.
1215         * actually, this race was harmless, since we only try to send the
1216         * barrier packet here, and otherwise do nothing with the object.
1217         * but compare with the head of w_clear_epoch */
1218        spin_lock_irq(&mdev->req_lock);
1219        if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
1220                cancel = 1;
1221        spin_unlock_irq(&mdev->req_lock);
1222        if (cancel)
1223                return 1;
1224
1225        if (!drbd_get_data_sock(mdev))
1226                return 0;
1227        p->barrier = b->br_number;
1228        /* inc_ap_pending was done where this was queued.
1229         * dec_ap_pending will be done in got_BarrierAck
1230         * or (on connection loss) in w_clear_epoch.  */
1231        ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER,
1232                                (struct p_header80 *)p, sizeof(*p), 0);
1233        drbd_put_data_sock(mdev);
1234
1235        return ok;
1236}
1237
1238int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1239{
1240        if (cancel)
1241                return 1;
1242        return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
1243}
1244
1245int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1246{
1247        struct drbd_request *req = container_of(w, struct drbd_request, w);
1248        int ok;
1249
1250        if (unlikely(cancel)) {
1251                req_mod(req, send_canceled);
1252                return 1;
1253        }
1254
1255        ok = drbd_send_oos(mdev, req);
1256        req_mod(req, oos_handed_to_network);
1257
1258        return ok;
1259}
1260
1261/**
1262 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1263 * @mdev:       DRBD device.
1264 * @w:          work object.
1265 * @cancel:     The connection will be closed anyways
1266 */
1267int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1268{
1269        struct drbd_request *req = container_of(w, struct drbd_request, w);
1270        int ok;
1271
1272        if (unlikely(cancel)) {
1273                req_mod(req, send_canceled);
1274                return 1;
1275        }
1276
1277        ok = drbd_send_dblock(mdev, req);
1278        req_mod(req, ok ? handed_over_to_network : send_failed);
1279
1280        return ok;
1281}
1282
1283/**
1284 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1285 * @mdev:       DRBD device.
1286 * @w:          work object.
1287 * @cancel:     The connection will be closed anyways
1288 */
1289int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1290{
1291        struct drbd_request *req = container_of(w, struct drbd_request, w);
1292        int ok;
1293
1294        if (unlikely(cancel)) {
1295                req_mod(req, send_canceled);
1296                return 1;
1297        }
1298
1299        ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size,
1300                                (unsigned long)req);
1301
1302        if (!ok) {
1303                /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
1304                 * so this is probably redundant */
1305                if (mdev->state.conn >= C_CONNECTED)
1306                        drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
1307        }
1308        req_mod(req, ok ? handed_over_to_network : send_failed);
1309
1310        return ok;
1311}
1312
1313int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1314{
1315        struct drbd_request *req = container_of(w, struct drbd_request, w);
1316
1317        if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1318                drbd_al_begin_io(mdev, req->sector);
1319        /* Calling drbd_al_begin_io() out of the worker might deadlocks
1320           theoretically. Practically it can not deadlock, since this is
1321           only used when unfreezing IOs. All the extents of the requests
1322           that made it into the TL are already active */
1323
1324        drbd_req_make_private_bio(req, req->master_bio);
1325        req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1326        generic_make_request(req->private_bio);
1327
1328        return 1;
1329}
1330
1331static int _drbd_may_sync_now(struct drbd_conf *mdev)
1332{
1333        struct drbd_conf *odev = mdev;
1334
1335        while (1) {
1336                if (odev->sync_conf.after == -1)
1337                        return 1;
1338                odev = minor_to_mdev(odev->sync_conf.after);
1339                ERR_IF(!odev) return 1;
1340                if ((odev->state.conn >= C_SYNC_SOURCE &&
1341                     odev->state.conn <= C_PAUSED_SYNC_T) ||
1342                    odev->state.aftr_isp || odev->state.peer_isp ||
1343                    odev->state.user_isp)
1344                        return 0;
1345        }
1346}
1347
1348/**
1349 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1350 * @mdev:       DRBD device.
1351 *
1352 * Called from process context only (admin command and after_state_ch).
1353 */
1354static int _drbd_pause_after(struct drbd_conf *mdev)
1355{
1356        struct drbd_conf *odev;
1357        int i, rv = 0;
1358
1359        for (i = 0; i < minor_count; i++) {
1360                odev = minor_to_mdev(i);
1361                if (!odev)
1362                        continue;
1363                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1364                        continue;
1365                if (!_drbd_may_sync_now(odev))
1366                        rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1367                               != SS_NOTHING_TO_DO);
1368        }
1369
1370        return rv;
1371}
1372
1373/**
1374 * _drbd_resume_next() - Resume resync on all devices that may resync now
1375 * @mdev:       DRBD device.
1376 *
1377 * Called from process context only (admin command and worker).
1378 */
1379static int _drbd_resume_next(struct drbd_conf *mdev)
1380{
1381        struct drbd_conf *odev;
1382        int i, rv = 0;
1383
1384        for (i = 0; i < minor_count; i++) {
1385                odev = minor_to_mdev(i);
1386                if (!odev)
1387                        continue;
1388                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1389                        continue;
1390                if (odev->state.aftr_isp) {
1391                        if (_drbd_may_sync_now(odev))
1392                                rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1393                                                        CS_HARD, NULL)
1394                                       != SS_NOTHING_TO_DO) ;
1395                }
1396        }
1397        return rv;
1398}
1399
1400void resume_next_sg(struct drbd_conf *mdev)
1401{
1402        write_lock_irq(&global_state_lock);
1403        _drbd_resume_next(mdev);
1404        write_unlock_irq(&global_state_lock);
1405}
1406
1407void suspend_other_sg(struct drbd_conf *mdev)
1408{
1409        write_lock_irq(&global_state_lock);
1410        _drbd_pause_after(mdev);
1411        write_unlock_irq(&global_state_lock);
1412}
1413
1414static int sync_after_error(struct drbd_conf *mdev, int o_minor)
1415{
1416        struct drbd_conf *odev;
1417
1418        if (o_minor == -1)
1419                return NO_ERROR;
1420        if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
1421                return ERR_SYNC_AFTER;
1422
1423        /* check for loops */
1424        odev = minor_to_mdev(o_minor);
1425        while (1) {
1426                if (odev == mdev)
1427                        return ERR_SYNC_AFTER_CYCLE;
1428
1429                /* dependency chain ends here, no cycles. */
1430                if (odev->sync_conf.after == -1)
1431                        return NO_ERROR;
1432
1433                /* follow the dependency chain */
1434                odev = minor_to_mdev(odev->sync_conf.after);
1435        }
1436}
1437
1438int drbd_alter_sa(struct drbd_conf *mdev, int na)
1439{
1440        int changes;
1441        int retcode;
1442
1443        write_lock_irq(&global_state_lock);
1444        retcode = sync_after_error(mdev, na);
1445        if (retcode == NO_ERROR) {
1446                mdev->sync_conf.after = na;
1447                do {
1448                        changes  = _drbd_pause_after(mdev);
1449                        changes |= _drbd_resume_next(mdev);
1450                } while (changes);
1451        }
1452        write_unlock_irq(&global_state_lock);
1453        return retcode;
1454}
1455
1456void drbd_rs_controller_reset(struct drbd_conf *mdev)
1457{
1458        atomic_set(&mdev->rs_sect_in, 0);
1459        atomic_set(&mdev->rs_sect_ev, 0);
1460        mdev->rs_in_flight = 0;
1461        mdev->rs_planed = 0;
1462        spin_lock(&mdev->peer_seq_lock);
1463        fifo_set(&mdev->rs_plan_s, 0);
1464        spin_unlock(&mdev->peer_seq_lock);
1465}
1466
1467/**
1468 * drbd_start_resync() - Start the resync process
1469 * @mdev:       DRBD device.
1470 * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1471 *
1472 * This function might bring you directly into one of the
1473 * C_PAUSED_SYNC_* states.
1474 */
1475void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1476{
1477        union drbd_state ns;
1478        int r;
1479
1480        if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1481                dev_err(DEV, "Resync already running!\n");
1482                return;
1483        }
1484
1485        if (mdev->state.conn < C_AHEAD) {
1486                /* In case a previous resync run was aborted by an IO error/detach on the peer. */
1487                drbd_rs_cancel_all(mdev);
1488                /* This should be done when we abort the resync. We definitely do not
1489                   want to have this for connections going back and forth between
1490                   Ahead/Behind and SyncSource/SyncTarget */
1491        }
1492
1493        if (side == C_SYNC_TARGET) {
1494                /* Since application IO was locked out during C_WF_BITMAP_T and
1495                   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1496                   we check that we might make the data inconsistent. */
1497                r = drbd_khelper(mdev, "before-resync-target");
1498                r = (r >> 8) & 0xff;
1499                if (r > 0) {
1500                        dev_info(DEV, "before-resync-target handler returned %d, "
1501                             "dropping connection.\n", r);
1502                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1503                        return;
1504                }
1505        } else /* C_SYNC_SOURCE */ {
1506                r = drbd_khelper(mdev, "before-resync-source");
1507                r = (r >> 8) & 0xff;
1508                if (r > 0) {
1509                        if (r == 3) {
1510                                dev_info(DEV, "before-resync-source handler returned %d, "
1511                                         "ignoring. Old userland tools?", r);
1512                        } else {
1513                                dev_info(DEV, "before-resync-source handler returned %d, "
1514                                         "dropping connection.\n", r);
1515                                drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
1516                                return;
1517                        }
1518                }
1519        }
1520
1521        drbd_state_lock(mdev);
1522
1523        if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
1524                drbd_state_unlock(mdev);
1525                return;
1526        }
1527
1528        write_lock_irq(&global_state_lock);
1529        ns = mdev->state;
1530
1531        ns.aftr_isp = !_drbd_may_sync_now(mdev);
1532
1533        ns.conn = side;
1534
1535        if (side == C_SYNC_TARGET)
1536                ns.disk = D_INCONSISTENT;
1537        else /* side == C_SYNC_SOURCE */
1538                ns.pdsk = D_INCONSISTENT;
1539
1540        r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1541        ns = mdev->state;
1542
1543        if (ns.conn < C_CONNECTED)
1544                r = SS_UNKNOWN_ERROR;
1545
1546        if (r == SS_SUCCESS) {
1547                unsigned long tw = drbd_bm_total_weight(mdev);
1548                unsigned long now = jiffies;
1549                int i;
1550
1551                mdev->rs_failed    = 0;
1552                mdev->rs_paused    = 0;
1553                mdev->rs_same_csum = 0;
1554                mdev->rs_last_events = 0;
1555                mdev->rs_last_sect_ev = 0;
1556                mdev->rs_total     = tw;
1557                mdev->rs_start     = now;
1558                for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1559                        mdev->rs_mark_left[i] = tw;
1560                        mdev->rs_mark_time[i] = now;
1561                }
1562                _drbd_pause_after(mdev);
1563        }
1564        write_unlock_irq(&global_state_lock);
1565
1566        if (r == SS_SUCCESS) {
1567                dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1568                     drbd_conn_str(ns.conn),
1569                     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1570                     (unsigned long) mdev->rs_total);
1571                if (side == C_SYNC_TARGET)
1572                        mdev->bm_resync_fo = 0;
1573
1574                /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1575                 * with w_send_oos, or the sync target will get confused as to
1576                 * how much bits to resync.  We cannot do that always, because for an
1577                 * empty resync and protocol < 95, we need to do it here, as we call
1578                 * drbd_resync_finished from here in that case.
1579                 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1580                 * and from after_state_ch otherwise. */
1581                if (side == C_SYNC_SOURCE && mdev->agreed_pro_version < 96)
1582                        drbd_gen_and_send_sync_uuid(mdev);
1583
1584                if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) {
1585                        /* This still has a race (about when exactly the peers
1586                         * detect connection loss) that can lead to a full sync
1587                         * on next handshake. In 8.3.9 we fixed this with explicit
1588                         * resync-finished notifications, but the fix
1589                         * introduces a protocol change.  Sleeping for some
1590                         * time longer than the ping interval + timeout on the
1591                         * SyncSource, to give the SyncTarget the chance to
1592                         * detect connection loss, then waiting for a ping
1593                         * response (implicit in drbd_resync_finished) reduces
1594                         * the race considerably, but does not solve it. */
1595                        if (side == C_SYNC_SOURCE)
1596                                schedule_timeout_interruptible(
1597                                        mdev->net_conf->ping_int * HZ +
1598                                        mdev->net_conf->ping_timeo*HZ/9);
1599                        drbd_resync_finished(mdev);
1600                }
1601
1602                drbd_rs_controller_reset(mdev);
1603                /* ns.conn may already be != mdev->state.conn,
1604                 * we may have been paused in between, or become paused until
1605                 * the timer triggers.
1606                 * No matter, that is handled in resync_timer_fn() */
1607                if (ns.conn == C_SYNC_TARGET)
1608                        mod_timer(&mdev->resync_timer, jiffies);
1609
1610                drbd_md_sync(mdev);
1611        }
1612        put_ldev(mdev);
1613        drbd_state_unlock(mdev);
1614}
1615
1616int drbd_worker(struct drbd_thread *thi)
1617{
1618        struct drbd_conf *mdev = thi->mdev;
1619        struct drbd_work *w = NULL;
1620        LIST_HEAD(work_list);
1621        int intr = 0, i;
1622
1623        sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));
1624
1625        while (get_t_state(thi) == Running) {
1626                drbd_thread_current_set_cpu(mdev);
1627
1628                if (down_trylock(&mdev->data.work.s)) {
1629                        mutex_lock(&mdev->data.mutex);
1630                        if (mdev->data.socket && !mdev->net_conf->no_cork)
1631                                drbd_tcp_uncork(mdev->data.socket);
1632                        mutex_unlock(&mdev->data.mutex);
1633
1634                        intr = down_interruptible(&mdev->data.work.s);
1635
1636                        mutex_lock(&mdev->data.mutex);
1637                        if (mdev->data.socket  && !mdev->net_conf->no_cork)
1638                                drbd_tcp_cork(mdev->data.socket);
1639                        mutex_unlock(&mdev->data.mutex);
1640                }
1641
1642                if (intr) {
1643                        D_ASSERT(intr == -EINTR);
1644                        flush_signals(current);
1645                        ERR_IF (get_t_state(thi) == Running)
1646                                continue;
1647                        break;
1648                }
1649
1650                if (get_t_state(thi) != Running)
1651                        break;
1652                /* With this break, we have done a down() but not consumed
1653                   the entry from the list. The cleanup code takes care of
1654                   this...   */
1655
1656                w = NULL;
1657                spin_lock_irq(&mdev->data.work.q_lock);
1658                ERR_IF(list_empty(&mdev->data.work.q)) {
1659                        /* something terribly wrong in our logic.
1660                         * we were able to down() the semaphore,
1661                         * but the list is empty... doh.
1662                         *
1663                         * what is the best thing to do now?
1664                         * try again from scratch, restarting the receiver,
1665                         * asender, whatnot? could break even more ugly,
1666                         * e.g. when we are primary, but no good local data.
1667                         *
1668                         * I'll try to get away just starting over this loop.
1669                         */
1670                        spin_unlock_irq(&mdev->data.work.q_lock);
1671                        continue;
1672                }
1673                w = list_entry(mdev->data.work.q.next, struct drbd_work, list);
1674                list_del_init(&w->list);
1675                spin_unlock_irq(&mdev->data.work.q_lock);
1676
1677                if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
1678                        /* dev_warn(DEV, "worker: a callback failed! \n"); */
1679                        if (mdev->state.conn >= C_CONNECTED)
1680                                drbd_force_state(mdev,
1681                                                NS(conn, C_NETWORK_FAILURE));
1682                }
1683        }
1684        D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags));
1685        D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags));
1686
1687        spin_lock_irq(&mdev->data.work.q_lock);
1688        i = 0;
1689        while (!list_empty(&mdev->data.work.q)) {
1690                list_splice_init(&mdev->data.work.q, &work_list);
1691                spin_unlock_irq(&mdev->data.work.q_lock);
1692
1693                while (!list_empty(&work_list)) {
1694                        w = list_entry(work_list.next, struct drbd_work, list);
1695                        list_del_init(&w->list);
1696                        w->cb(mdev, w, 1);
1697                        i++; /* dead debugging code */
1698                }
1699
1700                spin_lock_irq(&mdev->data.work.q_lock);
1701        }
1702        sema_init(&mdev->data.work.s, 0);
1703        /* DANGEROUS race: if someone did queue his work within the spinlock,
1704         * but up() ed outside the spinlock, we could get an up() on the
1705         * semaphore without corresponding list entry.
1706         * So don't do that.
1707         */
1708        spin_unlock_irq(&mdev->data.work.q_lock);
1709
1710        D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1711        /* _drbd_set_state only uses stop_nowait.
1712         * wait here for the Exiting receiver. */
1713        drbd_thread_stop(&mdev->receiver);
1714        drbd_mdev_cleanup(mdev);
1715
1716        dev_info(DEV, "worker terminated\n");
1717
1718        clear_bit(DEVICE_DYING, &mdev->flags);
1719        clear_bit(CONFIG_PENDING, &mdev->flags);
1720        wake_up(&mdev->state_wait);
1721
1722        return 0;
1723}
1724