linux/drivers/block/drbd/drbd_worker.c
   1/*
   2   drbd_worker.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24 */
  25
  26#include <linux/module.h>
  27#include <linux/drbd.h>
  28#include <linux/sched.h>
  29#include <linux/wait.h>
  30#include <linux/mm.h>
  31#include <linux/memcontrol.h>
  32#include <linux/mm_inline.h>
  33#include <linux/slab.h>
  34#include <linux/random.h>
  35#include <linux/string.h>
  36#include <linux/scatterlist.h>
  37
  38#include "drbd_int.h"
  39#include "drbd_req.h"
  40
  41static int w_make_ov_request(struct drbd_work *w, int cancel);
  42
  43
  44/* endio handlers:
  45 *   drbd_md_io_complete (defined here)
  46 *   drbd_request_endio (defined here)
  47 *   drbd_peer_request_endio (defined here)
  48 *   bm_async_io_complete (defined in drbd_bitmap.c)
  49 *
  50 * For all these callbacks, note the following:
  51 * The callbacks will be called in irq context by the IDE drivers,
  52 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  53 * Try to get the locking right :)
  54 *
  55 */
  56
  57
   58/* About the global_state_lock
   59   Each state transition on a device holds a read lock. In case we have
   60   to evaluate the resync-after dependencies, we grab a write lock, because
   61   we need stable states on all devices for that.  */
  62rwlock_t global_state_lock;
  63
  64/* used for synchronous meta data and bitmap IO
  65 * submitted by drbd_md_sync_page_io()
  66 */
  67void drbd_md_io_complete(struct bio *bio, int error)
  68{
  69        struct drbd_md_io *md_io;
  70        struct drbd_conf *mdev;
  71
  72        md_io = (struct drbd_md_io *)bio->bi_private;
  73        mdev = container_of(md_io, struct drbd_conf, md_io);
  74
  75        md_io->error = error;
  76
  77        /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
  78         * to timeout on the lower level device, and eventually detach from it.
  79         * If this io completion runs after that timeout expired, this
  80         * drbd_md_put_buffer() may allow us to finally try and re-attach.
  81         * During normal operation, this only puts that extra reference
  82         * down to 1 again.
  83         * Make sure we first drop the reference, and only then signal
  84         * completion, or we may (in drbd_al_read_log()) cycle so fast into the
  85         * next drbd_md_sync_page_io(), that we trigger the
  86         * ASSERT(atomic_read(&mdev->md_io_in_use) == 1) there.
  87         */
  88        drbd_md_put_buffer(mdev);
  89        md_io->done = 1;
  90        wake_up(&mdev->misc_wait);
  91        bio_put(bio);
  92        if (mdev->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
  93                put_ldev(mdev);
  94}
  95
  96/* reads on behalf of the partner,
  97 * "submitted" by the receiver
  98 */
  99void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 100{
 101        unsigned long flags = 0;
 102        struct drbd_conf *mdev = peer_req->w.mdev;
 103
 104        spin_lock_irqsave(&mdev->tconn->req_lock, flags);
 105        mdev->read_cnt += peer_req->i.size >> 9;
 106        list_del(&peer_req->w.list);
 107        if (list_empty(&mdev->read_ee))
 108                wake_up(&mdev->ee_wait);
 109        if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
 110                __drbd_chk_io_error(mdev, DRBD_READ_ERROR);
 111        spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
 112
 113        drbd_queue_work(&mdev->tconn->sender_work, &peer_req->w);
 114        put_ldev(mdev);
 115}
 116
 117/* writes on behalf of the partner, or resync writes,
 118 * "submitted" by the receiver, final stage.  */
 119static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 120{
 121        unsigned long flags = 0;
 122        struct drbd_conf *mdev = peer_req->w.mdev;
 123        struct drbd_interval i;
 124        int do_wake;
 125        u64 block_id;
 126        int do_al_complete_io;
 127
 128        /* after we moved peer_req to done_ee,
 129         * we may no longer access it,
 130         * it may be freed/reused already!
 131         * (as soon as we release the req_lock) */
 132        i = peer_req->i;
 133        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
 134        block_id = peer_req->block_id;
 135
 136        spin_lock_irqsave(&mdev->tconn->req_lock, flags);
 137        mdev->writ_cnt += peer_req->i.size >> 9;
 138        list_move_tail(&peer_req->w.list, &mdev->done_ee);
 139
 140        /*
 141         * Do not remove from the write_requests tree here: we did not send the
 142         * Ack yet and did not wake possibly waiting conflicting requests.
  143         * The removal from the tree happens in "drbd_process_done_ee" within
  144         * the appropriate w.cb (e_end_block/e_end_resync_block), or in
  145         * _drbd_clear_done_ee.
 146         */
 147
 148        do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);
 149
 150        if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
 151                __drbd_chk_io_error(mdev, DRBD_WRITE_ERROR);
 152        spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
 153
 154        if (block_id == ID_SYNCER)
 155                drbd_rs_complete_io(mdev, i.sector);
 156
 157        if (do_wake)
 158                wake_up(&mdev->ee_wait);
 159
 160        if (do_al_complete_io)
 161                drbd_al_complete_io(mdev, &i);
 162
 163        wake_asender(mdev->tconn);
 164        put_ldev(mdev);
 165}
 166
 167/* writes on behalf of the partner, or resync writes,
 168 * "submitted" by the receiver.
 169 */
 170void drbd_peer_request_endio(struct bio *bio, int error)
 171{
 172        struct drbd_peer_request *peer_req = bio->bi_private;
 173        struct drbd_conf *mdev = peer_req->w.mdev;
 174        int uptodate = bio_flagged(bio, BIO_UPTODATE);
 175        int is_write = bio_data_dir(bio) == WRITE;
 176
 177        if (error && __ratelimit(&drbd_ratelimit_state))
 178                dev_warn(DEV, "%s: error=%d s=%llus\n",
 179                                is_write ? "write" : "read", error,
 180                                (unsigned long long)peer_req->i.sector);
 181        if (!error && !uptodate) {
 182                if (__ratelimit(&drbd_ratelimit_state))
 183                        dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
 184                                        is_write ? "write" : "read",
 185                                        (unsigned long long)peer_req->i.sector);
 186                /* strange behavior of some lower level drivers...
 187                 * fail the request by clearing the uptodate flag,
 188                 * but do not return any error?! */
 189                error = -EIO;
 190        }
 191
 192        if (error)
 193                set_bit(__EE_WAS_ERROR, &peer_req->flags);
 194
 195        bio_put(bio); /* no need for the bio anymore */
 196        if (atomic_dec_and_test(&peer_req->pending_bios)) {
 197                if (is_write)
 198                        drbd_endio_write_sec_final(peer_req);
 199                else
 200                        drbd_endio_read_sec_final(peer_req);
 201        }
 202}
 203
 204/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 205 */
 206void drbd_request_endio(struct bio *bio, int error)
 207{
 208        unsigned long flags;
 209        struct drbd_request *req = bio->bi_private;
 210        struct drbd_conf *mdev = req->w.mdev;
 211        struct bio_and_error m;
 212        enum drbd_req_event what;
 213        int uptodate = bio_flagged(bio, BIO_UPTODATE);
 214
 215        if (!error && !uptodate) {
 216                dev_warn(DEV, "p %s: setting error to -EIO\n",
 217                         bio_data_dir(bio) == WRITE ? "write" : "read");
 218                /* strange behavior of some lower level drivers...
 219                 * fail the request by clearing the uptodate flag,
 220                 * but do not return any error?! */
 221                error = -EIO;
 222        }
 223
 224
 225        /* If this request was aborted locally before,
 226         * but now was completed "successfully",
 227         * chances are that this caused arbitrary data corruption.
 228         *
 229         * "aborting" requests, or force-detaching the disk, is intended for
  230         * completely blocked/hung local backing devices which no longer
  231         * complete requests at all, not even with error completions.  In this
 232         * situation, usually a hard-reset and failover is the only way out.
 233         *
 234         * By "aborting", basically faking a local error-completion,
  235         * we allow for a more graceful switchover by cleanly migrating services.
 236         * Still the affected node has to be rebooted "soon".
 237         *
 238         * By completing these requests, we allow the upper layers to re-use
 239         * the associated data pages.
 240         *
 241         * If later the local backing device "recovers", and now DMAs some data
 242         * from disk into the original request pages, in the best case it will
  243         * just put random data into unused pages; but typically it will
  244         * corrupt data that is by now completely unrelated, causing all sorts of damage.
 245         *
 246         * Which means delayed successful completion,
 247         * especially for READ requests,
 248         * is a reason to panic().
 249         *
 250         * We assume that a delayed *error* completion is OK,
 251         * though we still will complain noisily about it.
 252         */
 253        if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
 254                if (__ratelimit(&drbd_ratelimit_state))
 255                        dev_emerg(DEV, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
 256
 257                if (!error)
 258                        panic("possible random memory corruption caused by delayed completion of aborted local request\n");
 259        }
 260
 261        /* to avoid recursion in __req_mod */
 262        if (unlikely(error)) {
 263                what = (bio_data_dir(bio) == WRITE)
 264                        ? WRITE_COMPLETED_WITH_ERROR
 265                        : (bio_rw(bio) == READ)
 266                          ? READ_COMPLETED_WITH_ERROR
 267                          : READ_AHEAD_COMPLETED_WITH_ERROR;
 268        } else
 269                what = COMPLETED_OK;
 270
 271        bio_put(req->private_bio);
 272        req->private_bio = ERR_PTR(error);
 273
 274        /* not req_mod(), we need irqsave here! */
 275        spin_lock_irqsave(&mdev->tconn->req_lock, flags);
 276        __req_mod(req, what, &m);
 277        spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);
 278        put_ldev(mdev);
 279
 280        if (m.bio)
 281                complete_master_bio(mdev, &m);
 282}
 283
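     /* Compute a digest over all pages of a peer request.  All pages but the
      * last are fully used; the last one may be used only partially
      * (peer_req->i.size modulo PAGE_SIZE). */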
 284void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
 285                  struct drbd_peer_request *peer_req, void *digest)
 286{
 287        struct hash_desc desc;
 288        struct scatterlist sg;
 289        struct page *page = peer_req->pages;
 290        struct page *tmp;
 291        unsigned len;
 292
 293        desc.tfm = tfm;
 294        desc.flags = 0;
 295
 296        sg_init_table(&sg, 1);
 297        crypto_hash_init(&desc);
 298
 299        while ((tmp = page_chain_next(page))) {
 300                /* all but the last page will be fully used */
 301                sg_set_page(&sg, page, PAGE_SIZE, 0);
 302                crypto_hash_update(&desc, &sg, sg.length);
 303                page = tmp;
 304        }
 305        /* and now the last, possibly only partially used page */
 306        len = peer_req->i.size & (PAGE_SIZE - 1);
 307        sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
 308        crypto_hash_update(&desc, &sg, sg.length);
 309        crypto_hash_final(&desc, digest);
 310}
 311
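     /* Same as drbd_csum_ee(), but for a bio: hash each bio_vec segment in turn. */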
 312void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
 313{
 314        struct hash_desc desc;
 315        struct scatterlist sg;
 316        struct bio_vec *bvec;
 317        int i;
 318
 319        desc.tfm = tfm;
 320        desc.flags = 0;
 321
 322        sg_init_table(&sg, 1);
 323        crypto_hash_init(&desc);
 324
 325        bio_for_each_segment(bvec, bio, i) {
 326                sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
 327                crypto_hash_update(&desc, &sg, sg.length);
 328        }
 329        crypto_hash_final(&desc, digest);
 330}
 331
 332/* MAYBE merge common code with w_e_end_ov_req */
 333static int w_e_send_csum(struct drbd_work *w, int cancel)
 334{
 335        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 336        struct drbd_conf *mdev = w->mdev;
 337        int digest_size;
 338        void *digest;
 339        int err = 0;
 340
 341        if (unlikely(cancel))
 342                goto out;
 343
 344        if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
 345                goto out;
 346
 347        digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
 348        digest = kmalloc(digest_size, GFP_NOIO);
 349        if (digest) {
 350                sector_t sector = peer_req->i.sector;
 351                unsigned int size = peer_req->i.size;
 352                drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
 353                /* Free peer_req and pages before send.
 354                 * In case we block on congestion, we could otherwise run into
 355                 * some distributed deadlock, if the other side blocks on
 356                 * congestion as well, because our receiver blocks in
 357                 * drbd_alloc_pages due to pp_in_use > max_buffers. */
 358                drbd_free_peer_req(mdev, peer_req);
 359                peer_req = NULL;
 360                inc_rs_pending(mdev);
 361                err = drbd_send_drequest_csum(mdev, sector, size,
 362                                              digest, digest_size,
 363                                              P_CSUM_RS_REQUEST);
 364                kfree(digest);
 365        } else {
 366                dev_err(DEV, "kmalloc() of digest failed.\n");
 367                err = -ENOMEM;
 368        }
 369
 370out:
 371        if (peer_req)
 372                drbd_free_peer_req(mdev, peer_req);
 373
 374        if (unlikely(err))
 375                dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
 376        return err;
 377}
 378
 379#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
 380
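     /* Read the given area from the local disk on behalf of the checksum based
      * resync: allocate a peer request, queue it on read_ee and submit it;
      * w_e_send_csum() runs once the read has completed.  Returns 0 on success,
      * -EIO if the local disk is gone, -EAGAIN if this should be retried later. */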
 381static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
 382{
 383        struct drbd_peer_request *peer_req;
 384
 385        if (!get_ldev(mdev))
 386                return -EIO;
 387
 388        if (drbd_rs_should_slow_down(mdev, sector))
 389                goto defer;
 390
 391        /* GFP_TRY, because if there is no memory available right now, this may
 392         * be rescheduled for later. It is "only" background resync, after all. */
 393        peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector,
 394                                       size, GFP_TRY);
 395        if (!peer_req)
 396                goto defer;
 397
 398        peer_req->w.cb = w_e_send_csum;
 399        spin_lock_irq(&mdev->tconn->req_lock);
 400        list_add(&peer_req->w.list, &mdev->read_ee);
 401        spin_unlock_irq(&mdev->tconn->req_lock);
 402
 403        atomic_add(size >> 9, &mdev->rs_sect_ev);
 404        if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
 405                return 0;
 406
 407        /* If it failed because of ENOMEM, retry should help.  If it failed
 408         * because bio_add_page failed (probably broken lower level driver),
 409         * retry may or may not help.
 410         * If it does not, you may need to force disconnect. */
 411        spin_lock_irq(&mdev->tconn->req_lock);
 412        list_del(&peer_req->w.list);
 413        spin_unlock_irq(&mdev->tconn->req_lock);
 414
 415        drbd_free_peer_req(mdev, peer_req);
 416defer:
 417        put_ldev(mdev);
 418        return -EAGAIN;
 419}
 420
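     /* Worker callback of the resync timer: issue the next batch of online
      * verify or resync requests, depending on the current connection state. */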
 421int w_resync_timer(struct drbd_work *w, int cancel)
 422{
 423        struct drbd_conf *mdev = w->mdev;
 424        switch (mdev->state.conn) {
 425        case C_VERIFY_S:
 426                w_make_ov_request(w, cancel);
 427                break;
 428        case C_SYNC_TARGET:
 429                w_make_resync_request(w, cancel);
 430                break;
 431        }
 432
 433        return 0;
 434}
 435
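     /* Timer callback: (re)queue resync_work on the sender work queue,
      * unless it is queued already. */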
 436void resync_timer_fn(unsigned long data)
 437{
 438        struct drbd_conf *mdev = (struct drbd_conf *) data;
 439
 440        if (list_empty(&mdev->resync_work.list))
 441                drbd_queue_work(&mdev->tconn->sender_work, &mdev->resync_work);
 442}
 443
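     /* The fifo_* helpers below operate on the plan buffer that the resync rate
      * controller, drbd_rs_controller(), uses to plan ahead.
      * fifo_set(): set every slot of the plan to the same value. */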
 444static void fifo_set(struct fifo_buffer *fb, int value)
 445{
 446        int i;
 447
 448        for (i = 0; i < fb->size; i++)
 449                fb->values[i] = value;
 450}
 451
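     /* Store a new value at the current head, advance the head, and return the
      * (oldest) value that was overwritten. */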
 452static int fifo_push(struct fifo_buffer *fb, int value)
 453{
 454        int ov;
 455
 456        ov = fb->values[fb->head_index];
 457        fb->values[fb->head_index++] = value;
 458
 459        if (fb->head_index >= fb->size)
 460                fb->head_index = 0;
 461
 462        return ov;
 463}
 464
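     /* Add value to every slot, i.e. spread a correction over all planned steps. */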
 465static void fifo_add_val(struct fifo_buffer *fb, int value)
 466{
 467        int i;
 468
 469        for (i = 0; i < fb->size; i++)
 470                fb->values[i] += value;
 471}
 472
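     /* Allocate a zero-initialized fifo_buffer with room for fifo_size values. */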
 473struct fifo_buffer *fifo_alloc(int fifo_size)
 474{
 475        struct fifo_buffer *fb;
 476
 477        fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
 478        if (!fb)
 479                return NULL;
 480
 481        fb->head_index = 0;
 482        fb->size = fifo_size;
 483        fb->total = 0;
 484
 485        return fb;
 486}
 487
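     /* Resync rate controller: based on the number of sectors that came in since
      * the last invocation, compute how many sectors to request during the next
      * SLEEP_TIME interval.  It aims for c_fill_target sectors in flight (or, if
      * that is zero, derives the target from c_delay_target), spreads the needed
      * correction over the plan fifo, and caps the result at c_max_rate. */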
 488static int drbd_rs_controller(struct drbd_conf *mdev)
 489{
 490        struct disk_conf *dc;
 491        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 492        unsigned int want;     /* The number of sectors we want in the proxy */
 493        int req_sect; /* Number of sectors to request in this turn */
 494        int correction; /* Number of sectors more we need in the proxy*/
 495        int cps; /* correction per invocation of drbd_rs_controller() */
 496        int steps; /* Number of time steps to plan ahead */
 497        int curr_corr;
 498        int max_sect;
 499        struct fifo_buffer *plan;
 500
 501        sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
 502        mdev->rs_in_flight -= sect_in;
 503
 504        dc = rcu_dereference(mdev->ldev->disk_conf);
 505        plan = rcu_dereference(mdev->rs_plan_s);
 506
 507        steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 508
 509        if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
 510                want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
 511        } else { /* normal path */
 512                want = dc->c_fill_target ? dc->c_fill_target :
 513                        sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
 514        }
 515
 516        correction = want - mdev->rs_in_flight - plan->total;
 517
 518        /* Plan ahead */
 519        cps = correction / steps;
 520        fifo_add_val(plan, cps);
 521        plan->total += cps * steps;
 522
 523        /* What we do in this step */
 524        curr_corr = fifo_push(plan, 0);
 525        plan->total -= curr_corr;
 526
 527        req_sect = sect_in + curr_corr;
 528        if (req_sect < 0)
 529                req_sect = 0;
 530
 531        max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
 532        if (req_sect > max_sect)
 533                req_sect = max_sect;
 534
 535        /*
 536        dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 537                 sect_in, mdev->rs_in_flight, want, correction,
 538                 steps, cps, mdev->rs_planed, curr_corr, req_sect);
 539        */
 540
 541        return req_sect;
 542}
 543
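     /* Convert the controller output (or the static resync_rate) into the number
      * of BM_BLOCK_SIZE sized requests for this SLEEP_TIME interval, and record
      * the effective rate in c_sync_rate (kB/sec). */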
 544static int drbd_rs_number_requests(struct drbd_conf *mdev)
 545{
 546        int number;
 547
 548        rcu_read_lock();
 549        if (rcu_dereference(mdev->rs_plan_s)->size) {
 550                number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
 551                mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 552        } else {
 553                mdev->c_sync_rate = rcu_dereference(mdev->ldev->disk_conf)->resync_rate;
 554                number = SLEEP_TIME * mdev->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 555        }
 556        rcu_read_unlock();
 557
  558        /* ignore the amount of pending requests, the resync controller should
  559         * throttle down to the incoming reply rate soon enough anyway. */
 560        return number;
 561}
 562
 563int w_make_resync_request(struct drbd_work *w, int cancel)
 564{
 565        struct drbd_conf *mdev = w->mdev;
 566        unsigned long bit;
 567        sector_t sector;
 568        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 569        int max_bio_size;
 570        int number, rollback_i, size;
 571        int align, queued, sndbuf;
 572        int i = 0;
 573
 574        if (unlikely(cancel))
 575                return 0;
 576
 577        if (mdev->rs_total == 0) {
 578                /* empty resync? */
 579                drbd_resync_finished(mdev);
 580                return 0;
 581        }
 582
 583        if (!get_ldev(mdev)) {
  584                /* Since we only need to access the resync members of mdev, a
  585                   get_ldev_if_state(mdev, D_FAILED) would be sufficient, but
  586                   continuing the resync with a broken disk makes no sense at
  587                   all */
 588                dev_err(DEV, "Disk broke down during resync!\n");
 589                return 0;
 590        }
 591
 592        max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
 593        number = drbd_rs_number_requests(mdev);
 594        if (number == 0)
 595                goto requeue;
 596
 597        for (i = 0; i < number; i++) {
  598                /* Stop generating RS requests when half of the send buffer is filled */
 599                mutex_lock(&mdev->tconn->data.mutex);
 600                if (mdev->tconn->data.socket) {
 601                        queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
 602                        sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
 603                } else {
 604                        queued = 1;
 605                        sndbuf = 0;
 606                }
 607                mutex_unlock(&mdev->tconn->data.mutex);
 608                if (queued > sndbuf / 2)
 609                        goto requeue;
 610
 611next_sector:
 612                size = BM_BLOCK_SIZE;
 613                bit  = drbd_bm_find_next(mdev, mdev->bm_resync_fo);
 614
 615                if (bit == DRBD_END_OF_BITMAP) {
 616                        mdev->bm_resync_fo = drbd_bm_bits(mdev);
 617                        put_ldev(mdev);
 618                        return 0;
 619                }
 620
 621                sector = BM_BIT_TO_SECT(bit);
 622
 623                if (drbd_rs_should_slow_down(mdev, sector) ||
 624                    drbd_try_rs_begin_io(mdev, sector)) {
 625                        mdev->bm_resync_fo = bit;
 626                        goto requeue;
 627                }
 628                mdev->bm_resync_fo = bit + 1;
 629
 630                if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
 631                        drbd_rs_complete_io(mdev, sector);
 632                        goto next_sector;
 633                }
 634
 635#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 636                /* try to find some adjacent bits.
  637                 * we stop if we already have the maximum req size.
 638                 *
 639                 * Additionally always align bigger requests, in order to
 640                 * be prepared for all stripe sizes of software RAIDs.
 641                 */
 642                align = 1;
 643                rollback_i = i;
 644                for (;;) {
 645                        if (size + BM_BLOCK_SIZE > max_bio_size)
 646                                break;
 647
 648                        /* Be always aligned */
 649                        if (sector & ((1<<(align+3))-1))
 650                                break;
 651
 652                        /* do not cross extent boundaries */
 653                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 654                                break;
 655                        /* now, is it actually dirty, after all?
 656                         * caution, drbd_bm_test_bit is tri-state for some
 657                         * obscure reason; ( b == 0 ) would get the out-of-band
 658                         * only accidentally right because of the "oddly sized"
 659                         * adjustment below */
 660                        if (drbd_bm_test_bit(mdev, bit+1) != 1)
 661                                break;
 662                        bit++;
 663                        size += BM_BLOCK_SIZE;
 664                        if ((BM_BLOCK_SIZE << align) <= size)
 665                                align++;
 666                        i++;
 667                }
 668                /* if we merged some,
 669                 * reset the offset to start the next drbd_bm_find_next from */
 670                if (size > BM_BLOCK_SIZE)
 671                        mdev->bm_resync_fo = bit + 1;
 672#endif
 673
 674                /* adjust very last sectors, in case we are oddly sized */
 675                if (sector + (size>>9) > capacity)
 676                        size = (capacity-sector)<<9;
 677                if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) {
 678                        switch (read_for_csum(mdev, sector, size)) {
 679                        case -EIO: /* Disk failure */
 680                                put_ldev(mdev);
 681                                return -EIO;
 682                        case -EAGAIN: /* allocation failed, or ldev busy */
 683                                drbd_rs_complete_io(mdev, sector);
 684                                mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
 685                                i = rollback_i;
 686                                goto requeue;
 687                        case 0:
 688                                /* everything ok */
 689                                break;
 690                        default:
 691                                BUG();
 692                        }
 693                } else {
 694                        int err;
 695
 696                        inc_rs_pending(mdev);
 697                        err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
 698                                                 sector, size, ID_SYNCER);
 699                        if (err) {
 700                                dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
 701                                dec_rs_pending(mdev);
 702                                put_ldev(mdev);
 703                                return err;
 704                        }
 705                }
 706        }
 707
 708        if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
  709                /* The last syncer _request_ was sent,
  710                 * but the P_RS_DATA_REPLY has not yet been received.  Sync will
  711                 * end (and the next sync group will resume) as soon as we
  712                 * receive the last resync data block, and the last bit is
  713                 * cleared.  Until then resync "work" is "inactive" ...
 714                 */
 715                put_ldev(mdev);
 716                return 0;
 717        }
 718
 719 requeue:
 720        mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 721        mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
 722        put_ldev(mdev);
 723        return 0;
 724}
 725
 726static int w_make_ov_request(struct drbd_work *w, int cancel)
 727{
 728        struct drbd_conf *mdev = w->mdev;
 729        int number, i, size;
 730        sector_t sector;
 731        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
 732        bool stop_sector_reached = false;
 733
 734        if (unlikely(cancel))
 735                return 1;
 736
 737        number = drbd_rs_number_requests(mdev);
 738
 739        sector = mdev->ov_position;
 740        for (i = 0; i < number; i++) {
 741                if (sector >= capacity)
 742                        return 1;
 743
 744                /* We check for "finished" only in the reply path:
 745                 * w_e_end_ov_reply().
 746                 * We need to send at least one request out. */
 747                stop_sector_reached = i > 0
 748                        && verify_can_do_stop_sector(mdev)
 749                        && sector >= mdev->ov_stop_sector;
 750                if (stop_sector_reached)
 751                        break;
 752
 753                size = BM_BLOCK_SIZE;
 754
 755                if (drbd_rs_should_slow_down(mdev, sector) ||
 756                    drbd_try_rs_begin_io(mdev, sector)) {
 757                        mdev->ov_position = sector;
 758                        goto requeue;
 759                }
 760
 761                if (sector + (size>>9) > capacity)
 762                        size = (capacity-sector)<<9;
 763
 764                inc_rs_pending(mdev);
 765                if (drbd_send_ov_request(mdev, sector, size)) {
 766                        dec_rs_pending(mdev);
 767                        return 0;
 768                }
 769                sector += BM_SECT_PER_BIT;
 770        }
 771        mdev->ov_position = sector;
 772
 773 requeue:
 774        mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 775        if (i == 0 || !stop_sector_reached)
 776                mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME);
 777        return 1;
 778}
 779
 780int w_ov_finished(struct drbd_work *w, int cancel)
 781{
 782        struct drbd_conf *mdev = w->mdev;
 783        kfree(w);
 784        ov_out_of_sync_print(mdev);
 785        drbd_resync_finished(mdev);
 786
 787        return 0;
 788}
 789
 790static int w_resync_finished(struct drbd_work *w, int cancel)
 791{
 792        struct drbd_conf *mdev = w->mdev;
 793        kfree(w);
 794
 795        drbd_resync_finished(mdev);
 796
 797        return 0;
 798}
 799
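     /* Send a ping to the peer and wait until the ping ack arrives or the
      * connection is lost. */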
 800static void ping_peer(struct drbd_conf *mdev)
 801{
 802        struct drbd_tconn *tconn = mdev->tconn;
 803
 804        clear_bit(GOT_PING_ACK, &tconn->flags);
 805        request_ping(tconn);
 806        wait_event(tconn->ping_wait,
 807                   test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED);
 808}
 809
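     /* Finish a resync or online verify run: drop the resync LRU, log the
      * achieved rate, update disk/peer-disk state and the UUIDs accordingly,
      * and trigger the configured helper ("out-of-sync" resp. "after-resync-target"). */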
 810int drbd_resync_finished(struct drbd_conf *mdev)
 811{
 812        unsigned long db, dt, dbdt;
 813        unsigned long n_oos;
 814        union drbd_state os, ns;
 815        struct drbd_work *w;
 816        char *khelper_cmd = NULL;
 817        int verify_done = 0;
 818
  819        /* Remove all elements from the resync LRU.  Future actions might
  820         * set bits in the (main) bitmap, which would make the entries in
  821         * the resync LRU wrong. */
 822        if (drbd_rs_del_all(mdev)) {
  823                /* In case this is not possible now, most probably because
  824                 * there are P_RS_DATA_REPLY packets lingering on the worker's
  825                 * queue (or the read operations for those packets
  826                 * are not finished yet), retry in 100ms. */
 827
 828                schedule_timeout_interruptible(HZ / 10);
 829                w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC);
 830                if (w) {
 831                        w->cb = w_resync_finished;
 832                        w->mdev = mdev;
 833                        drbd_queue_work(&mdev->tconn->sender_work, w);
 834                        return 1;
 835                }
  836                dev_err(DEV, "Both drbd_rs_del_all() and kmalloc(w) failed.\n");
 837        }
 838
 839        dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ;
 840        if (dt <= 0)
 841                dt = 1;
  842
 843        db = mdev->rs_total;
  844        /* adjust for verify start and stop sectors, respectively the position reached */
 845        if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
 846                db -= mdev->ov_left;
 847
 848        dbdt = Bit2KB(db/dt);
 849        mdev->rs_paused /= HZ;
 850
 851        if (!get_ldev(mdev))
 852                goto out;
 853
 854        ping_peer(mdev);
 855
 856        spin_lock_irq(&mdev->tconn->req_lock);
 857        os = drbd_read_state(mdev);
 858
 859        verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 860
 861        /* This protects us against multiple calls (that can happen in the presence
 862           of application IO), and against connectivity loss just before we arrive here. */
 863        if (os.conn <= C_CONNECTED)
 864                goto out_unlock;
 865
 866        ns = os;
 867        ns.conn = C_CONNECTED;
 868
 869        dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 870             verify_done ? "Online verify" : "Resync",
 871             dt + mdev->rs_paused, mdev->rs_paused, dbdt);
 872
 873        n_oos = drbd_bm_total_weight(mdev);
 874
 875        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 876                if (n_oos) {
  877                        dev_alert(DEV, "Online verify found %lu %dk blocks out of sync!\n",
 878                              n_oos, Bit2KB(1));
 879                        khelper_cmd = "out-of-sync";
 880                }
 881        } else {
 882                D_ASSERT((n_oos - mdev->rs_failed) == 0);
 883
 884                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 885                        khelper_cmd = "after-resync-target";
 886
 887                if (mdev->tconn->csums_tfm && mdev->rs_total) {
 888                        const unsigned long s = mdev->rs_same_csum;
 889                        const unsigned long t = mdev->rs_total;
 890                        const int ratio =
 891                                (t == 0)     ? 0 :
  892                                (t < 100000) ? ((s*100)/t) : (s/(t/100));
 893                        dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; "
 894                             "transferred %luK total %luK\n",
 895                             ratio,
 896                             Bit2KB(mdev->rs_same_csum),
 897                             Bit2KB(mdev->rs_total - mdev->rs_same_csum),
 898                             Bit2KB(mdev->rs_total));
 899                }
 900        }
 901
 902        if (mdev->rs_failed) {
 903                dev_info(DEV, "            %lu failed blocks\n", mdev->rs_failed);
 904
 905                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 906                        ns.disk = D_INCONSISTENT;
 907                        ns.pdsk = D_UP_TO_DATE;
 908                } else {
 909                        ns.disk = D_UP_TO_DATE;
 910                        ns.pdsk = D_INCONSISTENT;
 911                }
 912        } else {
 913                ns.disk = D_UP_TO_DATE;
 914                ns.pdsk = D_UP_TO_DATE;
 915
 916                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 917                        if (mdev->p_uuid) {
 918                                int i;
 919                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 920                                        _drbd_uuid_set(mdev, i, mdev->p_uuid[i]);
 921                                drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]);
 922                                _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]);
 923                        } else {
 924                                dev_err(DEV, "mdev->p_uuid is NULL! BUG\n");
 925                        }
 926                }
 927
 928                if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 929                        /* for verify runs, we don't update uuids here,
 930                         * so there would be nothing to report. */
 931                        drbd_uuid_set_bm(mdev, 0UL);
 932                        drbd_print_uuids(mdev, "updated UUIDs");
 933                        if (mdev->p_uuid) {
 934                                /* Now the two UUID sets are equal, update what we
 935                                 * know of the peer. */
 936                                int i;
 937                                for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 938                                        mdev->p_uuid[i] = mdev->ldev->md.uuid[i];
 939                        }
 940                }
 941        }
 942
 943        _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
 944out_unlock:
 945        spin_unlock_irq(&mdev->tconn->req_lock);
 946        put_ldev(mdev);
 947out:
 948        mdev->rs_total  = 0;
 949        mdev->rs_failed = 0;
 950        mdev->rs_paused = 0;
 951
 952        /* reset start sector, if we reached end of device */
 953        if (verify_done && mdev->ov_left == 0)
 954                mdev->ov_start_sector = 0;
 955
 956        drbd_md_sync(mdev);
 957
 958        if (khelper_cmd)
 959                drbd_khelper(mdev, khelper_cmd);
 960
 961        return 1;
 962}
 963
  964/* helper: park the peer request on net_ee while its pages may still be in flight via sendpage(), otherwise free it */
 965static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
 966{
 967        if (drbd_peer_req_has_active_page(peer_req)) {
 968                /* This might happen if sendpage() has not finished */
 969                int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
 970                atomic_add(i, &mdev->pp_in_use_by_net);
 971                atomic_sub(i, &mdev->pp_in_use);
 972                spin_lock_irq(&mdev->tconn->req_lock);
 973                list_add_tail(&peer_req->w.list, &mdev->net_ee);
 974                spin_unlock_irq(&mdev->tconn->req_lock);
 975                wake_up(&drbd_pp_wait);
 976        } else
 977                drbd_free_peer_req(mdev, peer_req);
 978}
 979
 980/**
 981 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 982 * @mdev:       DRBD device.
 983 * @w:          work object.
  984 * @cancel:     The connection will be closed anyway
 985 */
 986int w_e_end_data_req(struct drbd_work *w, int cancel)
 987{
 988        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 989        struct drbd_conf *mdev = w->mdev;
 990        int err;
 991
 992        if (unlikely(cancel)) {
 993                drbd_free_peer_req(mdev, peer_req);
 994                dec_unacked(mdev);
 995                return 0;
 996        }
 997
 998        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
 999                err = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
1000        } else {
1001                if (__ratelimit(&drbd_ratelimit_state))
1002                        dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
1003                            (unsigned long long)peer_req->i.sector);
1004
1005                err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
1006        }
1007
1008        dec_unacked(mdev);
1009
1010        move_to_net_ee_or_free(mdev, peer_req);
1011
1012        if (unlikely(err))
1013                dev_err(DEV, "drbd_send_block() failed\n");
1014        return err;
1015}
1016
1017/**
1018 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1019 * @mdev:       DRBD device.
1020 * @w:          work object.
 1021 * @cancel:     The connection will be closed anyway
1022 */
1023int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1024{
1025        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1026        struct drbd_conf *mdev = w->mdev;
1027        int err;
1028
1029        if (unlikely(cancel)) {
1030                drbd_free_peer_req(mdev, peer_req);
1031                dec_unacked(mdev);
1032                return 0;
1033        }
1034
1035        if (get_ldev_if_state(mdev, D_FAILED)) {
1036                drbd_rs_complete_io(mdev, peer_req->i.sector);
1037                put_ldev(mdev);
1038        }
1039
1040        if (mdev->state.conn == C_AHEAD) {
1041                err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
1042        } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1043                if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
1044                        inc_rs_pending(mdev);
1045                        err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1046                } else {
1047                        if (__ratelimit(&drbd_ratelimit_state))
1048                                dev_err(DEV, "Not sending RSDataReply, "
1049                                    "partner DISKLESS!\n");
1050                        err = 0;
1051                }
1052        } else {
1053                if (__ratelimit(&drbd_ratelimit_state))
1054                        dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
1055                            (unsigned long long)peer_req->i.sector);
1056
1057                err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1058
1059                /* update resync data with failure */
1060                drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
1061        }
1062
1063        dec_unacked(mdev);
1064
1065        move_to_net_ee_or_free(mdev, peer_req);
1066
1067        if (unlikely(err))
1068                dev_err(DEV, "drbd_send_block() failed\n");
1069        return err;
1070}
1071
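     /* Worker callback for checksum based resync requests: compare the peer's
      * digest with a locally computed one; if they match, mark the block in sync
      * and send P_RS_IS_IN_SYNC, otherwise send the full block as P_RS_DATA_REPLY. */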
1072int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1073{
1074        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1075        struct drbd_conf *mdev = w->mdev;
1076        struct digest_info *di;
1077        int digest_size;
1078        void *digest = NULL;
1079        int err, eq = 0;
1080
1081        if (unlikely(cancel)) {
1082                drbd_free_peer_req(mdev, peer_req);
1083                dec_unacked(mdev);
1084                return 0;
1085        }
1086
1087        if (get_ldev(mdev)) {
1088                drbd_rs_complete_io(mdev, peer_req->i.sector);
1089                put_ldev(mdev);
1090        }
1091
1092        di = peer_req->digest;
1093
1094        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1095                /* quick hack to try to avoid a race against reconfiguration.
1096                 * a real fix would be much more involved,
1097                 * introducing more locking mechanisms */
1098                if (mdev->tconn->csums_tfm) {
1099                        digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
1100                        D_ASSERT(digest_size == di->digest_size);
1101                        digest = kmalloc(digest_size, GFP_NOIO);
1102                }
1103                if (digest) {
1104                        drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
1105                        eq = !memcmp(digest, di->digest, digest_size);
1106                        kfree(digest);
1107                }
1108
1109                if (eq) {
1110                        drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
1111                        /* rs_same_csums unit is BM_BLOCK_SIZE */
1112                        mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1113                        err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
1114                } else {
1115                        inc_rs_pending(mdev);
1116                        peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1117                        peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1118                        kfree(di);
1119                        err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
1120                }
1121        } else {
1122                err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
1123                if (__ratelimit(&drbd_ratelimit_state))
1124                        dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
1125        }
1126
1127        dec_unacked(mdev);
1128        move_to_net_ee_or_free(mdev, peer_req);
1129
1130        if (unlikely(err))
1131                dev_err(DEV, "drbd_send_block/ack() failed\n");
1132        return err;
1133}
1134
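     /* Worker callback for online verify requests: compute the digest of the
      * local data and send it to the peer with P_OV_REPLY. */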
1135int w_e_end_ov_req(struct drbd_work *w, int cancel)
1136{
1137        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1138        struct drbd_conf *mdev = w->mdev;
1139        sector_t sector = peer_req->i.sector;
1140        unsigned int size = peer_req->i.size;
1141        int digest_size;
1142        void *digest;
1143        int err = 0;
1144
1145        if (unlikely(cancel))
1146                goto out;
1147
1148        digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1149        digest = kmalloc(digest_size, GFP_NOIO);
1150        if (!digest) {
1151                err = 1;        /* terminate the connection in case the allocation failed */
1152                goto out;
1153        }
1154
1155        if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1156                drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1157        else
1158                memset(digest, 0, digest_size);
1159
 1160        /* Free peer_req and pages before send.
1161         * In case we block on congestion, we could otherwise run into
1162         * some distributed deadlock, if the other side blocks on
1163         * congestion as well, because our receiver blocks in
1164         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1165        drbd_free_peer_req(mdev, peer_req);
1166        peer_req = NULL;
1167        inc_rs_pending(mdev);
1168        err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
1169        if (err)
1170                dec_rs_pending(mdev);
1171        kfree(digest);
1172
1173out:
1174        if (peer_req)
1175                drbd_free_peer_req(mdev, peer_req);
1176        dec_unacked(mdev);
1177        return err;
1178}
1179
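     /* Record an out-of-sync area found by online verify, merging it with the
      * previous one if they are adjacent, and mark it out of sync in the bitmap. */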
1180void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
1181{
1182        if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
1183                mdev->ov_last_oos_size += size>>9;
1184        } else {
1185                mdev->ov_last_oos_start = sector;
1186                mdev->ov_last_oos_size = size>>9;
1187        }
1188        drbd_set_out_of_sync(mdev, sector, size);
1189}
1190
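     /* Worker callback for online verify replies: compare the peer's digest with
      * the local data, record any out-of-sync area, answer with P_OV_RESULT, and
      * finish the verify run once the last block or the stop sector is reached. */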
1191int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1192{
1193        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1194        struct drbd_conf *mdev = w->mdev;
1195        struct digest_info *di;
1196        void *digest;
1197        sector_t sector = peer_req->i.sector;
1198        unsigned int size = peer_req->i.size;
1199        int digest_size;
1200        int err, eq = 0;
1201        bool stop_sector_reached = false;
1202
1203        if (unlikely(cancel)) {
1204                drbd_free_peer_req(mdev, peer_req);
1205                dec_unacked(mdev);
1206                return 0;
1207        }
1208
1209        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1210         * the resync lru has been cleaned up already */
1211        if (get_ldev(mdev)) {
1212                drbd_rs_complete_io(mdev, peer_req->i.sector);
1213                put_ldev(mdev);
1214        }
1215
1216        di = peer_req->digest;
1217
1218        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1219                digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
1220                digest = kmalloc(digest_size, GFP_NOIO);
1221                if (digest) {
1222                        drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
1223
1224                        D_ASSERT(digest_size == di->digest_size);
1225                        eq = !memcmp(digest, di->digest, digest_size);
1226                        kfree(digest);
1227                }
1228        }
1229
1230        /* Free peer_req and pages before send.
1231         * In case we block on congestion, we could otherwise run into
1232         * some distributed deadlock, if the other side blocks on
1233         * congestion as well, because our receiver blocks in
1234         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1235        drbd_free_peer_req(mdev, peer_req);
1236        if (!eq)
1237                drbd_ov_out_of_sync_found(mdev, sector, size);
1238        else
1239                ov_out_of_sync_print(mdev);
1240
1241        err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
1242                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1243
1244        dec_unacked(mdev);
1245
1246        --mdev->ov_left;
1247
1248        /* let's advance progress step marks only for every other megabyte */
1249        if ((mdev->ov_left & 0x200) == 0x200)
1250                drbd_advance_rs_marks(mdev, mdev->ov_left);
1251
1252        stop_sector_reached = verify_can_do_stop_sector(mdev) &&
1253                (sector + (size>>9)) >= mdev->ov_stop_sector;
1254
1255        if (mdev->ov_left == 0 || stop_sector_reached) {
1256                ov_out_of_sync_print(mdev);
1257                drbd_resync_finished(mdev);
1258        }
1259
1260        return err;
1261}
1262
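     /* Barrier work item: completing it signals that all work queued before it
      * has been processed. */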
1263int w_prev_work_done(struct drbd_work *w, int cancel)
1264{
1265        struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
1266
1267        complete(&b->done);
1268        return 0;
1269}
1270
1271/* FIXME
1272 * We need to track the number of pending barrier acks,
1273 * and to be able to wait for them.
1274 * See also comment in drbd_adm_attach before drbd_suspend_io.
1275 */
1276int drbd_send_barrier(struct drbd_tconn *tconn)
1277{
1278        struct p_barrier *p;
1279        struct drbd_socket *sock;
1280
1281        sock = &tconn->data;
1282        p = conn_prepare_command(tconn, sock);
1283        if (!p)
1284                return -EIO;
1285        p->barrier = tconn->send.current_epoch_nr;
1286        p->pad = 0;
1287        tconn->send.current_epoch_writes = 0;
1288
1289        return conn_send_command(tconn, sock, P_BARRIER, sizeof(*p), NULL, 0);
1290}
1291
1292int w_send_write_hint(struct drbd_work *w, int cancel)
1293{
1294        struct drbd_conf *mdev = w->mdev;
1295        struct drbd_socket *sock;
1296
1297        if (cancel)
1298                return 0;
1299        sock = &mdev->tconn->data;
1300        if (!drbd_prepare_command(mdev, sock))
1301                return -EIO;
1302        return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1303}
1304
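     /* Initialize the epoch bookkeeping with the first write seen on this
      * connection, so that maybe_send_barrier() has a valid reference point. */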
1305static void re_init_if_first_write(struct drbd_tconn *tconn, unsigned int epoch)
1306{
1307        if (!tconn->send.seen_any_write_yet) {
1308                tconn->send.seen_any_write_yet = true;
1309                tconn->send.current_epoch_nr = epoch;
1310                tconn->send.current_epoch_writes = 0;
1311        }
1312}
1313
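     /* If this request starts a new epoch, close the previous one by sending a
      * P_BARRIER, but only if that epoch actually contained writes. */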
1314static void maybe_send_barrier(struct drbd_tconn *tconn, unsigned int epoch)
1315{
1316        /* re-init if first write on this connection */
1317        if (!tconn->send.seen_any_write_yet)
1318                return;
1319        if (tconn->send.current_epoch_nr != epoch) {
1320                if (tconn->send.current_epoch_writes)
1321                        drbd_send_barrier(tconn);
1322                tconn->send.current_epoch_nr = epoch;
1323        }
1324}
1325
1326int w_send_out_of_sync(struct drbd_work *w, int cancel)
1327{
1328        struct drbd_request *req = container_of(w, struct drbd_request, w);
1329        struct drbd_conf *mdev = w->mdev;
1330        struct drbd_tconn *tconn = mdev->tconn;
1331        int err;
1332
1333        if (unlikely(cancel)) {
1334                req_mod(req, SEND_CANCELED);
1335                return 0;
1336        }
1337
1338        /* this time, no tconn->send.current_epoch_writes++;
1339         * If it was sent, it was the closing barrier for the last
1340         * replicated epoch, before we went into AHEAD mode.
1341         * No more barriers will be sent, until we leave AHEAD mode again. */
1342        maybe_send_barrier(tconn, req->epoch);
1343
1344        err = drbd_send_out_of_sync(mdev, req);
1345        req_mod(req, OOS_HANDED_TO_NETWORK);
1346
1347        return err;
1348}
1349
1350/**
1351 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1352 * @mdev:       DRBD device.
1353 * @w:          work object.
 1354 * @cancel:     The connection will be closed anyway
1355 */
1356int w_send_dblock(struct drbd_work *w, int cancel)
1357{
1358        struct drbd_request *req = container_of(w, struct drbd_request, w);
1359        struct drbd_conf *mdev = w->mdev;
1360        struct drbd_tconn *tconn = mdev->tconn;
1361        int err;
1362
1363        if (unlikely(cancel)) {
1364                req_mod(req, SEND_CANCELED);
1365                return 0;
1366        }
1367
1368        re_init_if_first_write(tconn, req->epoch);
1369        maybe_send_barrier(tconn, req->epoch);
1370        tconn->send.current_epoch_writes++;
1371
1372        err = drbd_send_dblock(mdev, req);
1373        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1374
1375        return err;
1376}
1377
1378/**
1379 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1380 * @mdev:       DRBD device.
1381 * @w:          work object.
 1382 * @cancel:     The connection will be closed anyway
1383 */
1384int w_send_read_req(struct drbd_work *w, int cancel)
1385{
1386        struct drbd_request *req = container_of(w, struct drbd_request, w);
1387        struct drbd_conf *mdev = w->mdev;
1388        struct drbd_tconn *tconn = mdev->tconn;
1389        int err;
1390
1391        if (unlikely(cancel)) {
1392                req_mod(req, SEND_CANCELED);
1393                return 0;
1394        }
1395
1396        /* Even read requests may close a write epoch,
1397         * if there was any yet. */
1398        maybe_send_barrier(tconn, req->epoch);
1399
1400        err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
1401                                 (unsigned long)req);
1402
1403        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1404
1405        return err;
1406}
1407
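     /* Restart local disk I/O for a request: (re)enter the activity log for
      * writes accounted in it, clone the master bio and submit the clone to the
      * local backing device. */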
1408int w_restart_disk_io(struct drbd_work *w, int cancel)
1409{
1410        struct drbd_request *req = container_of(w, struct drbd_request, w);
1411        struct drbd_conf *mdev = w->mdev;
1412
1413        if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1414                drbd_al_begin_io(mdev, &req->i, false);
1415
1416        drbd_req_make_private_bio(req, req->master_bio);
1417        req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
1418        generic_make_request(req->private_bio);
1419
1420        return 0;
1421}
1422
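    /* Follow the resync-after dependency chain starting at mdev.  Return 0 if
     * any device we (transitively) depend on is in a resync-related state
     * (C_SYNC_SOURCE .. C_PAUSED_SYNC_T) or has one of its "paused sync" flags
     * set; return 1 if we may resync now.  The chain ends at a diskless or
     * unconfigured minor, or where resync_after == -1.
     *
     * Hypothetical example: if drbd0 is configured to resync after drbd1 while
     * drbd1 is currently SyncTarget, _drbd_may_sync_now(drbd0) returns 0, and
     * drbd0 is kept paused via its aftr_isp flag (see _drbd_pause_after()). */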
1423static int _drbd_may_sync_now(struct drbd_conf *mdev)
1424{
1425        struct drbd_conf *odev = mdev;
1426        int resync_after;
1427
1428        while (1) {
1429                if (!odev->ldev || odev->state.disk == D_DISKLESS)
1430                        return 1;
1431                rcu_read_lock();
1432                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1433                rcu_read_unlock();
1434                if (resync_after == -1)
1435                        return 1;
1436                odev = minor_to_mdev(resync_after);
1437                if (!odev)
1438                        return 1;
1439                if ((odev->state.conn >= C_SYNC_SOURCE &&
1440                     odev->state.conn <= C_PAUSED_SYNC_T) ||
1441                    odev->state.aftr_isp || odev->state.peer_isp ||
1442                    odev->state.user_isp)
1443                        return 0;
1444        }
1445}
1446
1447/**
1448 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1449 * @mdev:       DRBD device.
1450 *
1451 * Called from process context only (admin command and after_state_ch).
1452 */
1453static int _drbd_pause_after(struct drbd_conf *mdev)
1454{
1455        struct drbd_conf *odev;
1456        int i, rv = 0;
1457
1458        rcu_read_lock();
1459        idr_for_each_entry(&minors, odev, i) {
1460                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1461                        continue;
1462                if (!_drbd_may_sync_now(odev))
1463                        rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1464                               != SS_NOTHING_TO_DO);
1465        }
1466        rcu_read_unlock();
1467
1468        return rv;
1469}
1470
1471/**
1472 * _drbd_resume_next() - Resume resync on all devices that may resync now
1473 * @mdev:       DRBD device.
1474 *
1475 * Called from process context only (admin command and worker).
1476 */
1477static int _drbd_resume_next(struct drbd_conf *mdev)
1478{
1479        struct drbd_conf *odev;
1480        int i, rv = 0;
1481
1482        rcu_read_lock();
1483        idr_for_each_entry(&minors, odev, i) {
1484                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1485                        continue;
1486                if (odev->state.aftr_isp) {
1487                        if (_drbd_may_sync_now(odev))
1488                                rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1489                                                        CS_HARD, NULL)
1490                                       != SS_NOTHING_TO_DO);
1491                }
1492        }
1493        rcu_read_unlock();
1494        return rv;
1495}
1496
1497void resume_next_sg(struct drbd_conf *mdev)
1498{
1499        write_lock_irq(&global_state_lock);
1500        _drbd_resume_next(mdev);
1501        write_unlock_irq(&global_state_lock);
1502}
1503
1504void suspend_other_sg(struct drbd_conf *mdev)
1505{
1506        write_lock_irq(&global_state_lock);
1507        _drbd_pause_after(mdev);
1508        write_unlock_irq(&global_state_lock);
1509}
1510
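    /* Validate a proposed resync-after dependency of mdev on minor o_minor:
     * out-of-range minors and dependency cycles are rejected, everything else
     * is allowed.  Hypothetical example: setting drbd0 to resync after drbd1
     * while drbd1 already resyncs after drbd0 returns ERR_RESYNC_AFTER_CYCLE. */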
1511/* caller must hold global_state_lock */
1512enum drbd_ret_code drbd_resync_after_valid(struct drbd_conf *mdev, int o_minor)
1513{
1514        struct drbd_conf *odev;
1515        int resync_after;
1516
1517        if (o_minor == -1)
1518                return NO_ERROR;
1519        if (o_minor < -1 || o_minor > MINORMASK)
1520                return ERR_RESYNC_AFTER;
1521
1522        /* check for loops */
1523        odev = minor_to_mdev(o_minor);
1524        while (1) {
1525                if (odev == mdev)
1526                        return ERR_RESYNC_AFTER_CYCLE;
1527
1528                /* You are free to depend on diskless minors, or on minors
1529                 * that do not (yet, or no longer) exist.
1530                 * We only reject dependency loops.
1531                 * We cannot follow the dependency chain beyond a detached or
1532                 * missing minor anyway.
1533                 */
1534                if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1535                        return NO_ERROR;
1536
1537                rcu_read_lock();
1538                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1539                rcu_read_unlock();
1540                /* dependency chain ends here, no cycles. */
1541                if (resync_after == -1)
1542                        return NO_ERROR;
1543
1544                /* follow the dependency chain */
1545                odev = minor_to_mdev(resync_after);
1546        }
1547}
1548
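    /* Re-evaluate all resync-after dependencies until a fixed point is
     * reached: pausing or resuming one device may in turn allow or require
     * changes on devices that depend on it, so a single pass is not enough. */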
1549/* caller must hold global_state_lock */
1550void drbd_resync_after_changed(struct drbd_conf *mdev)
1551{
1552        int changes;
1553
1554        do {
1555                changes  = _drbd_pause_after(mdev);
1556                changes |= _drbd_resume_next(mdev);
1557        } while (changes);
1558}
1559
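    /* Reset the input of the resync rate controller: clear the sector
     * counters used as controller input and the in-flight count, and empty
     * the RCU-protected fifo that holds the controller's plan. */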
1560void drbd_rs_controller_reset(struct drbd_conf *mdev)
1561{
1562        struct fifo_buffer *plan;
1563
1564        atomic_set(&mdev->rs_sect_in, 0);
1565        atomic_set(&mdev->rs_sect_ev, 0);
1566        mdev->rs_in_flight = 0;
1567
1568        /* Updating the RCU-protected object in place is necessary since
1569           this function gets called from atomic context.
1570           It is valid since all other updates also lead to a completely
1571           empty fifo. */
1572        rcu_read_lock();
1573        plan = rcu_dereference(mdev->rs_plan_s);
1574        plan->total = 0;
1575        fifo_set(plan, 0);
1576        rcu_read_unlock();
1577}
1578
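    /* Timer callback: hand the actual resync start over to the worker, since
     * drbd_start_resync() may sleep (it takes state_mutex) and therefore must
     * not run in timer context. */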
1579void start_resync_timer_fn(unsigned long data)
1580{
1581        struct drbd_conf *mdev = (struct drbd_conf *) data;
1582
1583        drbd_queue_work(&mdev->tconn->sender_work, &mdev->start_resync_work);
1584}
1585
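    /* Worker callback behind start_resync_work: if there are still
     * unacknowledged packets or pending resync replies, retry in HZ/10 via
     * start_resync_timer; otherwise clear AHEAD_TO_SYNC_SOURCE and start the
     * resync as sync source. */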
1586int w_start_resync(struct drbd_work *w, int cancel)
1587{
1588        struct drbd_conf *mdev = w->mdev;
1589
1590        if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
1591                dev_warn(DEV, "w_start_resync later...\n");
1592                mdev->start_resync_timer.expires = jiffies + HZ/10;
1593                add_timer(&mdev->start_resync_timer);
1594                return 0;
1595        }
1596
1597        drbd_start_resync(mdev, C_SYNC_SOURCE);
1598        clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags);
1599        return 0;
1600}
1601
1602/**
1603 * drbd_start_resync() - Start the resync process
1604 * @mdev:       DRBD device.
1605 * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1606 *
1607 * This function might bring you directly into one of the
1608 * C_PAUSED_SYNC_* states.
1609 */
1610void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
1611{
1612        union drbd_state ns;
1613        int r;
1614
1615        if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
1616                dev_err(DEV, "Resync already running!\n");
1617                return;
1618        }
1619
1620        if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
1621                if (side == C_SYNC_TARGET) {
1622                        /* Since application IO was locked out during C_WF_BITMAP_T and
1623                           C_WF_SYNC_UUID, we are still unmodified.  Before going to C_SYNC_TARGET,
1624                           give the handler a chance to veto, since resync makes the data inconsistent. */
1625                        r = drbd_khelper(mdev, "before-resync-target");
1626                        r = (r >> 8) & 0xff;
1627                        if (r > 0) {
1628                                dev_info(DEV, "before-resync-target handler returned %d, "
1629                                         "dropping connection.\n", r);
1630                                conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1631                                return;
1632                        }
1633                } else /* C_SYNC_SOURCE */ {
1634                        r = drbd_khelper(mdev, "before-resync-source");
1635                        r = (r >> 8) & 0xff;
1636                        if (r > 0) {
1637                                if (r == 3) {
1638                                        dev_info(DEV, "before-resync-source handler returned %d, "
1639                                                 "ignoring. Old userland tools?\n", r);
1640                                } else {
1641                                        dev_info(DEV, "before-resync-source handler returned %d, "
1642                                                 "dropping connection.\n", r);
1643                                        conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
1644                                        return;
1645                                }
1646                        }
1647                }
1648        }
1649
1650        if (current == mdev->tconn->worker.task) {
1651                /* The worker should not sleep waiting for state_mutex,
1652                   because that can take a long time. */
1653                if (!mutex_trylock(mdev->state_mutex)) {
1654                        set_bit(B_RS_H_DONE, &mdev->flags);
1655                        mdev->start_resync_timer.expires = jiffies + HZ/5;
1656                        add_timer(&mdev->start_resync_timer);
1657                        return;
1658                }
1659        } else {
1660                mutex_lock(mdev->state_mutex);
1661        }
1662        clear_bit(B_RS_H_DONE, &mdev->flags);
1663
1664        write_lock_irq(&global_state_lock);
1665        /* Did some connection breakage or IO error race with us? */
1666        if (mdev->state.conn < C_CONNECTED ||
1667            !get_ldev_if_state(mdev, D_NEGOTIATING)) {
1668                write_unlock_irq(&global_state_lock);
1669                mutex_unlock(mdev->state_mutex);
1670                return;
1671        }
1672
1673        ns = drbd_read_state(mdev);
1674
1675        ns.aftr_isp = !_drbd_may_sync_now(mdev);
1676
1677        ns.conn = side;
1678
1679        if (side == C_SYNC_TARGET)
1680                ns.disk = D_INCONSISTENT;
1681        else /* side == C_SYNC_SOURCE */
1682                ns.pdsk = D_INCONSISTENT;
1683
1684        r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
1685        ns = drbd_read_state(mdev);
1686
1687        if (ns.conn < C_CONNECTED)
1688                r = SS_UNKNOWN_ERROR;
1689
1690        if (r == SS_SUCCESS) {
1691                unsigned long tw = drbd_bm_total_weight(mdev);
1692                unsigned long now = jiffies;
1693                int i;
1694
1695                mdev->rs_failed    = 0;
1696                mdev->rs_paused    = 0;
1697                mdev->rs_same_csum = 0;
1698                mdev->rs_last_events = 0;
1699                mdev->rs_last_sect_ev = 0;
1700                mdev->rs_total     = tw;
1701                mdev->rs_start     = now;
1702                for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1703                        mdev->rs_mark_left[i] = tw;
1704                        mdev->rs_mark_time[i] = now;
1705                }
1706                _drbd_pause_after(mdev);
1707        }
1708        write_unlock_irq(&global_state_lock);
1709
1710        if (r == SS_SUCCESS) {
1711                /* reset rs_last_bcast when a resync or verify is started,
1712                 * to deal with potential jiffies wrap. */
1713                mdev->rs_last_bcast = jiffies - HZ;
1714
1715                dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1716                     drbd_conn_str(ns.conn),
1717                     (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
1718                     (unsigned long) mdev->rs_total);
1719                if (side == C_SYNC_TARGET)
1720                        mdev->bm_resync_fo = 0;
1721
1722                /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1723                 * with w_send_oos, or the sync target will get confused as to
1724                 * how many bits to resync.  We cannot always do that, because for an
1725                 * empty resync and protocol < 95, we need to do it here, as we call
1726                 * drbd_resync_finished from here in that case.
1727                 * So we call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1728                 * and from after_state_ch otherwise. */
1729                if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
1730                        drbd_gen_and_send_sync_uuid(mdev);
1731
1732                if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
1733                        /* This still has a race (about when exactly the peers
1734                         * detect connection loss) that can lead to a full sync
1735                         * on next handshake. In 8.3.9 we fixed this with explicit
1736                         * resync-finished notifications, but the fix
1737                         * introduces a protocol change.  Sleeping for some
1738                         * time longer than the ping interval + timeout on the
1739                         * SyncSource, to give the SyncTarget the chance to
1740                         * detect connection loss, then waiting for a ping
1741                         * response (implicit in drbd_resync_finished) reduces
1742                         * the race considerably, but does not solve it. */
1743                        if (side == C_SYNC_SOURCE) {
1744                                struct net_conf *nc;
1745                                int timeo;
1746
1747                                rcu_read_lock();
1748                                nc = rcu_dereference(mdev->tconn->net_conf);
1749                                timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1750                                rcu_read_unlock();
1751                                schedule_timeout_interruptible(timeo);
1752                        }
1753                        drbd_resync_finished(mdev);
1754                }
1755
1756                drbd_rs_controller_reset(mdev);
1757                /* ns.conn may already be != mdev->state.conn;
1758                 * we may have been paused in between, or become paused until
1759                 * the timer triggers.
1760                 * Either way, that is handled in resync_timer_fn(). */
1761                if (ns.conn == C_SYNC_TARGET)
1762                        mod_timer(&mdev->resync_timer, jiffies);
1763
1764                drbd_md_sync(mdev);
1765        }
1766        put_ldev(mdev);
1767        mutex_unlock(mdev->state_mutex);
1768}
1769
1770/* If the resource already closed the current epoch, but we did not
1771 * (because we have not yet seen new requests), we should send the
1772 * corresponding barrier now.  Must be checked within the same spinlock
1773 * that is used to check for new requests. */
1774bool need_to_send_barrier(struct drbd_tconn *connection)
1775{
1776        if (!connection->send.seen_any_write_yet)
1777                return false;
1778
1779        /* Skip barriers that do not contain any writes.
1780         * This may happen during AHEAD mode. */
1781        if (!connection->send.current_epoch_writes)
1782                return false;
1783
1784        /* ->req_lock is held when requests are queued on
1785         * connection->sender_work, and put into ->transfer_log.
1786         * It is also held when ->current_tle_nr is increased.
1787         * So either there are already new requests queued,
1788         * and corresponding barriers will be sent there.
1789         * Or nothing new is queued yet, so the difference will be 1.
1790         */
1791        if (atomic_read(&connection->current_tle_nr) !=
1792            connection->send.current_epoch_nr + 1)
1793                return false;
1794
1795        return true;
1796}
1797
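    /* Splice all currently queued work items onto *work_list in one go;
     * returns true if *work_list is non-empty afterwards. */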
1798bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1799{
1800        spin_lock_irq(&queue->q_lock);
1801        list_splice_init(&queue->q, work_list);
1802        spin_unlock_irq(&queue->q_lock);
1803        return !list_empty(work_list);
1804}
1805
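    /* Move at most one work item onto *work_list; single-item dequeueing is
     * needed because drbd_queue_work_front() is still used in some places
     * (see wait_for_work()).  Returns true if *work_list is non-empty afterwards. */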
1806bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
1807{
1808        spin_lock_irq(&queue->q_lock);
1809        if (!list_empty(&queue->q))
1810                list_move(queue->q.next, work_list);
1811        spin_unlock_irq(&queue->q_lock);
1812        return !list_empty(work_list);
1813}
1814
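    /* Fetch the next work item for the sender.  If nothing is queued yet,
     * possibly uncork the data socket, close the current epoch with a barrier
     * if needed, and sleep until new work arrives or a signal is pending.
     * Afterwards, cork or uncork the socket again according to the (possibly
     * changed) net_conf. */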
1815void wait_for_work(struct drbd_tconn *connection, struct list_head *work_list)
1816{
1817        DEFINE_WAIT(wait);
1818        struct net_conf *nc;
1819        int uncork, cork;
1820
1821        dequeue_work_item(&connection->sender_work, work_list);
1822        if (!list_empty(work_list))
1823                return;
1824
1825        /* Still nothing to do?
1826         * Maybe we still need to close the current epoch,
1827         * even if no new requests are queued yet.
1828         *
1829         * Also, poke TCP, just in case.
1830         * Then wait for new work (or signal). */
1831        rcu_read_lock();
1832        nc = rcu_dereference(connection->net_conf);
1833        uncork = nc ? nc->tcp_cork : 0;
1834        rcu_read_unlock();
1835        if (uncork) {
1836                mutex_lock(&connection->data.mutex);
1837                if (connection->data.socket)
1838                        drbd_tcp_uncork(connection->data.socket);
1839                mutex_unlock(&connection->data.mutex);
1840        }
1841
1842        for (;;) {
1843                int send_barrier;
1844                prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
1845                spin_lock_irq(&connection->req_lock);
1846                spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
1847                /* dequeue single item only,
1848                 * we still use drbd_queue_work_front() in some places */
1849                if (!list_empty(&connection->sender_work.q))
1850                        list_move(connection->sender_work.q.next, work_list);
1851                spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
1852                if (!list_empty(work_list) || signal_pending(current)) {
1853                        spin_unlock_irq(&connection->req_lock);
1854                        break;
1855                }
1856                send_barrier = need_to_send_barrier(connection);
1857                spin_unlock_irq(&connection->req_lock);
1858                if (send_barrier) {
1859                        drbd_send_barrier(connection);
1860                        connection->send.current_epoch_nr++;
1861                }
1862                schedule();
1863                /* We may be woken up for things other than new work, too,
1864                 * e.g. if the current epoch got closed.
1865                 * In that case the barrier was already sent above. */
1866        }
1867        finish_wait(&connection->sender_work.q_wait, &wait);
1868
1869        /* someone may have changed the config while we have been waiting above. */
1870        rcu_read_lock();
1871        nc = rcu_dereference(connection->net_conf);
1872        cork = nc ? nc->tcp_cork : 0;
1873        rcu_read_unlock();
1874        mutex_lock(&connection->data.mutex);
1875        if (connection->data.socket) {
1876                if (cork)
1877                        drbd_tcp_cork(connection->data.socket);
1878                else if (!uncork)
1879                        drbd_tcp_uncork(connection->data.socket);
1880        }
1881        mutex_unlock(&connection->data.mutex);
1882}
1883
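    /* Main loop of the per-connection worker thread: dequeue work items from
     * sender_work and run their callbacks.  A callback gets a non-zero cancel
     * argument once the connection state has dropped below C_WF_REPORT_PARAMS;
     * a failing callback while still connected forces C_NETWORK_FAILURE.
     * After the thread is asked to stop, the remaining work is drained with
     * cancel = 1 and all volumes of this connection are cleaned up. */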
1884int drbd_worker(struct drbd_thread *thi)
1885{
1886        struct drbd_tconn *tconn = thi->tconn;
1887        struct drbd_work *w = NULL;
1888        struct drbd_conf *mdev;
1889        LIST_HEAD(work_list);
1890        int vnr;
1891
1892        while (get_t_state(thi) == RUNNING) {
1893                drbd_thread_current_set_cpu(thi);
1894
1895                /* as long as we use drbd_queue_work_front(),
1896                 * we may only dequeue single work items here, not batches. */
1897                if (list_empty(&work_list))
1898                        wait_for_work(tconn, &work_list);
1899
1900                if (signal_pending(current)) {
1901                        flush_signals(current);
1902                        if (get_t_state(thi) == RUNNING) {
1903                                conn_warn(tconn, "Worker got an unexpected signal\n");
1904                                continue;
1905                        }
1906                        break;
1907                }
1908
1909                if (get_t_state(thi) != RUNNING)
1910                        break;
1911
1912                while (!list_empty(&work_list)) {
1913                        w = list_first_entry(&work_list, struct drbd_work, list);
1914                        list_del_init(&w->list);
1915                        if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS) == 0)
1916                                continue;
1917                        if (tconn->cstate >= C_WF_REPORT_PARAMS)
1918                                conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
1919                }
1920        }
1921
1922        do {
1923                while (!list_empty(&work_list)) {
1924                        w = list_first_entry(&work_list, struct drbd_work, list);
1925                        list_del_init(&w->list);
1926                        w->cb(w, 1);
1927                }
1928                dequeue_work_batch(&tconn->sender_work, &work_list);
1929        } while (!list_empty(&work_list));
1930
1931        rcu_read_lock();
1932        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1933                D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE);
1934                kref_get(&mdev->kref);
1935                rcu_read_unlock();
1936                drbd_mdev_cleanup(mdev);
1937                kref_put(&mdev->kref, &drbd_minor_destroy);
1938                rcu_read_lock();
1939        }
1940        rcu_read_unlock();
1941
1942        return 0;
1943}
1944