/* linux/drivers/block/drbd/drbd_worker.c */
   1/*
   2   drbd_worker.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24*/
  25
  26#include <linux/module.h>
  27#include <linux/drbd.h>
  28#include <linux/sched.h>
  29#include <linux/wait.h>
  30#include <linux/mm.h>
  31#include <linux/memcontrol.h>
  32#include <linux/mm_inline.h>
  33#include <linux/slab.h>
  34#include <linux/random.h>
  35#include <linux/string.h>
  36#include <linux/scatterlist.h>
  37
  38#include "drbd_int.h"
  39#include "drbd_protocol.h"
  40#include "drbd_req.h"
  41
  42static int make_ov_request(struct drbd_device *, int);
  43static int make_resync_request(struct drbd_device *, int);
  44
  45/* endio handlers:
  46 *   drbd_md_endio (defined here)
  47 *   drbd_request_endio (defined here)
  48 *   drbd_peer_request_endio (defined here)
  49 *   drbd_bm_endio (defined in drbd_bitmap.c)
  50 *
  51 * For all these callbacks, note the following:
  52 * The callbacks will be called in irq context by the IDE drivers,
  53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  54 * Try to get the locking right :)
  55 *
  56 */
  57
  58
  59/* About the global_state_lock
  60   Each state transition on an device holds a read lock. In case we have
  61   to evaluate the resync after dependencies, we grab a write lock, because
  62   we need stable states on all devices for that.  */
  63rwlock_t global_state_lock;
  64
  65/* used for synchronous meta data and bitmap IO
  66 * submitted by drbd_md_sync_page_io()
  67 */
  68void drbd_md_endio(struct bio *bio)
  69{
  70        struct drbd_device *device;
  71
  72        device = bio->bi_private;
  73        device->md_io.error = bio->bi_error;
  74
  75        /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
  76         * to timeout on the lower level device, and eventually detach from it.
  77         * If this io completion runs after that timeout expired, this
  78         * drbd_md_put_buffer() may allow us to finally try and re-attach.
  79         * During normal operation, this only puts that extra reference
  80         * down to 1 again.
  81         * Make sure we first drop the reference, and only then signal
  82         * completion, or we may (in drbd_al_read_log()) cycle so fast into the
  83         * next drbd_md_sync_page_io(), that we trigger the
  84         * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
  85         */
  86        drbd_md_put_buffer(device);
  87        device->md_io.done = 1;
  88        wake_up(&device->misc_wait);
  89        bio_put(bio);
  90        if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
  91                put_ldev(device);
  92}
  93
  94/* reads on behalf of the partner,
  95 * "submitted" by the receiver
  96 */
  97static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
  98{
  99        unsigned long flags = 0;
 100        struct drbd_peer_device *peer_device = peer_req->peer_device;
 101        struct drbd_device *device = peer_device->device;
 102
 103        spin_lock_irqsave(&device->resource->req_lock, flags);
 104        device->read_cnt += peer_req->i.size >> 9;
 105        list_del(&peer_req->w.list);
 106        if (list_empty(&device->read_ee))
 107                wake_up(&device->ee_wait);
 108        if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
 109                __drbd_chk_io_error(device, DRBD_READ_ERROR);
 110        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 111
 112        drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
 113        put_ldev(device);
 114}
 115
 116/* writes on behalf of the partner, or resync writes,
 117 * "submitted" by the receiver, final stage.  */
 118void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 119{
 120        unsigned long flags = 0;
 121        struct drbd_peer_device *peer_device = peer_req->peer_device;
 122        struct drbd_device *device = peer_device->device;
 123        struct drbd_interval i;
 124        int do_wake;
 125        u64 block_id;
 126        int do_al_complete_io;
 127
 128        /* after we moved peer_req to done_ee,
 129         * we may no longer access it,
 130         * it may be freed/reused already!
 131         * (as soon as we release the req_lock) */
 132        i = peer_req->i;
 133        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
 134        block_id = peer_req->block_id;
 135        peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 136
 137        spin_lock_irqsave(&device->resource->req_lock, flags);
 138        device->writ_cnt += peer_req->i.size >> 9;
 139        list_move_tail(&peer_req->w.list, &device->done_ee);
 140
 141        /*
 142         * Do not remove from the write_requests tree here: we did not send the
 143         * Ack yet and did not wake possibly waiting conflicting requests.
 144         * Removed from the tree from "drbd_process_done_ee" within the
 145         * appropriate dw.cb (e_end_block/e_end_resync_block) or from
 146         * _drbd_clear_done_ee.
 147         */
 148
 149        do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
 150
 151        /* FIXME do we want to detach for failed REQ_DISCARD?
 152         * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
 153        if (peer_req->flags & EE_WAS_ERROR)
 154                __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
 155        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 156
 157        if (block_id == ID_SYNCER)
 158                drbd_rs_complete_io(device, i.sector);
 159
 160        if (do_wake)
 161                wake_up(&device->ee_wait);
 162
 163        if (do_al_complete_io)
 164                drbd_al_complete_io(device, &i);
 165
 166        wake_asender(peer_device->connection);
 167        put_ldev(device);
 168}
 169
 170/* writes on behalf of the partner, or resync writes,
 171 * "submitted" by the receiver.
 172 */
 173void drbd_peer_request_endio(struct bio *bio)
 174{
 175        struct drbd_peer_request *peer_req = bio->bi_private;
 176        struct drbd_device *device = peer_req->peer_device->device;
 177        int is_write = bio_data_dir(bio) == WRITE;
 178        int is_discard = !!(bio->bi_rw & REQ_DISCARD);
 179
 180        if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
 181                drbd_warn(device, "%s: error=%d s=%llus\n",
 182                                is_write ? (is_discard ? "discard" : "write")
 183                                        : "read", bio->bi_error,
 184                                (unsigned long long)peer_req->i.sector);
 185
 186        if (bio->bi_error)
 187                set_bit(__EE_WAS_ERROR, &peer_req->flags);
 188
 189        bio_put(bio); /* no need for the bio anymore */
 190        if (atomic_dec_and_test(&peer_req->pending_bios)) {
 191                if (is_write)
 192                        drbd_endio_write_sec_final(peer_req);
 193                else
 194                        drbd_endio_read_sec_final(peer_req);
 195        }
 196}
 197
 198/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 199 */
 200void drbd_request_endio(struct bio *bio)
 201{
 202        unsigned long flags;
 203        struct drbd_request *req = bio->bi_private;
 204        struct drbd_device *device = req->device;
 205        struct bio_and_error m;
 206        enum drbd_req_event what;
 207
 208        /* If this request was aborted locally before,
 209         * but now was completed "successfully",
 210         * chances are that this caused arbitrary data corruption.
 211         *
 212         * "aborting" requests, or force-detaching the disk, is intended for
 213         * completely blocked/hung local backing devices which do no longer
 214         * complete requests at all, not even do error completions.  In this
 215         * situation, usually a hard-reset and failover is the only way out.
 216         *
 217         * By "aborting", basically faking a local error-completion,
 218         * we allow for a more graceful swichover by cleanly migrating services.
 219         * Still the affected node has to be rebooted "soon".
 220         *
 221         * By completing these requests, we allow the upper layers to re-use
 222         * the associated data pages.
 223         *
 224         * If later the local backing device "recovers", and now DMAs some data
 225         * from disk into the original request pages, in the best case it will
 226         * just put random data into unused pages; but typically it will corrupt
 227         * meanwhile completely unrelated data, causing all sorts of damage.
 228         *
 229         * Which means delayed successful completion,
 230         * especially for READ requests,
 231         * is a reason to panic().
 232         *
 233         * We assume that a delayed *error* completion is OK,
 234         * though we still will complain noisily about it.
 235         */
 236        if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
 237                if (__ratelimit(&drbd_ratelimit_state))
 238                        drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
 239
 240                if (!bio->bi_error)
 241                        panic("possible random memory corruption caused by delayed completion of aborted local request\n");
 242        }
 243
 244        /* to avoid recursion in __req_mod */
 245        if (unlikely(bio->bi_error)) {
 246                if (bio->bi_rw & REQ_DISCARD)
 247                        what = (bio->bi_error == -EOPNOTSUPP)
 248                                ? DISCARD_COMPLETED_NOTSUPP
 249                                : DISCARD_COMPLETED_WITH_ERROR;
 250                else
 251                        what = (bio_data_dir(bio) == WRITE)
 252                        ? WRITE_COMPLETED_WITH_ERROR
 253                        : (bio_rw(bio) == READ)
 254                          ? READ_COMPLETED_WITH_ERROR
 255                          : READ_AHEAD_COMPLETED_WITH_ERROR;
 256        } else
 257                what = COMPLETED_OK;
 258
 259        bio_put(req->private_bio);
 260        req->private_bio = ERR_PTR(bio->bi_error);
 261
 262        /* not req_mod(), we need irqsave here! */
 263        spin_lock_irqsave(&device->resource->req_lock, flags);
 264        __req_mod(req, what, &m);
 265        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 266        put_ldev(device);
 267
 268        if (m.bio)
 269                complete_master_bio(device, &m);
 270}
 271
 272void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
 273{
 274        struct hash_desc desc;
 275        struct scatterlist sg;
 276        struct page *page = peer_req->pages;
 277        struct page *tmp;
 278        unsigned len;
 279
 280        desc.tfm = tfm;
 281        desc.flags = 0;
 282
 283        sg_init_table(&sg, 1);
 284        crypto_hash_init(&desc);
 285
 286        while ((tmp = page_chain_next(page))) {
 287                /* all but the last page will be fully used */
 288                sg_set_page(&sg, page, PAGE_SIZE, 0);
 289                crypto_hash_update(&desc, &sg, sg.length);
 290                page = tmp;
 291        }
 292        /* and now the last, possibly only partially used page */
 293        len = peer_req->i.size & (PAGE_SIZE - 1);
 294        sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
 295        crypto_hash_update(&desc, &sg, sg.length);
 296        crypto_hash_final(&desc, digest);
 297}
 298
 299void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
 300{
 301        struct hash_desc desc;
 302        struct scatterlist sg;
 303        struct bio_vec bvec;
 304        struct bvec_iter iter;
 305
 306        desc.tfm = tfm;
 307        desc.flags = 0;
 308
 309        sg_init_table(&sg, 1);
 310        crypto_hash_init(&desc);
 311
 312        bio_for_each_segment(bvec, bio, iter) {
 313                sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
 314                crypto_hash_update(&desc, &sg, sg.length);
 315        }
 316        crypto_hash_final(&desc, digest);
 317}
 318
 319/* MAYBE merge common code with w_e_end_ov_req */
 320static int w_e_send_csum(struct drbd_work *w, int cancel)
 321{
 322        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 323        struct drbd_peer_device *peer_device = peer_req->peer_device;
 324        struct drbd_device *device = peer_device->device;
 325        int digest_size;
 326        void *digest;
 327        int err = 0;
 328
 329        if (unlikely(cancel))
 330                goto out;
 331
 332        if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
 333                goto out;
 334
 335        digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
 336        digest = kmalloc(digest_size, GFP_NOIO);
 337        if (digest) {
 338                sector_t sector = peer_req->i.sector;
 339                unsigned int size = peer_req->i.size;
 340                drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
 341                /* Free peer_req and pages before send.
 342                 * In case we block on congestion, we could otherwise run into
 343                 * some distributed deadlock, if the other side blocks on
 344                 * congestion as well, because our receiver blocks in
 345                 * drbd_alloc_pages due to pp_in_use > max_buffers. */
 346                drbd_free_peer_req(device, peer_req);
 347                peer_req = NULL;
 348                inc_rs_pending(device);
 349                err = drbd_send_drequest_csum(peer_device, sector, size,
 350                                              digest, digest_size,
 351                                              P_CSUM_RS_REQUEST);
 352                kfree(digest);
 353        } else {
 354                drbd_err(device, "kmalloc() of digest failed.\n");
 355                err = -ENOMEM;
 356        }
 357
 358out:
 359        if (peer_req)
 360                drbd_free_peer_req(device, peer_req);
 361
 362        if (unlikely(err))
 363                drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
 364        return err;
 365}
 366
 367#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
 368
 369static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
 370{
 371        struct drbd_device *device = peer_device->device;
 372        struct drbd_peer_request *peer_req;
 373
 374        if (!get_ldev(device))
 375                return -EIO;
 376
 377        /* GFP_TRY, because if there is no memory available right now, this may
 378         * be rescheduled for later. It is "only" background resync, after all. */
 379        peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
 380                                       size, true /* has real payload */, GFP_TRY);
 381        if (!peer_req)
 382                goto defer;
 383
 384        peer_req->w.cb = w_e_send_csum;
 385        spin_lock_irq(&device->resource->req_lock);
 386        list_add_tail(&peer_req->w.list, &device->read_ee);
 387        spin_unlock_irq(&device->resource->req_lock);
 388
 389        atomic_add(size >> 9, &device->rs_sect_ev);
 390        if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
 391                return 0;
 392
 393        /* If it failed because of ENOMEM, retry should help.  If it failed
 394         * because bio_add_page failed (probably broken lower level driver),
 395         * retry may or may not help.
 396         * If it does not, you may need to force disconnect. */
 397        spin_lock_irq(&device->resource->req_lock);
 398        list_del(&peer_req->w.list);
 399        spin_unlock_irq(&device->resource->req_lock);
 400
 401        drbd_free_peer_req(device, peer_req);
 402defer:
 403        put_ldev(device);
 404        return -EAGAIN;
 405}
 406
 407int w_resync_timer(struct drbd_work *w, int cancel)
 408{
 409        struct drbd_device *device =
 410                container_of(w, struct drbd_device, resync_work);
 411
 412        switch (device->state.conn) {
 413        case C_VERIFY_S:
 414                make_ov_request(device, cancel);
 415                break;
 416        case C_SYNC_TARGET:
 417                make_resync_request(device, cancel);
 418                break;
 419        }
 420
 421        return 0;
 422}
 423
 424void resync_timer_fn(unsigned long data)
 425{
 426        struct drbd_device *device = (struct drbd_device *) data;
 427
 428        drbd_queue_work_if_unqueued(
 429                &first_peer_device(device)->connection->sender_work,
 430                &device->resync_work);
 431}
 432
 433static void fifo_set(struct fifo_buffer *fb, int value)
 434{
 435        int i;
 436
 437        for (i = 0; i < fb->size; i++)
 438                fb->values[i] = value;
 439}
 440
 441static int fifo_push(struct fifo_buffer *fb, int value)
 442{
 443        int ov;
 444
 445        ov = fb->values[fb->head_index];
 446        fb->values[fb->head_index++] = value;
 447
 448        if (fb->head_index >= fb->size)
 449                fb->head_index = 0;
 450
 451        return ov;
 452}
 453
 454static void fifo_add_val(struct fifo_buffer *fb, int value)
 455{
 456        int i;
 457
 458        for (i = 0; i < fb->size; i++)
 459                fb->values[i] += value;
 460}
 461
 462struct fifo_buffer *fifo_alloc(int fifo_size)
 463{
 464        struct fifo_buffer *fb;
 465
 466        fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
 467        if (!fb)
 468                return NULL;
 469
 470        fb->head_index = 0;
 471        fb->size = fifo_size;
 472        fb->total = 0;
 473
 474        return fb;
 475}
 476
 477static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
 478{
 479        struct disk_conf *dc;
 480        unsigned int want;     /* The number of sectors we want in-flight */
 481        int req_sect; /* Number of sectors to request in this turn */
 482        int correction; /* Number of sectors more we need in-flight */
 483        int cps; /* correction per invocation of drbd_rs_controller() */
 484        int steps; /* Number of time steps to plan ahead */
 485        int curr_corr;
 486        int max_sect;
 487        struct fifo_buffer *plan;
 488
 489        dc = rcu_dereference(device->ldev->disk_conf);
 490        plan = rcu_dereference(device->rs_plan_s);
 491
 492        steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 493
 494        if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
 495                want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
 496        } else { /* normal path */
 497                want = dc->c_fill_target ? dc->c_fill_target :
 498                        sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
 499        }
 500
 501        correction = want - device->rs_in_flight - plan->total;
 502
 503        /* Plan ahead */
 504        cps = correction / steps;
 505        fifo_add_val(plan, cps);
 506        plan->total += cps * steps;
 507
 508        /* What we do in this step */
 509        curr_corr = fifo_push(plan, 0);
 510        plan->total -= curr_corr;
 511
 512        req_sect = sect_in + curr_corr;
 513        if (req_sect < 0)
 514                req_sect = 0;
 515
 516        max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
 517        if (req_sect > max_sect)
 518                req_sect = max_sect;
 519
 520        /*
 521        drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 522                 sect_in, device->rs_in_flight, want, correction,
 523                 steps, cps, device->rs_planed, curr_corr, req_sect);
 524        */
 525
 526        return req_sect;
 527}
 528
 529static int drbd_rs_number_requests(struct drbd_device *device)
 530{
 531        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 532        int number, mxb;
 533
 534        sect_in = atomic_xchg(&device->rs_sect_in, 0);
 535        device->rs_in_flight -= sect_in;
 536
 537        rcu_read_lock();
 538        mxb = drbd_get_max_buffers(device) / 2;
 539        if (rcu_dereference(device->rs_plan_s)->size) {
 540                number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
 541                device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 542        } else {
 543                device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
 544                number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 545        }
 546        rcu_read_unlock();
 547
 548        /* Don't have more than "max-buffers"/2 in-flight.
 549         * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
 550         * potentially causing a distributed deadlock on congestion during
 551         * online-verify or (checksum-based) resync, if max-buffers,
 552         * socket buffer sizes and resync rate settings are mis-configured. */
 553
 554        /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
 555         * mxb (as used here, and in drbd_alloc_pages on the peer) is
 556         * "number of pages" (typically also 4k),
 557         * but "rs_in_flight" is in "sectors" (512 Byte). */
 558        if (mxb - device->rs_in_flight/8 < number)
 559                number = mxb - device->rs_in_flight/8;
 560
 561        return number;
 562}
 563
 564static int make_resync_request(struct drbd_device *const device, int cancel)
 565{
 566        struct drbd_peer_device *const peer_device = first_peer_device(device);
 567        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
 568        unsigned long bit;
 569        sector_t sector;
 570        const sector_t capacity = drbd_get_capacity(device->this_bdev);
 571        int max_bio_size;
 572        int number, rollback_i, size;
 573        int align, requeue = 0;
 574        int i = 0;
 575
 576        if (unlikely(cancel))
 577                return 0;
 578
 579        if (device->rs_total == 0) {
 580                /* empty resync? */
 581                drbd_resync_finished(device);
 582                return 0;
 583        }
 584
 585        if (!get_ldev(device)) {
 586                /* Since we only need to access device->rsync a
 587                   get_ldev_if_state(device,D_FAILED) would be sufficient, but
 588                   to continue resync with a broken disk makes no sense at
 589                   all */
 590                drbd_err(device, "Disk broke down during resync!\n");
 591                return 0;
 592        }
 593
 594        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
 595        number = drbd_rs_number_requests(device);
 596        if (number <= 0)
 597                goto requeue;
 598
 599        for (i = 0; i < number; i++) {
 600                /* Stop generating RS requests when half of the send buffer is filled,
 601                 * but notify TCP that we'd like to have more space. */
 602                mutex_lock(&connection->data.mutex);
 603                if (connection->data.socket) {
 604                        struct sock *sk = connection->data.socket->sk;
 605                        int queued = sk->sk_wmem_queued;
 606                        int sndbuf = sk->sk_sndbuf;
 607                        if (queued > sndbuf / 2) {
 608                                requeue = 1;
 609                                if (sk->sk_socket)
 610                                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 611                        }
 612                } else
 613                        requeue = 1;
 614                mutex_unlock(&connection->data.mutex);
 615                if (requeue)
 616                        goto requeue;
 617
 618next_sector:
 619                size = BM_BLOCK_SIZE;
 620                bit  = drbd_bm_find_next(device, device->bm_resync_fo);
 621
 622                if (bit == DRBD_END_OF_BITMAP) {
 623                        device->bm_resync_fo = drbd_bm_bits(device);
 624                        put_ldev(device);
 625                        return 0;
 626                }
 627
 628                sector = BM_BIT_TO_SECT(bit);
 629
 630                if (drbd_try_rs_begin_io(device, sector)) {
 631                        device->bm_resync_fo = bit;
 632                        goto requeue;
 633                }
 634                device->bm_resync_fo = bit + 1;
 635
 636                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
 637                        drbd_rs_complete_io(device, sector);
 638                        goto next_sector;
 639                }
 640
 641#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 642                /* try to find some adjacent bits.
 643                 * we stop if we have already the maximum req size.
 644                 *
 645                 * Additionally always align bigger requests, in order to
 646                 * be prepared for all stripe sizes of software RAIDs.
 647                 */
 648                align = 1;
 649                rollback_i = i;
 650                while (i < number) {
 651                        if (size + BM_BLOCK_SIZE > max_bio_size)
 652                                break;
 653
 654                        /* Be always aligned */
 655                        if (sector & ((1<<(align+3))-1))
 656                                break;
 657
 658                        /* do not cross extent boundaries */
 659                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 660                                break;
 661                        /* now, is it actually dirty, after all?
 662                         * caution, drbd_bm_test_bit is tri-state for some
 663                         * obscure reason; ( b == 0 ) would get the out-of-band
 664                         * only accidentally right because of the "oddly sized"
 665                         * adjustment below */
 666                        if (drbd_bm_test_bit(device, bit+1) != 1)
 667                                break;
 668                        bit++;
 669                        size += BM_BLOCK_SIZE;
 670                        if ((BM_BLOCK_SIZE << align) <= size)
 671                                align++;
 672                        i++;
 673                }
 674                /* if we merged some,
 675                 * reset the offset to start the next drbd_bm_find_next from */
 676                if (size > BM_BLOCK_SIZE)
 677                        device->bm_resync_fo = bit + 1;
 678#endif
 679
 680                /* adjust very last sectors, in case we are oddly sized */
 681                if (sector + (size>>9) > capacity)
 682                        size = (capacity-sector)<<9;
 683
 684                if (device->use_csums) {
 685                        switch (read_for_csum(peer_device, sector, size)) {
 686                        case -EIO: /* Disk failure */
 687                                put_ldev(device);
 688                                return -EIO;
 689                        case -EAGAIN: /* allocation failed, or ldev busy */
 690                                drbd_rs_complete_io(device, sector);
 691                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
 692                                i = rollback_i;
 693                                goto requeue;
 694                        case 0:
 695                                /* everything ok */
 696                                break;
 697                        default:
 698                                BUG();
 699                        }
 700                } else {
 701                        int err;
 702
 703                        inc_rs_pending(device);
 704                        err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
 705                                                 sector, size, ID_SYNCER);
 706                        if (err) {
 707                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
 708                                dec_rs_pending(device);
 709                                put_ldev(device);
 710                                return err;
 711                        }
 712                }
 713        }
 714
 715        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
 716                /* last syncer _request_ was sent,
 717                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
 718                 * next sync group will resume), as soon as we receive the last
 719                 * resync data block, and the last bit is cleared.
 720                 * until then resync "work" is "inactive" ...
 721                 */
 722                put_ldev(device);
 723                return 0;
 724        }
 725
 726 requeue:
 727        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 728        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 729        put_ldev(device);
 730        return 0;
 731}
 732
 733static int make_ov_request(struct drbd_device *device, int cancel)
 734{
 735        int number, i, size;
 736        sector_t sector;
 737        const sector_t capacity = drbd_get_capacity(device->this_bdev);
 738        bool stop_sector_reached = false;
 739
 740        if (unlikely(cancel))
 741                return 1;
 742
 743        number = drbd_rs_number_requests(device);
 744
 745        sector = device->ov_position;
 746        for (i = 0; i < number; i++) {
 747                if (sector >= capacity)
 748                        return 1;
 749
 750                /* We check for "finished" only in the reply path:
 751                 * w_e_end_ov_reply().
 752                 * We need to send at least one request out. */
 753                stop_sector_reached = i > 0
 754                        && verify_can_do_stop_sector(device)
 755                        && sector >= device->ov_stop_sector;
 756                if (stop_sector_reached)
 757                        break;
 758
 759                size = BM_BLOCK_SIZE;
 760
 761                if (drbd_try_rs_begin_io(device, sector)) {
 762                        device->ov_position = sector;
 763                        goto requeue;
 764                }
 765
 766                if (sector + (size>>9) > capacity)
 767                        size = (capacity-sector)<<9;
 768
 769                inc_rs_pending(device);
 770                if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
 771                        dec_rs_pending(device);
 772                        return 0;
 773                }
 774                sector += BM_SECT_PER_BIT;
 775        }
 776        device->ov_position = sector;
 777
 778 requeue:
 779        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 780        if (i == 0 || !stop_sector_reached)
 781                mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 782        return 1;
 783}
 784
 785int w_ov_finished(struct drbd_work *w, int cancel)
 786{
 787        struct drbd_device_work *dw =
 788                container_of(w, struct drbd_device_work, w);
 789        struct drbd_device *device = dw->device;
 790        kfree(dw);
 791        ov_out_of_sync_print(device);
 792        drbd_resync_finished(device);
 793
 794        return 0;
 795}
 796
 797static int w_resync_finished(struct drbd_work *w, int cancel)
 798{
 799        struct drbd_device_work *dw =
 800                container_of(w, struct drbd_device_work, w);
 801        struct drbd_device *device = dw->device;
 802        kfree(dw);
 803
 804        drbd_resync_finished(device);
 805
 806        return 0;
 807}
 808
 809static void ping_peer(struct drbd_device *device)
 810{
 811        struct drbd_connection *connection = first_peer_device(device)->connection;
 812
 813        clear_bit(GOT_PING_ACK, &connection->flags);
 814        request_ping(connection);
 815        wait_event(connection->ping_wait,
 816                   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
 817}
 818
 819int drbd_resync_finished(struct drbd_device *device)
 820{
 821        unsigned long db, dt, dbdt;
 822        unsigned long n_oos;
 823        union drbd_state os, ns;
 824        struct drbd_device_work *dw;
 825        char *khelper_cmd = NULL;
 826        int verify_done = 0;
 827
 828        /* Remove all elements from the resync LRU. Since future actions
 829         * might set bits in the (main) bitmap, then the entries in the
 830         * resync LRU would be wrong. */
 831        if (drbd_rs_del_all(device)) {
 832                /* In case this is not possible now, most probably because
 833                 * there are P_RS_DATA_REPLY Packets lingering on the worker's
 834                 * queue (or even the read operations for those packets
 835                 * is not finished by now).   Retry in 100ms. */
 836
 837                schedule_timeout_interruptible(HZ / 10);
 838                dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
 839                if (dw) {
 840                        dw->w.cb = w_resync_finished;
 841                        dw->device = device;
 842                        drbd_queue_work(&first_peer_device(device)->connection->sender_work,
 843                                        &dw->w);
 844                        return 1;
 845                }
 846                drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
 847        }
 848
 849        dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
 850        if (dt <= 0)
 851                dt = 1;
 852
 853        db = device->rs_total;
 854        /* adjust for verify start and stop sectors, respective reached position */
 855        if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
 856                db -= device->ov_left;
 857
 858        dbdt = Bit2KB(db/dt);
 859        device->rs_paused /= HZ;
 860
 861        if (!get_ldev(device))
 862                goto out;
 863
 864        ping_peer(device);
 865
 866        spin_lock_irq(&device->resource->req_lock);
 867        os = drbd_read_state(device);
 868
 869        verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 870
 871        /* This protects us against multiple calls (that can happen in the presence
 872           of application IO), and against connectivity loss just before we arrive here. */
 873        if (os.conn <= C_CONNECTED)
 874                goto out_unlock;
 875
 876        ns = os;
 877        ns.conn = C_CONNECTED;
 878
 879        drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 880             verify_done ? "Online verify" : "Resync",
 881             dt + device->rs_paused, device->rs_paused, dbdt);
 882
 883        n_oos = drbd_bm_total_weight(device);
 884
 885        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 886                if (n_oos) {
 887                        drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
 888                              n_oos, Bit2KB(1));
 889                        khelper_cmd = "out-of-sync";
 890                }
 891        } else {
 892                D_ASSERT(device, (n_oos - device->rs_failed) == 0);
 893
 894                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 895                        khelper_cmd = "after-resync-target";
 896
 897                if (device->use_csums && device->rs_total) {
 898                        const unsigned long s = device->rs_same_csum;
 899                        const unsigned long t = device->rs_total;
 900                        const int ratio =
 901                                (t == 0)     ? 0 :
 902                        (t < 100000) ? ((s*100)/t) : (s/(t/100));
 903                        drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
 904                             "transferred %luK total %luK\n",
 905                             ratio,
 906                             Bit2KB(device->rs_same_csum),
 907                             Bit2KB(device->rs_total - device->rs_same_csum),
 908                             Bit2KB(device->rs_total));
 909                }
 910        }
 911
 912        if (device->rs_failed) {
 913                drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
 914
 915                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 916                        ns.disk = D_INCONSISTENT;
 917                        ns.pdsk = D_UP_TO_DATE;
 918                } else {
 919                        ns.disk = D_UP_TO_DATE;
 920                        ns.pdsk = D_INCONSISTENT;
 921                }
 922        } else {
 923                ns.disk = D_UP_TO_DATE;
 924                ns.pdsk = D_UP_TO_DATE;
 925
 926                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 927                        if (device->p_uuid) {
 928                                int i;
 929                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 930                                        _drbd_uuid_set(device, i, device->p_uuid[i]);
 931                                drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
 932                                _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
 933                        } else {
 934                                drbd_err(device, "device->p_uuid is NULL! BUG\n");
 935                        }
 936                }
 937
 938                if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 939                        /* for verify runs, we don't update uuids here,
 940                         * so there would be nothing to report. */
 941                        drbd_uuid_set_bm(device, 0UL);
 942                        drbd_print_uuids(device, "updated UUIDs");
 943                        if (device->p_uuid) {
 944                                /* Now the two UUID sets are equal, update what we
 945                                 * know of the peer. */
 946                                int i;
 947                                for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 948                                        device->p_uuid[i] = device->ldev->md.uuid[i];
 949                        }
 950                }
 951        }
 952
 953        _drbd_set_state(device, ns, CS_VERBOSE, NULL);
 954out_unlock:
 955        spin_unlock_irq(&device->resource->req_lock);
 956        put_ldev(device);
 957out:
 958        device->rs_total  = 0;
 959        device->rs_failed = 0;
 960        device->rs_paused = 0;
 961
 962        /* reset start sector, if we reached end of device */
 963        if (verify_done && device->ov_left == 0)
 964                device->ov_start_sector = 0;
 965
 966        drbd_md_sync(device);
 967
 968        if (khelper_cmd)
 969                drbd_khelper(device, khelper_cmd);
 970
 971        return 1;
 972}
 973
 974/* helper */
 975static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
 976{
 977        if (drbd_peer_req_has_active_page(peer_req)) {
 978                /* This might happen if sendpage() has not finished */
 979                int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
 980                atomic_add(i, &device->pp_in_use_by_net);
 981                atomic_sub(i, &device->pp_in_use);
 982                spin_lock_irq(&device->resource->req_lock);
 983                list_add_tail(&peer_req->w.list, &device->net_ee);
 984                spin_unlock_irq(&device->resource->req_lock);
 985                wake_up(&drbd_pp_wait);
 986        } else
 987                drbd_free_peer_req(device, peer_req);
 988}
 989
 990/**
 991 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 992 * @device:     DRBD device.
 993 * @w:          work object.
 994 * @cancel:     The connection will be closed anyways
 995 */
 996int w_e_end_data_req(struct drbd_work *w, int cancel)
 997{
 998        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 999        struct drbd_peer_device *peer_device = peer_req->peer_device;
1000        struct drbd_device *device = peer_device->device;
1001        int err;
1002
1003        if (unlikely(cancel)) {
1004                drbd_free_peer_req(device, peer_req);
1005                dec_unacked(device);
1006                return 0;
1007        }
1008
1009        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1010                err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1011        } else {
1012                if (__ratelimit(&drbd_ratelimit_state))
1013                        drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1014                            (unsigned long long)peer_req->i.sector);
1015
1016                err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1017        }
1018
1019        dec_unacked(device);
1020
1021        move_to_net_ee_or_free(device, peer_req);
1022
1023        if (unlikely(err))
1024                drbd_err(device, "drbd_send_block() failed\n");
1025        return err;
1026}
1027
1028/**
1029 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1030 * @w:          work object.
1031 * @cancel:     The connection will be closed anyways
1032 */
1033int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1034{
1035        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1036        struct drbd_peer_device *peer_device = peer_req->peer_device;
1037        struct drbd_device *device = peer_device->device;
1038        int err;
1039
1040        if (unlikely(cancel)) {
1041                drbd_free_peer_req(device, peer_req);
1042                dec_unacked(device);
1043                return 0;
1044        }
1045
1046        if (get_ldev_if_state(device, D_FAILED)) {
1047                drbd_rs_complete_io(device, peer_req->i.sector);
1048                put_ldev(device);
1049        }
1050
1051        if (device->state.conn == C_AHEAD) {
1052                err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1053        } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1054                if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1055                        inc_rs_pending(device);
1056                        err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1057                } else {
1058                        if (__ratelimit(&drbd_ratelimit_state))
1059                                drbd_err(device, "Not sending RSDataReply, "
1060                                    "partner DISKLESS!\n");
1061                        err = 0;
1062                }
1063        } else {
1064                if (__ratelimit(&drbd_ratelimit_state))
1065                        drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1066                            (unsigned long long)peer_req->i.sector);
1067
1068                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1069
1070                /* update resync data with failure */
1071                drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1072        }
1073
1074        dec_unacked(device);
1075
1076        move_to_net_ee_or_free(device, peer_req);
1077
1078        if (unlikely(err))
1079                drbd_err(device, "drbd_send_block() failed\n");
1080        return err;
1081}
1082
1083int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1084{
1085        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1086        struct drbd_peer_device *peer_device = peer_req->peer_device;
1087        struct drbd_device *device = peer_device->device;
1088        struct digest_info *di;
1089        int digest_size;
1090        void *digest = NULL;
1091        int err, eq = 0;
1092
1093        if (unlikely(cancel)) {
1094                drbd_free_peer_req(device, peer_req);
1095                dec_unacked(device);
1096                return 0;
1097        }
1098
1099        if (get_ldev(device)) {
1100                drbd_rs_complete_io(device, peer_req->i.sector);
1101                put_ldev(device);
1102        }
1103
1104        di = peer_req->digest;
1105
1106        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1107                /* quick hack to try to avoid a race against reconfiguration.
1108                 * a real fix would be much more involved,
1109                 * introducing more locking mechanisms */
1110                if (peer_device->connection->csums_tfm) {
1111                        digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1112                        D_ASSERT(device, digest_size == di->digest_size);
1113                        digest = kmalloc(digest_size, GFP_NOIO);
1114                }
1115                if (digest) {
1116                        drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1117                        eq = !memcmp(digest, di->digest, digest_size);
1118                        kfree(digest);
1119                }
1120
1121                if (eq) {
1122                        drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1123                        /* rs_same_csums unit is BM_BLOCK_SIZE */
1124                        device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1125                        err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1126                } else {
1127                        inc_rs_pending(device);
1128                        peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1129                        peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1130                        kfree(di);
1131                        err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1132                }
1133        } else {
1134                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1135                if (__ratelimit(&drbd_ratelimit_state))
1136                        drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1137        }
1138
1139        dec_unacked(device);
1140        move_to_net_ee_or_free(device, peer_req);
1141
1142        if (unlikely(err))
1143                drbd_err(device, "drbd_send_block/ack() failed\n");
1144        return err;
1145}
1146
1147int w_e_end_ov_req(struct drbd_work *w, int cancel)
1148{
1149        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1150        struct drbd_peer_device *peer_device = peer_req->peer_device;
1151        struct drbd_device *device = peer_device->device;
1152        sector_t sector = peer_req->i.sector;
1153        unsigned int size = peer_req->i.size;
1154        int digest_size;
1155        void *digest;
1156        int err = 0;
1157
1158        if (unlikely(cancel))
1159                goto out;
1160
1161        digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1162        digest = kmalloc(digest_size, GFP_NOIO);
1163        if (!digest) {
1164                err = 1;        /* terminate the connection in case the allocation failed */
1165                goto out;
1166        }
1167
1168        if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1169                drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1170        else
1171                memset(digest, 0, digest_size);
1172
1173        /* Free e and pages before send.
1174         * In case we block on congestion, we could otherwise run into
1175         * some distributed deadlock, if the other side blocks on
1176         * congestion as well, because our receiver blocks in
1177         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1178        drbd_free_peer_req(device, peer_req);
1179        peer_req = NULL;
1180        inc_rs_pending(device);
1181        err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1182        if (err)
1183                dec_rs_pending(device);
1184        kfree(digest);
1185
1186out:
1187        if (peer_req)
1188                drbd_free_peer_req(device, peer_req);
1189        dec_unacked(device);
1190        return err;
1191}
1192
1193void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1194{
1195        if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1196                device->ov_last_oos_size += size>>9;
1197        } else {
1198                device->ov_last_oos_start = sector;
1199                device->ov_last_oos_size = size>>9;
1200        }
1201        drbd_set_out_of_sync(device, sector, size);
1202}
1203
1204int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1205{
1206        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1207        struct drbd_peer_device *peer_device = peer_req->peer_device;
1208        struct drbd_device *device = peer_device->device;
1209        struct digest_info *di;
1210        void *digest;
1211        sector_t sector = peer_req->i.sector;
1212        unsigned int size = peer_req->i.size;
1213        int digest_size;
1214        int err, eq = 0;
1215        bool stop_sector_reached = false;
1216
1217        if (unlikely(cancel)) {
1218                drbd_free_peer_req(device, peer_req);
1219                dec_unacked(device);
1220                return 0;
1221        }
1222
1223        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1224         * the resync lru has been cleaned up already */
1225        if (get_ldev(device)) {
1226                drbd_rs_complete_io(device, peer_req->i.sector);
1227                put_ldev(device);
1228        }
1229
1230        di = peer_req->digest;
1231
1232        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1233                digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1234                digest = kmalloc(digest_size, GFP_NOIO);
1235                if (digest) {
1236                        drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1237
1238                        D_ASSERT(device, digest_size == di->digest_size);
1239                        eq = !memcmp(digest, di->digest, digest_size);
1240                        kfree(digest);
1241                }
1242        }
1243
1244        /* Free peer_req and pages before send.
1245         * In case we block on congestion, we could otherwise run into
1246         * some distributed deadlock, if the other side blocks on
1247         * congestion as well, because our receiver blocks in
1248         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1249        drbd_free_peer_req(device, peer_req);
1250        if (!eq)
1251                drbd_ov_out_of_sync_found(device, sector, size);
1252        else
1253                ov_out_of_sync_print(device);
1254
1255        err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1256                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1257
1258        dec_unacked(device);
1259
1260        --device->ov_left;
1261
1262        /* let's advance progress step marks only for every other megabyte */
1263        if ((device->ov_left & 0x200) == 0x200)
1264                drbd_advance_rs_marks(device, device->ov_left);
1265
1266        stop_sector_reached = verify_can_do_stop_sector(device) &&
1267                (sector + (size>>9)) >= device->ov_stop_sector;
1268
1269        if (device->ov_left == 0 || stop_sector_reached) {
1270                ov_out_of_sync_print(device);
1271                drbd_resync_finished(device);
1272        }
1273
1274        return err;
1275}
1276
1277/* FIXME
1278 * We need to track the number of pending barrier acks,
1279 * and to be able to wait for them.
1280 * See also comment in drbd_adm_attach before drbd_suspend_io.
1281 */
1282static int drbd_send_barrier(struct drbd_connection *connection)
1283{
1284        struct p_barrier *p;
1285        struct drbd_socket *sock;
1286
1287        sock = &connection->data;
1288        p = conn_prepare_command(connection, sock);
1289        if (!p)
1290                return -EIO;
1291        p->barrier = connection->send.current_epoch_nr;
1292        p->pad = 0;
1293        connection->send.current_epoch_writes = 0;
1294
1295        return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1296}
1297
1298int w_send_write_hint(struct drbd_work *w, int cancel)
1299{
1300        struct drbd_device *device =
1301                container_of(w, struct drbd_device, unplug_work);
1302        struct drbd_socket *sock;
1303
1304        if (cancel)
1305                return 0;
1306        sock = &first_peer_device(device)->connection->data;
1307        if (!drbd_prepare_command(first_peer_device(device), sock))
1308                return -EIO;
1309        return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1310}
1311
1312static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1313{
1314        if (!connection->send.seen_any_write_yet) {
1315                connection->send.seen_any_write_yet = true;
1316                connection->send.current_epoch_nr = epoch;
1317                connection->send.current_epoch_writes = 0;
1318        }
1319}
1320
1321static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1322{
1323        /* re-init if first write on this connection */
1324        if (!connection->send.seen_any_write_yet)
1325                return;
1326        if (connection->send.current_epoch_nr != epoch) {
1327                if (connection->send.current_epoch_writes)
1328                        drbd_send_barrier(connection);
1329                connection->send.current_epoch_nr = epoch;
1330        }
1331}
1332
1333int w_send_out_of_sync(struct drbd_work *w, int cancel)
1334{
1335        struct drbd_request *req = container_of(w, struct drbd_request, w);
1336        struct drbd_device *device = req->device;
1337        struct drbd_peer_device *const peer_device = first_peer_device(device);
1338        struct drbd_connection *const connection = peer_device->connection;
1339        int err;
1340
1341        if (unlikely(cancel)) {
1342                req_mod(req, SEND_CANCELED);
1343                return 0;
1344        }
1345        req->pre_send_jif = jiffies;
1346
1347        /* this time, no connection->send.current_epoch_writes++;
1348         * If it was sent, it was the closing barrier for the last
1349         * replicated epoch, before we went into AHEAD mode.
1350         * No more barriers will be sent, until we leave AHEAD mode again. */
1351        maybe_send_barrier(connection, req->epoch);
1352
1353        err = drbd_send_out_of_sync(peer_device, req);
1354        req_mod(req, OOS_HANDED_TO_NETWORK);
1355
1356        return err;
1357}
1358
1359/**
1360 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1361 * @w:          work object.
1362 * @cancel:     The connection will be closed anyways
1363 */
1364int w_send_dblock(struct drbd_work *w, int cancel)
1365{
1366        struct drbd_request *req = container_of(w, struct drbd_request, w);
1367        struct drbd_device *device = req->device;
1368        struct drbd_peer_device *const peer_device = first_peer_device(device);
1369        struct drbd_connection *connection = peer_device->connection;
1370        int err;
1371
1372        if (unlikely(cancel)) {
1373                req_mod(req, SEND_CANCELED);
1374                return 0;
1375        }
1376        req->pre_send_jif = jiffies;
1377
1378        re_init_if_first_write(connection, req->epoch);
1379        maybe_send_barrier(connection, req->epoch);
1380        connection->send.current_epoch_writes++;
1381
1382        err = drbd_send_dblock(peer_device, req);
1383        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1384
1385        return err;
1386}
1387
1388/**
1389 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1390 * @w:          work object.
1391 * @cancel:     The connection will be closed anyways
1392 */
1393int w_send_read_req(struct drbd_work *w, int cancel)
1394{
1395        struct drbd_request *req = container_of(w, struct drbd_request, w);
1396        struct drbd_device *device = req->device;
1397        struct drbd_peer_device *const peer_device = first_peer_device(device);
1398        struct drbd_connection *connection = peer_device->connection;
1399        int err;
1400
1401        if (unlikely(cancel)) {
1402                req_mod(req, SEND_CANCELED);
1403                return 0;
1404        }
1405        req->pre_send_jif = jiffies;
1406
1407        /* Even read requests may close a write epoch,
1408         * if there was any yet. */
1409        maybe_send_barrier(connection, req->epoch);
1410
1411        err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1412                                 (unsigned long)req);
1413
1414        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1415
1416        return err;
1417}
1418
1419int w_restart_disk_io(struct drbd_work *w, int cancel)
1420{
1421        struct drbd_request *req = container_of(w, struct drbd_request, w);
1422        struct drbd_device *device = req->device;
1423
1424        if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1425                drbd_al_begin_io(device, &req->i);
1426
1427        drbd_req_make_private_bio(req, req->master_bio);
1428        req->private_bio->bi_bdev = device->ldev->backing_bdev;
1429        generic_make_request(req->private_bio);
1430
1431        return 0;
1432}
1433
1434static int _drbd_may_sync_now(struct drbd_device *device)
1435{
1436        struct drbd_device *odev = device;
1437        int resync_after;
1438
1439        while (1) {
1440                if (!odev->ldev || odev->state.disk == D_DISKLESS)
1441                        return 1;
1442                rcu_read_lock();
1443                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1444                rcu_read_unlock();
1445                if (resync_after == -1)
1446                        return 1;
1447                odev = minor_to_device(resync_after);
1448                if (!odev)
1449                        return 1;
1450                if ((odev->state.conn >= C_SYNC_SOURCE &&
1451                     odev->state.conn <= C_PAUSED_SYNC_T) ||
1452                    odev->state.aftr_isp || odev->state.peer_isp ||
1453                    odev->state.user_isp)
1454                        return 0;
1455        }
1456}
1457
1458/**
1459 * _drbd_pause_after() - Pause resync on all devices that may not resync now
1460 * @device:     DRBD device.
1461 *
1462 * Called from process context only (admin command and after_state_ch).
1463 */
1464static int _drbd_pause_after(struct drbd_device *device)
1465{
1466        struct drbd_device *odev;
1467        int i, rv = 0;
1468
1469        rcu_read_lock();
1470        idr_for_each_entry(&drbd_devices, odev, i) {
1471                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1472                        continue;
1473                if (!_drbd_may_sync_now(odev))
1474                        rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
1475                               != SS_NOTHING_TO_DO);
1476        }
1477        rcu_read_unlock();
1478
1479        return rv;
1480}
1481
1482/**
1483 * _drbd_resume_next() - Resume resync on all devices that may resync now
1484 * @device:     DRBD device.
1485 *
1486 * Called from process context only (admin command and worker).
1487 */
1488static int _drbd_resume_next(struct drbd_device *device)
1489{
1490        struct drbd_device *odev;
1491        int i, rv = 0;
1492
1493        rcu_read_lock();
1494        idr_for_each_entry(&drbd_devices, odev, i) {
1495                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1496                        continue;
1497                if (odev->state.aftr_isp) {
1498                        if (_drbd_may_sync_now(odev))
1499                                rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
1500                                                        CS_HARD, NULL)
1501                                       != SS_NOTHING_TO_DO) ;
1502                }
1503        }
1504        rcu_read_unlock();
1505        return rv;
1506}
1507
1508void resume_next_sg(struct drbd_device *device)
1509{
1510        write_lock_irq(&global_state_lock);
1511        _drbd_resume_next(device);
1512        write_unlock_irq(&global_state_lock);
1513}
1514
1515void suspend_other_sg(struct drbd_device *device)
1516{
1517        write_lock_irq(&global_state_lock);
1518        _drbd_pause_after(device);
1519        write_unlock_irq(&global_state_lock);
1520}
1521
1522/* caller must hold global_state_lock */
1523enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1524{
1525        struct drbd_device *odev;
1526        int resync_after;
1527
1528        if (o_minor == -1)
1529                return NO_ERROR;
1530        if (o_minor < -1 || o_minor > MINORMASK)
1531                return ERR_RESYNC_AFTER;
1532
1533        /* check for loops */
1534        odev = minor_to_device(o_minor);
1535        while (1) {
1536                if (odev == device)
1537                        return ERR_RESYNC_AFTER_CYCLE;
1538
1539                /* You are free to depend on diskless, non-existing,
1540                 * or not yet/no longer existing minors.
1541                 * We only reject dependency loops.
1542                 * We cannot follow the dependency chain beyond a detached or
1543                 * missing minor.
1544                 */
1545                if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1546                        return NO_ERROR;
1547
1548                rcu_read_lock();
1549                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1550                rcu_read_unlock();
1551                /* dependency chain ends here, no cycles. */
1552                if (resync_after == -1)
1553                        return NO_ERROR;
1554
1555                /* follow the dependency chain */
1556                odev = minor_to_device(resync_after);
1557        }
1558}
1559
/**
 * drbd_resync_after_changed() - re-evaluate pause/resume after a dependency change
 * @device:	DRBD device handed to _drbd_pause_after() and _drbd_resume_next().
 *
 * Caller must hold global_state_lock.
 *
 * Calls _drbd_pause_after() and _drbd_resume_next() over and over until
 * a full round reports no change from either of them.
 */
void drbd_resync_after_changed(struct drbd_device *device)
{
	int changed;

	for (;;) {
		changed = _drbd_pause_after(device);
		changed |= _drbd_resume_next(device);
		if (!changed)
			break;
	}
}
1570
1571void drbd_rs_controller_reset(struct drbd_device *device)
1572{
1573        struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1574        struct fifo_buffer *plan;
1575
1576        atomic_set(&device->rs_sect_in, 0);
1577        atomic_set(&device->rs_sect_ev, 0);
1578        device->rs_in_flight = 0;
1579        device->rs_last_events =
1580                (int)part_stat_read(&disk->part0, sectors[0]) +
1581                (int)part_stat_read(&disk->part0, sectors[1]);
1582
1583        /* Updating the RCU protected object in place is necessary since
1584           this function gets called from atomic context.
1585           It is valid since all other updates also lead to an completely
1586           empty fifo */
1587        rcu_read_lock();
1588        plan = rcu_dereference(device->rs_plan_s);
1589        plan->total = 0;
1590        fifo_set(plan, 0);
1591        rcu_read_unlock();
1592}
1593
1594void start_resync_timer_fn(unsigned long data)
1595{
1596        struct drbd_device *device = (struct drbd_device *) data;
1597        drbd_device_post_work(device, RS_START);
1598}
1599
1600static void do_start_resync(struct drbd_device *device)
1601{
1602        if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1603                drbd_warn(device, "postponing start_resync ...\n");
1604                device->start_resync_timer.expires = jiffies + HZ/10;
1605                add_timer(&device->start_resync_timer);
1606                return;
1607        }
1608
1609        drbd_start_resync(device, C_SYNC_SOURCE);
1610        clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1611}
1612
1613static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1614{
1615        bool csums_after_crash_only;
1616        rcu_read_lock();
1617        csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1618        rcu_read_unlock();
1619        return connection->agreed_pro_version >= 89 &&          /* supported? */
1620                connection->csums_tfm &&                        /* configured? */
1621                (csums_after_crash_only == 0                    /* use for each resync? */
1622                 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1623}
1624
1625/**
1626 * drbd_start_resync() - Start the resync process
1627 * @device:     DRBD device.
1628 * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1629 *
1630 * This function might bring you directly into one of the
1631 * C_PAUSED_SYNC_* states.
1632 */
1633void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1634{
1635        struct drbd_peer_device *peer_device = first_peer_device(device);
1636        struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1637        union drbd_state ns;
1638        int r;
1639
1640        if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1641                drbd_err(device, "Resync already running!\n");
1642                return;
1643        }
1644
1645        if (!test_bit(B_RS_H_DONE, &device->flags)) {
1646                if (side == C_SYNC_TARGET) {
1647                        /* Since application IO was locked out during C_WF_BITMAP_T and
1648                           C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1649                           we check that we might make the data inconsistent. */
1650                        r = drbd_khelper(device, "before-resync-target");
1651                        r = (r >> 8) & 0xff;
1652                        if (r > 0) {
1653                                drbd_info(device, "before-resync-target handler returned %d, "
1654                                         "dropping connection.\n", r);
1655                                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1656                                return;
1657                        }
1658                } else /* C_SYNC_SOURCE */ {
1659                        r = drbd_khelper(device, "before-resync-source");
1660                        r = (r >> 8) & 0xff;
1661                        if (r > 0) {
1662                                if (r == 3) {
1663                                        drbd_info(device, "before-resync-source handler returned %d, "
1664                                                 "ignoring. Old userland tools?", r);
1665                                } else {
1666                                        drbd_info(device, "before-resync-source handler returned %d, "
1667                                                 "dropping connection.\n", r);
1668                                        conn_request_state(connection,
1669                                                           NS(conn, C_DISCONNECTING), CS_HARD);
1670                                        return;
1671                                }
1672                        }
1673                }
1674        }
1675
1676        if (current == connection->worker.task) {
1677                /* The worker should not sleep waiting for state_mutex,
1678                   that can take long */
1679                if (!mutex_trylock(device->state_mutex)) {
1680                        set_bit(B_RS_H_DONE, &device->flags);
1681                        device->start_resync_timer.expires = jiffies + HZ/5;
1682                        add_timer(&device->start_resync_timer);
1683                        return;
1684                }
1685        } else {
1686                mutex_lock(device->state_mutex);
1687        }
1688        clear_bit(B_RS_H_DONE, &device->flags);
1689
1690        /* req_lock: serialize with drbd_send_and_submit() and others
1691         * global_state_lock: for stable sync-after dependencies */
1692        spin_lock_irq(&device->resource->req_lock);
1693        write_lock(&global_state_lock);
1694        /* Did some connection breakage or IO error race with us? */
1695        if (device->state.conn < C_CONNECTED
1696        || !get_ldev_if_state(device, D_NEGOTIATING)) {
1697                write_unlock(&global_state_lock);
1698                spin_unlock_irq(&device->resource->req_lock);
1699                mutex_unlock(device->state_mutex);
1700                return;
1701        }
1702
1703        ns = drbd_read_state(device);
1704
1705        ns.aftr_isp = !_drbd_may_sync_now(device);
1706
1707        ns.conn = side;
1708
1709        if (side == C_SYNC_TARGET)
1710                ns.disk = D_INCONSISTENT;
1711        else /* side == C_SYNC_SOURCE */
1712                ns.pdsk = D_INCONSISTENT;
1713
1714        r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
1715        ns = drbd_read_state(device);
1716
1717        if (ns.conn < C_CONNECTED)
1718                r = SS_UNKNOWN_ERROR;
1719
1720        if (r == SS_SUCCESS) {
1721                unsigned long tw = drbd_bm_total_weight(device);
1722                unsigned long now = jiffies;
1723                int i;
1724
1725                device->rs_failed    = 0;
1726                device->rs_paused    = 0;
1727                device->rs_same_csum = 0;
1728                device->rs_last_sect_ev = 0;
1729                device->rs_total     = tw;
1730                device->rs_start     = now;
1731                for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1732                        device->rs_mark_left[i] = tw;
1733                        device->rs_mark_time[i] = now;
1734                }
1735                _drbd_pause_after(device);
1736                /* Forget potentially stale cached per resync extent bit-counts.
1737                 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1738                 * disabled, and know the disk state is ok. */
1739                spin_lock(&device->al_lock);
1740                lc_reset(device->resync);
1741                device->resync_locked = 0;
1742                device->resync_wenr = LC_FREE;
1743                spin_unlock(&device->al_lock);
1744        }
1745        write_unlock(&global_state_lock);
1746        spin_unlock_irq(&device->resource->req_lock);
1747
1748        if (r == SS_SUCCESS) {
1749                wake_up(&device->al_wait); /* for lc_reset() above */
1750                /* reset rs_last_bcast when a resync or verify is started,
1751                 * to deal with potential jiffies wrap. */
1752                device->rs_last_bcast = jiffies - HZ;
1753
1754                drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1755                     drbd_conn_str(ns.conn),
1756                     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1757                     (unsigned long) device->rs_total);
1758                if (side == C_SYNC_TARGET) {
1759                        device->bm_resync_fo = 0;
1760                        device->use_csums = use_checksum_based_resync(connection, device);
1761                } else {
1762                        device->use_csums = 0;
1763                }
1764
1765                /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1766                 * with w_send_oos, or the sync target will get confused as to
1767                 * how much bits to resync.  We cannot do that always, because for an
1768                 * empty resync and protocol < 95, we need to do it here, as we call
1769                 * drbd_resync_finished from here in that case.
1770                 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1771                 * and from after_state_ch otherwise. */
1772                if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1773                        drbd_gen_and_send_sync_uuid(peer_device);
1774
1775                if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1776                        /* This still has a race (about when exactly the peers
1777                         * detect connection loss) that can lead to a full sync
1778                         * on next handshake. In 8.3.9 we fixed this with explicit
1779                         * resync-finished notifications, but the fix
1780                         * introduces a protocol change.  Sleeping for some
1781                         * time longer than the ping interval + timeout on the
1782                         * SyncSource, to give the SyncTarget the chance to
1783                         * detect connection loss, then waiting for a ping
1784                         * response (implicit in drbd_resync_finished) reduces
1785                         * the race considerably, but does not solve it. */
1786                        if (side == C_SYNC_SOURCE) {
1787                                struct net_conf *nc;
1788                                int timeo;
1789
1790                                rcu_read_lock();
1791                                nc = rcu_dereference(connection->net_conf);
1792                                timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1793                                rcu_read_unlock();
1794                                schedule_timeout_interruptible(timeo);
1795                        }
1796                        drbd_resync_finished(device);
1797                }
1798
1799                drbd_rs_controller_reset(device);
1800                /* ns.conn may already be != device->state.conn,
1801                 * we may have been paused in between, or become paused until
1802                 * the timer triggers.
1803                 * No matter, that is handled in resync_timer_fn() */
1804                if (ns.conn == C_SYNC_TARGET)
1805                        mod_timer(&device->resync_timer, jiffies);
1806
1807                drbd_md_sync(device);
1808        }
1809        put_ldev(device);
1810        mutex_unlock(device->state_mutex);
1811}
1812
1813static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1814{
1815        struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1816        device->rs_last_bcast = jiffies;
1817
1818        if (!get_ldev(device))
1819                return;
1820
1821        drbd_bm_write_lazy(device, 0);
1822        if (resync_done && is_sync_state(device->state.conn))
1823                drbd_resync_finished(device);
1824
1825        drbd_bcast_event(device, &sib);
1826        /* update timestamp, in case it took a while to write out stuff */
1827        device->rs_last_bcast = jiffies;
1828        put_ldev(device);
1829}
1830
1831static void drbd_ldev_destroy(struct drbd_device *device)
1832{
1833        lc_destroy(device->resync);
1834        device->resync = NULL;
1835        lc_destroy(device->act_log);
1836        device->act_log = NULL;
1837
1838        __acquire(local);
1839        drbd_free_ldev(device->ldev);
1840        device->ldev = NULL;
1841        __release(local);
1842
1843        clear_bit(GOING_DISKLESS, &device->flags);
1844        wake_up(&device->misc_wait);
1845}
1846
1847static void go_diskless(struct drbd_device *device)
1848{
1849        D_ASSERT(device, device->state.disk == D_FAILED);
1850        /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1851         * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1852         * the protected members anymore, though, so once put_ldev reaches zero
1853         * again, it will be safe to free them. */
1854
1855        /* Try to write changed bitmap pages, read errors may have just
1856         * set some bits outside the area covered by the activity log.
1857         *
1858         * If we have an IO error during the bitmap writeout,
1859         * we will want a full sync next time, just in case.
1860         * (Do we want a specific meta data flag for this?)
1861         *
1862         * If that does not make it to stable storage either,
1863         * we cannot do anything about that anymore.
1864         *
1865         * We still need to check if both bitmap and ldev are present, we may
1866         * end up here after a failed attach, before ldev was even assigned.
1867         */
1868        if (device->bitmap && device->ldev) {
1869                /* An interrupted resync or similar is allowed to recounts bits
1870                 * while we detach.
1871                 * Any modifications would not be expected anymore, though.
1872                 */
1873                if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1874                                        "detach", BM_LOCKED_TEST_ALLOWED)) {
1875                        if (test_bit(WAS_READ_ERROR, &device->flags)) {
1876                                drbd_md_set_flag(device, MDF_FULL_SYNC);
1877                                drbd_md_sync(device);
1878                        }
1879                }
1880        }
1881
1882        drbd_force_state(device, NS(disk, D_DISKLESS));
1883}
1884
/*
 * Device work for MD_SYNC: log a warning that the md_sync_timer expired
 * and let the worker sync the meta data.  Always returns 0.
 */
static int do_md_sync(struct drbd_device *device)
{
	drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
	drbd_md_sync(device);

	return 0;
}
1891
1892/* only called from drbd_worker thread, no locking */
1893void __update_timing_details(
1894                struct drbd_thread_timing_details *tdp,
1895                unsigned int *cb_nr,
1896                void *cb,
1897                const char *fn, const unsigned int line)
1898{
1899        unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1900        struct drbd_thread_timing_details *td = tdp + i;
1901
1902        td->start_jif = jiffies;
1903        td->cb_addr = cb;
1904        td->caller_fn = fn;
1905        td->line = line;
1906        td->cb_nr = *cb_nr;
1907
1908        i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1909        td = tdp + i;
1910        memset(td, 0, sizeof(*td));
1911
1912        ++(*cb_nr);
1913}
1914
1915static void do_device_work(struct drbd_device *device, const unsigned long todo)
1916{
1917        if (test_bit(MD_SYNC, &todo))
1918                do_md_sync(device);
1919        if (test_bit(RS_DONE, &todo) ||
1920            test_bit(RS_PROGRESS, &todo))
1921                update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1922        if (test_bit(GO_DISKLESS, &todo))
1923                go_diskless(device);
1924        if (test_bit(DESTROY_DISK, &todo))
1925                drbd_ldev_destroy(device);
1926        if (test_bit(RS_START, &todo))
1927                do_start_resync(device);
1928}
1929
1930#define DRBD_DEVICE_WORK_MASK   \
1931        ((1UL << GO_DISKLESS)   \
1932        |(1UL << DESTROY_DISK)  \
1933        |(1UL << MD_SYNC)       \
1934        |(1UL << RS_START)      \
1935        |(1UL << RS_PROGRESS)   \
1936        |(1UL << RS_DONE)       \
1937        )
1938
1939static unsigned long get_work_bits(unsigned long *flags)
1940{
1941        unsigned long old, new;
1942        do {
1943                old = *flags;
1944                new = old & ~DRBD_DEVICE_WORK_MASK;
1945        } while (cmpxchg(flags, old, new) != old);
1946        return old & DRBD_DEVICE_WORK_MASK;
1947}
1948
1949static void do_unqueued_work(struct drbd_connection *connection)
1950{
1951        struct drbd_peer_device *peer_device;
1952        int vnr;
1953
1954        rcu_read_lock();
1955        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1956                struct drbd_device *device = peer_device->device;
1957                unsigned long todo = get_work_bits(&device->flags);
1958                if (!todo)
1959                        continue;
1960
1961                kref_get(&device->kref);
1962                rcu_read_unlock();
1963                do_device_work(device, todo);
1964                kref_put(&device->kref, drbd_destroy_device);
1965                rcu_read_lock();
1966        }
1967        rcu_read_unlock();
1968}
1969
1970static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1971{
1972        spin_lock_irq(&queue->q_lock);
1973        list_splice_tail_init(&queue->q, work_list);
1974        spin_unlock_irq(&queue->q_lock);
1975        return !list_empty(work_list);
1976}
1977
1978static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1979{
1980        DEFINE_WAIT(wait);
1981        struct net_conf *nc;
1982        int uncork, cork;
1983
1984        dequeue_work_batch(&connection->sender_work, work_list);
1985        if (!list_empty(work_list))
1986                return;
1987
1988        /* Still nothing to do?
1989         * Maybe we still need to close the current epoch,
1990         * even if no new requests are queued yet.
1991         *
1992         * Also, poke TCP, just in case.
1993         * Then wait for new work (or signal). */
1994        rcu_read_lock();
1995        nc = rcu_dereference(connection->net_conf);
1996        uncork = nc ? nc->tcp_cork : 0;
1997        rcu_read_unlock();
1998        if (uncork) {
1999                mutex_lock(&connection->data.mutex);
2000                if (connection->data.socket)
2001                        drbd_tcp_uncork(connection->data.socket);
2002                mutex_unlock(&connection->data.mutex);
2003        }
2004
2005        for (;;) {
2006                int send_barrier;
2007                prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2008                spin_lock_irq(&connection->resource->req_lock);
2009                spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2010                if (!list_empty(&connection->sender_work.q))
2011                        list_splice_tail_init(&connection->sender_work.q, work_list);
2012                spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2013                if (!list_empty(work_list) || signal_pending(current)) {
2014                        spin_unlock_irq(&connection->resource->req_lock);
2015                        break;
2016                }
2017
2018                /* We found nothing new to do, no to-be-communicated request,
2019                 * no other work item.  We may still need to close the last
2020                 * epoch.  Next incoming request epoch will be connection ->
2021                 * current transfer log epoch number.  If that is different
2022                 * from the epoch of the last request we communicated, it is
2023                 * safe to send the epoch separating barrier now.
2024                 */
2025                send_barrier =
2026                        atomic_read(&connection->current_tle_nr) !=
2027                        connection->send.current_epoch_nr;
2028                spin_unlock_irq(&connection->resource->req_lock);
2029
2030                if (send_barrier)
2031                        maybe_send_barrier(connection,
2032                                        connection->send.current_epoch_nr + 1);
2033
2034                if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2035                        break;
2036
2037                /* drbd_send() may have called flush_signals() */
2038                if (get_t_state(&connection->worker) != RUNNING)
2039                        break;
2040
2041                schedule();
2042                /* may be woken up for other things but new work, too,
2043                 * e.g. if the current epoch got closed.
2044                 * In which case we send the barrier above. */
2045        }
2046        finish_wait(&connection->sender_work.q_wait, &wait);
2047
2048        /* someone may have changed the config while we have been waiting above. */
2049        rcu_read_lock();
2050        nc = rcu_dereference(connection->net_conf);
2051        cork = nc ? nc->tcp_cork : 0;
2052        rcu_read_unlock();
2053        mutex_lock(&connection->data.mutex);
2054        if (connection->data.socket) {
2055                if (cork)
2056                        drbd_tcp_cork(connection->data.socket);
2057                else if (!uncork)
2058                        drbd_tcp_uncork(connection->data.socket);
2059        }
2060        mutex_unlock(&connection->data.mutex);
2061}
2062
2063int drbd_worker(struct drbd_thread *thi)
2064{
2065        struct drbd_connection *connection = thi->connection;
2066        struct drbd_work *w = NULL;
2067        struct drbd_peer_device *peer_device;
2068        LIST_HEAD(work_list);
2069        int vnr;
2070
2071        while (get_t_state(thi) == RUNNING) {
2072                drbd_thread_current_set_cpu(thi);
2073
2074                if (list_empty(&work_list)) {
2075                        update_worker_timing_details(connection, wait_for_work);
2076                        wait_for_work(connection, &work_list);
2077                }
2078
2079                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2080                        update_worker_timing_details(connection, do_unqueued_work);
2081                        do_unqueued_work(connection);
2082                }
2083
2084                if (signal_pending(current)) {
2085                        flush_signals(current);
2086                        if (get_t_state(thi) == RUNNING) {
2087                                drbd_warn(connection, "Worker got an unexpected signal\n");
2088                                continue;
2089                        }
2090                        break;
2091                }
2092
2093                if (get_t_state(thi) != RUNNING)
2094                        break;
2095
2096                if (!list_empty(&work_list)) {
2097                        w = list_first_entry(&work_list, struct drbd_work, list);
2098                        list_del_init(&w->list);
2099                        update_worker_timing_details(connection, w->cb);
2100                        if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2101                                continue;
2102                        if (connection->cstate >= C_WF_REPORT_PARAMS)
2103                                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2104                }
2105        }
2106
2107        do {
2108                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2109                        update_worker_timing_details(connection, do_unqueued_work);
2110                        do_unqueued_work(connection);
2111                }
2112                if (!list_empty(&work_list)) {
2113                        w = list_first_entry(&work_list, struct drbd_work, list);
2114                        list_del_init(&w->list);
2115                        update_worker_timing_details(connection, w->cb);
2116                        w->cb(w, 1);
2117                } else
2118                        dequeue_work_batch(&connection->sender_work, &work_list);
2119        } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2120
2121        rcu_read_lock();
2122        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2123                struct drbd_device *device = peer_device->device;
2124                D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2125                kref_get(&device->kref);
2126                rcu_read_unlock();
2127                drbd_device_cleanup(device);
2128                kref_put(&device->kref, drbd_destroy_device);
2129                rcu_read_lock();
2130        }
2131        rcu_read_unlock();
2132
2133        return 0;
2134}
2135