linux/drivers/block/drbd/drbd_worker.c
   1/*
   2   drbd_worker.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24*/
  25
  26#include <linux/module.h>
  27#include <linux/drbd.h>
  28#include <linux/sched.h>
  29#include <linux/wait.h>
  30#include <linux/mm.h>
  31#include <linux/memcontrol.h>
  32#include <linux/mm_inline.h>
  33#include <linux/slab.h>
  34#include <linux/random.h>
  35#include <linux/string.h>
  36#include <linux/scatterlist.h>
  37
  38#include "drbd_int.h"
  39#include "drbd_protocol.h"
  40#include "drbd_req.h"
  41
  42static int make_ov_request(struct drbd_device *, int);
  43static int make_resync_request(struct drbd_device *, int);
  44
  45/* endio handlers:
  46 *   drbd_md_endio (defined here)
  47 *   drbd_request_endio (defined here)
  48 *   drbd_peer_request_endio (defined here)
  49 *   drbd_bm_endio (defined in drbd_bitmap.c)
  50 *
  51 * For all these callbacks, note the following:
  52 * The callbacks will be called in irq context by the IDE drivers,
  53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  54 * Try to get the locking right :)
  55 *
  56 */
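/* Rough call chain for the peer-request completions below (sketch):
 *
 *	drbd_peer_request_endio()
 *	    -> drbd_endio_read_sec_final()   queue peer_req->w on sender_work
 *	    -> drbd_endio_write_sec_final()  move peer_req to done_ee, kick
 *	                                     the ack_sender work
 *
 * so the heavier lifting happens later in worker context, not in the
 * (soft)irq context these callbacks may run in.
 */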
  57
  58/* used for synchronous meta data and bitmap IO
  59 * submitted by drbd_md_sync_page_io()
  60 */
  61void drbd_md_endio(struct bio *bio)
  62{
  63        struct drbd_device *device;
  64
  65        device = bio->bi_private;
  66        device->md_io.error = bio->bi_error;
  67
  68        /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
  69         * to timeout on the lower level device, and eventually detach from it.
  70         * If this io completion runs after that timeout expired, this
  71         * drbd_md_put_buffer() may allow us to finally try and re-attach.
  72         * During normal operation, this only puts that extra reference
  73         * down to 1 again.
  74         * Make sure we first drop the reference, and only then signal
  75         * completion, or we may (in drbd_al_read_log()) cycle so fast into the
  76         * next drbd_md_sync_page_io(), that we trigger the
  77         * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
  78         */
  79        drbd_md_put_buffer(device);
  80        device->md_io.done = 1;
  81        wake_up(&device->misc_wait);
  82        bio_put(bio);
  83        if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
  84                put_ldev(device);
  85}
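/* The submitting side (drbd_md_sync_page_io() and friends, see
 * drbd_actlog.c) is expected to wait on device->misc_wait until
 * md_io.done is set by the completion above; the pattern is roughly
 * (illustrative sketch only):
 *
 *	submit_bio(...);
 *	wait_event(device->misc_wait, device->md_io.done);
 *	err = device->md_io.error;
 */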
  86
  87/* reads on behalf of the partner,
  88 * "submitted" by the receiver
  89 */
  90static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
  91{
  92        unsigned long flags = 0;
  93        struct drbd_peer_device *peer_device = peer_req->peer_device;
  94        struct drbd_device *device = peer_device->device;
  95
  96        spin_lock_irqsave(&device->resource->req_lock, flags);
  97        device->read_cnt += peer_req->i.size >> 9;
  98        list_del(&peer_req->w.list);
  99        if (list_empty(&device->read_ee))
 100                wake_up(&device->ee_wait);
 101        if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
 102                __drbd_chk_io_error(device, DRBD_READ_ERROR);
 103        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 104
 105        drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
 106        put_ldev(device);
 107}
 108
 109/* writes on behalf of the partner, or resync writes,
 110 * "submitted" by the receiver, final stage.  */
 111void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 112{
 113        unsigned long flags = 0;
 114        struct drbd_peer_device *peer_device = peer_req->peer_device;
 115        struct drbd_device *device = peer_device->device;
 116        struct drbd_connection *connection = peer_device->connection;
 117        struct drbd_interval i;
 118        int do_wake;
 119        u64 block_id;
 120        int do_al_complete_io;
 121
 122        /* after we moved peer_req to done_ee,
 123         * we may no longer access it,
 124         * it may be freed/reused already!
 125         * (as soon as we release the req_lock) */
 126        i = peer_req->i;
 127        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
 128        block_id = peer_req->block_id;
 129        peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 130
 131        spin_lock_irqsave(&device->resource->req_lock, flags);
 132        device->writ_cnt += peer_req->i.size >> 9;
 133        list_move_tail(&peer_req->w.list, &device->done_ee);
 134
 135        /*
 136         * Do not remove from the write_requests tree here: we did not send the
 137         * Ack yet and did not wake possibly waiting conflicting requests.
  138         * Removal from the tree happens in "drbd_process_done_ee", within the
  139         * appropriate dw.cb (e_end_block/e_end_resync_block), or in
  140         * _drbd_clear_done_ee.
 141         */
 142
 143        do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
 144
 145        /* FIXME do we want to detach for failed REQ_DISCARD?
 146         * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
 147        if (peer_req->flags & EE_WAS_ERROR)
 148                __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
 149
 150        if (connection->cstate >= C_WF_REPORT_PARAMS) {
 151                kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
 152                if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
 153                        kref_put(&device->kref, drbd_destroy_device);
 154        }
 155        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 156
 157        if (block_id == ID_SYNCER)
 158                drbd_rs_complete_io(device, i.sector);
 159
 160        if (do_wake)
 161                wake_up(&device->ee_wait);
 162
 163        if (do_al_complete_io)
 164                drbd_al_complete_io(device, &i);
 165
 166        put_ldev(device);
 167}
 168
 169/* writes on behalf of the partner, or resync writes,
 170 * "submitted" by the receiver.
 171 */
 172void drbd_peer_request_endio(struct bio *bio)
 173{
 174        struct drbd_peer_request *peer_req = bio->bi_private;
 175        struct drbd_device *device = peer_req->peer_device->device;
 176        int is_write = bio_data_dir(bio) == WRITE;
 177        int is_discard = !!(bio->bi_rw & REQ_DISCARD);
 178
 179        if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
 180                drbd_warn(device, "%s: error=%d s=%llus\n",
 181                                is_write ? (is_discard ? "discard" : "write")
 182                                        : "read", bio->bi_error,
 183                                (unsigned long long)peer_req->i.sector);
 184
 185        if (bio->bi_error)
 186                set_bit(__EE_WAS_ERROR, &peer_req->flags);
 187
 188        bio_put(bio); /* no need for the bio anymore */
 189        if (atomic_dec_and_test(&peer_req->pending_bios)) {
 190                if (is_write)
 191                        drbd_endio_write_sec_final(peer_req);
 192                else
 193                        drbd_endio_read_sec_final(peer_req);
 194        }
 195}
 196
 197void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
 198{
 199        panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
 200                device->minor, device->resource->name, device->vnr);
 201}
 202
  203/* read, read-ahead or write requests on R_PRIMARY coming from drbd_make_request
 204 */
 205void drbd_request_endio(struct bio *bio)
 206{
 207        unsigned long flags;
 208        struct drbd_request *req = bio->bi_private;
 209        struct drbd_device *device = req->device;
 210        struct bio_and_error m;
 211        enum drbd_req_event what;
 212
 213        /* If this request was aborted locally before,
 214         * but now was completed "successfully",
 215         * chances are that this caused arbitrary data corruption.
 216         *
 217         * "aborting" requests, or force-detaching the disk, is intended for
 218         * completely blocked/hung local backing devices which do no longer
 219         * complete requests at all, not even do error completions.  In this
 220         * situation, usually a hard-reset and failover is the only way out.
 221         *
 222         * By "aborting", basically faking a local error-completion,
  223         * we allow for a more graceful switchover by cleanly migrating services.
 224         * Still the affected node has to be rebooted "soon".
 225         *
 226         * By completing these requests, we allow the upper layers to re-use
 227         * the associated data pages.
 228         *
 229         * If later the local backing device "recovers", and now DMAs some data
 230         * from disk into the original request pages, in the best case it will
 231         * just put random data into unused pages; but typically it will corrupt
 232         * meanwhile completely unrelated data, causing all sorts of damage.
 233         *
 234         * Which means delayed successful completion,
 235         * especially for READ requests,
 236         * is a reason to panic().
 237         *
 238         * We assume that a delayed *error* completion is OK,
 239         * though we still will complain noisily about it.
 240         */
 241        if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
 242                if (__ratelimit(&drbd_ratelimit_state))
 243                        drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
 244
 245                if (!bio->bi_error)
 246                        drbd_panic_after_delayed_completion_of_aborted_request(device);
 247        }
 248
 249        /* to avoid recursion in __req_mod */
 250        if (unlikely(bio->bi_error)) {
 251                if (bio->bi_rw & REQ_DISCARD)
 252                        what = (bio->bi_error == -EOPNOTSUPP)
 253                                ? DISCARD_COMPLETED_NOTSUPP
 254                                : DISCARD_COMPLETED_WITH_ERROR;
 255                else
 256                        what = (bio_data_dir(bio) == WRITE)
 257                        ? WRITE_COMPLETED_WITH_ERROR
 258                        : (bio_rw(bio) == READ)
 259                          ? READ_COMPLETED_WITH_ERROR
 260                          : READ_AHEAD_COMPLETED_WITH_ERROR;
 261        } else
 262                what = COMPLETED_OK;
 263
 264        bio_put(req->private_bio);
 265        req->private_bio = ERR_PTR(bio->bi_error);
 266
 267        /* not req_mod(), we need irqsave here! */
 268        spin_lock_irqsave(&device->resource->req_lock, flags);
 269        __req_mod(req, what, &m);
 270        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 271        put_ldev(device);
 272
 273        if (m.bio)
 274                complete_master_bio(device, &m);
 275}
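/* Note: the private bio is released above; req->private_bio is re-used to
 * park the completion status as an ERR_PTR() value, which both marks the
 * local bio as gone and preserves the error code for later inspection.
 */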
 276
 277void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
 278{
 279        struct hash_desc desc;
 280        struct scatterlist sg;
 281        struct page *page = peer_req->pages;
 282        struct page *tmp;
 283        unsigned len;
 284
 285        desc.tfm = tfm;
 286        desc.flags = 0;
 287
 288        sg_init_table(&sg, 1);
 289        crypto_hash_init(&desc);
 290
 291        while ((tmp = page_chain_next(page))) {
 292                /* all but the last page will be fully used */
 293                sg_set_page(&sg, page, PAGE_SIZE, 0);
 294                crypto_hash_update(&desc, &sg, sg.length);
 295                page = tmp;
 296        }
 297        /* and now the last, possibly only partially used page */
 298        len = peer_req->i.size & (PAGE_SIZE - 1);
 299        sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
 300        crypto_hash_update(&desc, &sg, sg.length);
 301        crypto_hash_final(&desc, digest);
 302}
 303
 304void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
 305{
 306        struct hash_desc desc;
 307        struct scatterlist sg;
 308        struct bio_vec bvec;
 309        struct bvec_iter iter;
 310
 311        desc.tfm = tfm;
 312        desc.flags = 0;
 313
 314        sg_init_table(&sg, 1);
 315        crypto_hash_init(&desc);
 316
 317        bio_for_each_segment(bvec, bio, iter) {
 318                sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
 319                crypto_hash_update(&desc, &sg, sg.length);
 320        }
 321        crypto_hash_final(&desc, digest);
 322}
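/* Typical use of the two checksum helpers above (illustrative sketch,
 * mirroring w_e_send_csum() and w_e_end_csum_rs_req() further down):
 *
 *	digest_size = crypto_hash_digestsize(tfm);
 *	digest = kmalloc(digest_size, GFP_NOIO);
 *	if (digest) {
 *		drbd_csum_ee(tfm, peer_req, digest);
 *		... send it, or memcmp() it against the peer's digest ...
 *	}
 */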
 323
 324/* MAYBE merge common code with w_e_end_ov_req */
 325static int w_e_send_csum(struct drbd_work *w, int cancel)
 326{
 327        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 328        struct drbd_peer_device *peer_device = peer_req->peer_device;
 329        struct drbd_device *device = peer_device->device;
 330        int digest_size;
 331        void *digest;
 332        int err = 0;
 333
 334        if (unlikely(cancel))
 335                goto out;
 336
 337        if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
 338                goto out;
 339
 340        digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
 341        digest = kmalloc(digest_size, GFP_NOIO);
 342        if (digest) {
 343                sector_t sector = peer_req->i.sector;
 344                unsigned int size = peer_req->i.size;
 345                drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
 346                /* Free peer_req and pages before send.
 347                 * In case we block on congestion, we could otherwise run into
 348                 * some distributed deadlock, if the other side blocks on
 349                 * congestion as well, because our receiver blocks in
 350                 * drbd_alloc_pages due to pp_in_use > max_buffers. */
 351                drbd_free_peer_req(device, peer_req);
 352                peer_req = NULL;
 353                inc_rs_pending(device);
 354                err = drbd_send_drequest_csum(peer_device, sector, size,
 355                                              digest, digest_size,
 356                                              P_CSUM_RS_REQUEST);
 357                kfree(digest);
 358        } else {
 359                drbd_err(device, "kmalloc() of digest failed.\n");
 360                err = -ENOMEM;
 361        }
 362
 363out:
 364        if (peer_req)
 365                drbd_free_peer_req(device, peer_req);
 366
 367        if (unlikely(err))
 368                drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
 369        return err;
 370}
 371
 372#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
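/* GFP_TRY: allocate without entering reclaim and without a warning on
 * failure, so under memory pressure these allocations fail fast and the
 * resync request is simply deferred (see read_for_csum() below returning
 * -EAGAIN).
 */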
 373
 374static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
 375{
 376        struct drbd_device *device = peer_device->device;
 377        struct drbd_peer_request *peer_req;
 378
 379        if (!get_ldev(device))
 380                return -EIO;
 381
 382        /* GFP_TRY, because if there is no memory available right now, this may
 383         * be rescheduled for later. It is "only" background resync, after all. */
 384        peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
 385                                       size, true /* has real payload */, GFP_TRY);
 386        if (!peer_req)
 387                goto defer;
 388
 389        peer_req->w.cb = w_e_send_csum;
 390        spin_lock_irq(&device->resource->req_lock);
 391        list_add_tail(&peer_req->w.list, &device->read_ee);
 392        spin_unlock_irq(&device->resource->req_lock);
 393
 394        atomic_add(size >> 9, &device->rs_sect_ev);
 395        if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
 396                return 0;
 397
 398        /* If it failed because of ENOMEM, retry should help.  If it failed
 399         * because bio_add_page failed (probably broken lower level driver),
 400         * retry may or may not help.
 401         * If it does not, you may need to force disconnect. */
 402        spin_lock_irq(&device->resource->req_lock);
 403        list_del(&peer_req->w.list);
 404        spin_unlock_irq(&device->resource->req_lock);
 405
 406        drbd_free_peer_req(device, peer_req);
 407defer:
 408        put_ldev(device);
 409        return -EAGAIN;
 410}
 411
 412int w_resync_timer(struct drbd_work *w, int cancel)
 413{
 414        struct drbd_device *device =
 415                container_of(w, struct drbd_device, resync_work);
 416
 417        switch (device->state.conn) {
 418        case C_VERIFY_S:
 419                make_ov_request(device, cancel);
 420                break;
 421        case C_SYNC_TARGET:
 422                make_resync_request(device, cancel);
 423                break;
 424        }
 425
 426        return 0;
 427}
 428
 429void resync_timer_fn(unsigned long data)
 430{
 431        struct drbd_device *device = (struct drbd_device *) data;
 432
 433        drbd_queue_work_if_unqueued(
 434                &first_peer_device(device)->connection->sender_work,
 435                &device->resync_work);
 436}
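/* The resync machinery is a self-re-arming loop (sketch): resync_timer_fn()
 * queues device->resync_work on the connection's sender_work, the worker
 * then calls w_resync_timer(), which issues up to drbd_rs_number_requests()
 * requests and, on its requeue path, re-arms resync_timer via
 * mod_timer(..., jiffies + SLEEP_TIME).
 */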
 437
 438static void fifo_set(struct fifo_buffer *fb, int value)
 439{
 440        int i;
 441
 442        for (i = 0; i < fb->size; i++)
 443                fb->values[i] = value;
 444}
 445
 446static int fifo_push(struct fifo_buffer *fb, int value)
 447{
 448        int ov;
 449
 450        ov = fb->values[fb->head_index];
 451        fb->values[fb->head_index++] = value;
 452
 453        if (fb->head_index >= fb->size)
 454                fb->head_index = 0;
 455
 456        return ov;
 457}
 458
 459static void fifo_add_val(struct fifo_buffer *fb, int value)
 460{
 461        int i;
 462
 463        for (i = 0; i < fb->size; i++)
 464                fb->values[i] += value;
 465}
 466
 467struct fifo_buffer *fifo_alloc(int fifo_size)
 468{
 469        struct fifo_buffer *fb;
 470
 471        fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
 472        if (!fb)
 473                return NULL;
 474
 475        fb->head_index = 0;
 476        fb->size = fifo_size;
 477        fb->total = 0;
 478
 479        return fb;
 480}
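/* The fifo_buffer acts as a small planning ring for the resync controller
 * below.  Illustrative example with size == 3:
 *
 *	fifo_set(fb, 0);		values = {0, 0, 0}
 *	fifo_add_val(fb, 2);		values = {2, 2, 2}  (spread a correction)
 *	v = fifo_push(fb, 0);		returns the value planned for "now" (2)
 *					and plans 0 for "size" steps ahead
 */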
 481
 482static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
 483{
 484        struct disk_conf *dc;
 485        unsigned int want;     /* The number of sectors we want in-flight */
 486        int req_sect; /* Number of sectors to request in this turn */
 487        int correction; /* Number of sectors more we need in-flight */
 488        int cps; /* correction per invocation of drbd_rs_controller() */
 489        int steps; /* Number of time steps to plan ahead */
 490        int curr_corr;
 491        int max_sect;
 492        struct fifo_buffer *plan;
 493
 494        dc = rcu_dereference(device->ldev->disk_conf);
 495        plan = rcu_dereference(device->rs_plan_s);
 496
 497        steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 498
 499        if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
 500                want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
 501        } else { /* normal path */
 502                want = dc->c_fill_target ? dc->c_fill_target :
 503                        sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
 504        }
 505
 506        correction = want - device->rs_in_flight - plan->total;
 507
 508        /* Plan ahead */
 509        cps = correction / steps;
 510        fifo_add_val(plan, cps);
 511        plan->total += cps * steps;
 512
 513        /* What we do in this step */
 514        curr_corr = fifo_push(plan, 0);
 515        plan->total -= curr_corr;
 516
 517        req_sect = sect_in + curr_corr;
 518        if (req_sect < 0)
 519                req_sect = 0;
 520
 521        max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
 522        if (req_sect > max_sect)
 523                req_sect = max_sect;
 524
 525        /*
 526        drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 527                 sect_in, device->rs_in_flight, want, correction,
 528                 steps, cps, device->rs_planed, curr_corr, req_sect);
 529        */
 530
 531        return req_sect;
 532}
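/* Illustrative numbers for the controller above, assuming SLEEP_TIME is
 * HZ/10 (one controller step per 100ms) and c_fill_target == 0: with
 * c_delay_target == 10 (tenths of a second, i.e. 1s) the "want" term
 * reduces to sect_in * 10, so we aim to keep roughly one delay-target
 * worth of resync data in flight.  Any shortfall (after subtracting what
 * is already in flight and already planned) is spread over "steps" future
 * invocations via fifo_add_val(); the current step then asks for
 * sect_in + curr_corr sectors, capped at one step's worth of c_max_rate.
 */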
 533
 534static int drbd_rs_number_requests(struct drbd_device *device)
 535{
 536        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 537        int number, mxb;
 538
 539        sect_in = atomic_xchg(&device->rs_sect_in, 0);
 540        device->rs_in_flight -= sect_in;
 541
 542        rcu_read_lock();
 543        mxb = drbd_get_max_buffers(device) / 2;
 544        if (rcu_dereference(device->rs_plan_s)->size) {
 545                number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
 546                device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 547        } else {
 548                device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
 549                number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 550        }
 551        rcu_read_unlock();
 552
 553        /* Don't have more than "max-buffers"/2 in-flight.
 554         * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
 555         * potentially causing a distributed deadlock on congestion during
 556         * online-verify or (checksum-based) resync, if max-buffers,
 557         * socket buffer sizes and resync rate settings are mis-configured. */
 558
 559        /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
 560         * mxb (as used here, and in drbd_alloc_pages on the peer) is
 561         * "number of pages" (typically also 4k),
 562         * but "rs_in_flight" is in "sectors" (512 Byte). */
 563        if (mxb - device->rs_in_flight/8 < number)
 564                number = mxb - device->rs_in_flight/8;
 565
 566        return number;
 567}
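/* Note that the clamping above may make the result negative when more than
 * max-buffers/2 worth of resync data is already in flight; callers treat a
 * value <= 0 as "nothing to request in this turn" (see make_resync_request()).
 */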
 568
 569static int make_resync_request(struct drbd_device *const device, int cancel)
 570{
 571        struct drbd_peer_device *const peer_device = first_peer_device(device);
 572        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
 573        unsigned long bit;
 574        sector_t sector;
 575        const sector_t capacity = drbd_get_capacity(device->this_bdev);
 576        int max_bio_size;
 577        int number, rollback_i, size;
 578        int align, requeue = 0;
 579        int i = 0;
 580
 581        if (unlikely(cancel))
 582                return 0;
 583
 584        if (device->rs_total == 0) {
 585                /* empty resync? */
 586                drbd_resync_finished(device);
 587                return 0;
 588        }
 589
 590        if (!get_ldev(device)) {
  591                /* Since we only need to access device->rsync, a
  592                   get_ldev_if_state(device, D_FAILED) would be sufficient, but
  593                   continuing resync with a broken disk makes no sense at
  594                   all */
 595                drbd_err(device, "Disk broke down during resync!\n");
 596                return 0;
 597        }
 598
 599        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
 600        number = drbd_rs_number_requests(device);
 601        if (number <= 0)
 602                goto requeue;
 603
 604        for (i = 0; i < number; i++) {
 605                /* Stop generating RS requests when half of the send buffer is filled,
 606                 * but notify TCP that we'd like to have more space. */
 607                mutex_lock(&connection->data.mutex);
 608                if (connection->data.socket) {
 609                        struct sock *sk = connection->data.socket->sk;
 610                        int queued = sk->sk_wmem_queued;
 611                        int sndbuf = sk->sk_sndbuf;
 612                        if (queued > sndbuf / 2) {
 613                                requeue = 1;
 614                                if (sk->sk_socket)
 615                                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 616                        }
 617                } else
 618                        requeue = 1;
 619                mutex_unlock(&connection->data.mutex);
 620                if (requeue)
 621                        goto requeue;
 622
 623next_sector:
 624                size = BM_BLOCK_SIZE;
 625                bit  = drbd_bm_find_next(device, device->bm_resync_fo);
 626
 627                if (bit == DRBD_END_OF_BITMAP) {
 628                        device->bm_resync_fo = drbd_bm_bits(device);
 629                        put_ldev(device);
 630                        return 0;
 631                }
 632
 633                sector = BM_BIT_TO_SECT(bit);
 634
 635                if (drbd_try_rs_begin_io(device, sector)) {
 636                        device->bm_resync_fo = bit;
 637                        goto requeue;
 638                }
 639                device->bm_resync_fo = bit + 1;
 640
 641                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
 642                        drbd_rs_complete_io(device, sector);
 643                        goto next_sector;
 644                }
 645
 646#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 647                /* try to find some adjacent bits.
 648                 * we stop if we have already the maximum req size.
 649                 *
 650                 * Additionally always align bigger requests, in order to
 651                 * be prepared for all stripe sizes of software RAIDs.
 652                 */
 653                align = 1;
 654                rollback_i = i;
 655                while (i < number) {
 656                        if (size + BM_BLOCK_SIZE > max_bio_size)
 657                                break;
 658
 659                        /* Be always aligned */
 660                        if (sector & ((1<<(align+3))-1))
 661                                break;
 662
 663                        /* do not cross extent boundaries */
 664                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 665                                break;
 666                        /* now, is it actually dirty, after all?
 667                         * caution, drbd_bm_test_bit is tri-state for some
 668                         * obscure reason; ( b == 0 ) would get the out-of-band
 669                         * only accidentally right because of the "oddly sized"
 670                         * adjustment below */
 671                        if (drbd_bm_test_bit(device, bit+1) != 1)
 672                                break;
 673                        bit++;
 674                        size += BM_BLOCK_SIZE;
 675                        if ((BM_BLOCK_SIZE << align) <= size)
 676                                align++;
 677                        i++;
 678                }
 679                /* if we merged some,
 680                 * reset the offset to start the next drbd_bm_find_next from */
 681                if (size > BM_BLOCK_SIZE)
 682                        device->bm_resync_fo = bit + 1;
 683#endif
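                /* Illustrative example of the merging above: if the first
                 * dirty bit maps to sector 2048 (1 MiB aligned) and the
                 * following bits are dirty as well, the request may grow
                 * 4k -> 8k -> 16k ... up to max_bio_size, while a first bit
                 * at sector 8 (only 4k aligned) is never merged, because
                 * (8 & ((1 << 4) - 1)) != 0 already on the first pass. */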
 684
 685                /* adjust very last sectors, in case we are oddly sized */
 686                if (sector + (size>>9) > capacity)
 687                        size = (capacity-sector)<<9;
 688
 689                if (device->use_csums) {
 690                        switch (read_for_csum(peer_device, sector, size)) {
 691                        case -EIO: /* Disk failure */
 692                                put_ldev(device);
 693                                return -EIO;
 694                        case -EAGAIN: /* allocation failed, or ldev busy */
 695                                drbd_rs_complete_io(device, sector);
 696                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
 697                                i = rollback_i;
 698                                goto requeue;
 699                        case 0:
 700                                /* everything ok */
 701                                break;
 702                        default:
 703                                BUG();
 704                        }
 705                } else {
 706                        int err;
 707
 708                        inc_rs_pending(device);
 709                        err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST,
 710                                                 sector, size, ID_SYNCER);
 711                        if (err) {
 712                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
 713                                dec_rs_pending(device);
 714                                put_ldev(device);
 715                                return err;
 716                        }
 717                }
 718        }
 719
 720        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
  721                /* The last syncer _request_ was sent,
  722                 * but the P_RS_DATA_REPLY has not been received yet.  Sync will end
  723                 * (and the next sync group will resume) as soon as we receive the
  724                 * last resync data block and the last bit is cleared.
  725                 * Until then, resync "work" is "inactive" ...
 726                 */
 727                put_ldev(device);
 728                return 0;
 729        }
 730
 731 requeue:
 732        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 733        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 734        put_ldev(device);
 735        return 0;
 736}
 737
 738static int make_ov_request(struct drbd_device *device, int cancel)
 739{
 740        int number, i, size;
 741        sector_t sector;
 742        const sector_t capacity = drbd_get_capacity(device->this_bdev);
 743        bool stop_sector_reached = false;
 744
 745        if (unlikely(cancel))
 746                return 1;
 747
 748        number = drbd_rs_number_requests(device);
 749
 750        sector = device->ov_position;
 751        for (i = 0; i < number; i++) {
 752                if (sector >= capacity)
 753                        return 1;
 754
 755                /* We check for "finished" only in the reply path:
 756                 * w_e_end_ov_reply().
 757                 * We need to send at least one request out. */
 758                stop_sector_reached = i > 0
 759                        && verify_can_do_stop_sector(device)
 760                        && sector >= device->ov_stop_sector;
 761                if (stop_sector_reached)
 762                        break;
 763
 764                size = BM_BLOCK_SIZE;
 765
 766                if (drbd_try_rs_begin_io(device, sector)) {
 767                        device->ov_position = sector;
 768                        goto requeue;
 769                }
 770
 771                if (sector + (size>>9) > capacity)
 772                        size = (capacity-sector)<<9;
 773
 774                inc_rs_pending(device);
 775                if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
 776                        dec_rs_pending(device);
 777                        return 0;
 778                }
 779                sector += BM_SECT_PER_BIT;
 780        }
 781        device->ov_position = sector;
 782
 783 requeue:
 784        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 785        if (i == 0 || !stop_sector_reached)
 786                mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 787        return 1;
 788}
 789
 790int w_ov_finished(struct drbd_work *w, int cancel)
 791{
 792        struct drbd_device_work *dw =
 793                container_of(w, struct drbd_device_work, w);
 794        struct drbd_device *device = dw->device;
 795        kfree(dw);
 796        ov_out_of_sync_print(device);
 797        drbd_resync_finished(device);
 798
 799        return 0;
 800}
 801
 802static int w_resync_finished(struct drbd_work *w, int cancel)
 803{
 804        struct drbd_device_work *dw =
 805                container_of(w, struct drbd_device_work, w);
 806        struct drbd_device *device = dw->device;
 807        kfree(dw);
 808
 809        drbd_resync_finished(device);
 810
 811        return 0;
 812}
 813
 814static void ping_peer(struct drbd_device *device)
 815{
 816        struct drbd_connection *connection = first_peer_device(device)->connection;
 817
 818        clear_bit(GOT_PING_ACK, &connection->flags);
 819        request_ping(connection);
 820        wait_event(connection->ping_wait,
 821                   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
 822}
 823
 824int drbd_resync_finished(struct drbd_device *device)
 825{
 826        unsigned long db, dt, dbdt;
 827        unsigned long n_oos;
 828        union drbd_state os, ns;
 829        struct drbd_device_work *dw;
 830        char *khelper_cmd = NULL;
 831        int verify_done = 0;
 832
  833        /* Remove all elements from the resync LRU. Future actions
  834         * might set bits in the (main) bitmap, which would make the remaining
  835         * entries in the resync LRU wrong. */
 836        if (drbd_rs_del_all(device)) {
 837                /* In case this is not possible now, most probably because
  838                 * there are P_RS_DATA_REPLY packets lingering on the worker's
  839                 * queue (or the read operations for those packets
  840                 * have not finished yet).  Retry in 100ms. */
 841
 842                schedule_timeout_interruptible(HZ / 10);
 843                dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
 844                if (dw) {
 845                        dw->w.cb = w_resync_finished;
 846                        dw->device = device;
 847                        drbd_queue_work(&first_peer_device(device)->connection->sender_work,
 848                                        &dw->w);
 849                        return 1;
 850                }
 851                drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
 852        }
 853
 854        dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
 855        if (dt <= 0)
 856                dt = 1;
 857
 858        db = device->rs_total;
  859        /* adjust for verify start and stop sectors, respectively the position reached */
 860        if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
 861                db -= device->ov_left;
 862
 863        dbdt = Bit2KB(db/dt);
 864        device->rs_paused /= HZ;
 865
 866        if (!get_ldev(device))
 867                goto out;
 868
 869        ping_peer(device);
 870
 871        spin_lock_irq(&device->resource->req_lock);
 872        os = drbd_read_state(device);
 873
 874        verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 875
 876        /* This protects us against multiple calls (that can happen in the presence
 877           of application IO), and against connectivity loss just before we arrive here. */
 878        if (os.conn <= C_CONNECTED)
 879                goto out_unlock;
 880
 881        ns = os;
 882        ns.conn = C_CONNECTED;
 883
 884        drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 885             verify_done ? "Online verify" : "Resync",
 886             dt + device->rs_paused, device->rs_paused, dbdt);
 887
 888        n_oos = drbd_bm_total_weight(device);
 889
 890        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 891                if (n_oos) {
 892                        drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
 893                              n_oos, Bit2KB(1));
 894                        khelper_cmd = "out-of-sync";
 895                }
 896        } else {
 897                D_ASSERT(device, (n_oos - device->rs_failed) == 0);
 898
 899                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 900                        khelper_cmd = "after-resync-target";
 901
 902                if (device->use_csums && device->rs_total) {
 903                        const unsigned long s = device->rs_same_csum;
 904                        const unsigned long t = device->rs_total;
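                        /* Compute the percentage without overflowing
                         * unsigned long on 32 bit: multiply first while t
                         * (and thus s) is small, otherwise divide t first,
                         * losing a little precision. */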
 905                        const int ratio =
 906                                (t == 0)     ? 0 :
 907                        (t < 100000) ? ((s*100)/t) : (s/(t/100));
 908                        drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
 909                             "transferred %luK total %luK\n",
 910                             ratio,
 911                             Bit2KB(device->rs_same_csum),
 912                             Bit2KB(device->rs_total - device->rs_same_csum),
 913                             Bit2KB(device->rs_total));
 914                }
 915        }
 916
 917        if (device->rs_failed) {
 918                drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
 919
 920                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 921                        ns.disk = D_INCONSISTENT;
 922                        ns.pdsk = D_UP_TO_DATE;
 923                } else {
 924                        ns.disk = D_UP_TO_DATE;
 925                        ns.pdsk = D_INCONSISTENT;
 926                }
 927        } else {
 928                ns.disk = D_UP_TO_DATE;
 929                ns.pdsk = D_UP_TO_DATE;
 930
 931                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 932                        if (device->p_uuid) {
 933                                int i;
 934                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 935                                        _drbd_uuid_set(device, i, device->p_uuid[i]);
 936                                drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
 937                                _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
 938                        } else {
 939                                drbd_err(device, "device->p_uuid is NULL! BUG\n");
 940                        }
 941                }
 942
 943                if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 944                        /* for verify runs, we don't update uuids here,
 945                         * so there would be nothing to report. */
 946                        drbd_uuid_set_bm(device, 0UL);
 947                        drbd_print_uuids(device, "updated UUIDs");
 948                        if (device->p_uuid) {
 949                                /* Now the two UUID sets are equal, update what we
 950                                 * know of the peer. */
 951                                int i;
 952                                for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 953                                        device->p_uuid[i] = device->ldev->md.uuid[i];
 954                        }
 955                }
 956        }
 957
 958        _drbd_set_state(device, ns, CS_VERBOSE, NULL);
 959out_unlock:
 960        spin_unlock_irq(&device->resource->req_lock);
 961        put_ldev(device);
 962out:
 963        device->rs_total  = 0;
 964        device->rs_failed = 0;
 965        device->rs_paused = 0;
 966
 967        /* reset start sector, if we reached end of device */
 968        if (verify_done && device->ov_left == 0)
 969                device->ov_start_sector = 0;
 970
 971        drbd_md_sync(device);
 972
 973        if (khelper_cmd)
 974                drbd_khelper(device, khelper_cmd);
 975
 976        return 1;
 977}
 978
 979/* helper */
 980static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
 981{
 982        if (drbd_peer_req_has_active_page(peer_req)) {
 983                /* This might happen if sendpage() has not finished */
 984                int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
 985                atomic_add(i, &device->pp_in_use_by_net);
 986                atomic_sub(i, &device->pp_in_use);
 987                spin_lock_irq(&device->resource->req_lock);
 988                list_add_tail(&peer_req->w.list, &device->net_ee);
 989                spin_unlock_irq(&device->resource->req_lock);
 990                wake_up(&drbd_pp_wait);
 991        } else
 992                drbd_free_peer_req(device, peer_req);
 993}
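/* Pages that were handed to the network stack via sendpage() may still be
 * referenced by TCP; they are parked on device->net_ee and accounted in
 * pp_in_use_by_net above, and are presumably reclaimed later by the
 * receiver-side cleanup once those references are dropped.
 */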
 994
 995/**
  996 * w_e_end_data_req() - Worker callback to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 998 * @w:          work object.
 999 * @cancel:     The connection will be closed anyways
1000 */
1001int w_e_end_data_req(struct drbd_work *w, int cancel)
1002{
1003        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1004        struct drbd_peer_device *peer_device = peer_req->peer_device;
1005        struct drbd_device *device = peer_device->device;
1006        int err;
1007
1008        if (unlikely(cancel)) {
1009                drbd_free_peer_req(device, peer_req);
1010                dec_unacked(device);
1011                return 0;
1012        }
1013
1014        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1015                err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1016        } else {
1017                if (__ratelimit(&drbd_ratelimit_state))
1018                        drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1019                            (unsigned long long)peer_req->i.sector);
1020
1021                err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1022        }
1023
1024        dec_unacked(device);
1025
1026        move_to_net_ee_or_free(device, peer_req);
1027
1028        if (unlikely(err))
1029                drbd_err(device, "drbd_send_block() failed\n");
1030        return err;
1031}
1032
1033/**
1034 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1035 * @w:          work object.
1036 * @cancel:     The connection will be closed anyways
1037 */
1038int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1039{
1040        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1041        struct drbd_peer_device *peer_device = peer_req->peer_device;
1042        struct drbd_device *device = peer_device->device;
1043        int err;
1044
1045        if (unlikely(cancel)) {
1046                drbd_free_peer_req(device, peer_req);
1047                dec_unacked(device);
1048                return 0;
1049        }
1050
1051        if (get_ldev_if_state(device, D_FAILED)) {
1052                drbd_rs_complete_io(device, peer_req->i.sector);
1053                put_ldev(device);
1054        }
1055
1056        if (device->state.conn == C_AHEAD) {
1057                err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1058        } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1059                if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1060                        inc_rs_pending(device);
1061                        err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1062                } else {
1063                        if (__ratelimit(&drbd_ratelimit_state))
1064                                drbd_err(device, "Not sending RSDataReply, "
1065                                    "partner DISKLESS!\n");
1066                        err = 0;
1067                }
1068        } else {
1069                if (__ratelimit(&drbd_ratelimit_state))
1070                        drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1071                            (unsigned long long)peer_req->i.sector);
1072
1073                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1074
1075                /* update resync data with failure */
1076                drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1077        }
1078
1079        dec_unacked(device);
1080
1081        move_to_net_ee_or_free(device, peer_req);
1082
1083        if (unlikely(err))
1084                drbd_err(device, "drbd_send_block() failed\n");
1085        return err;
1086}
1087
1088int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1089{
1090        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1091        struct drbd_peer_device *peer_device = peer_req->peer_device;
1092        struct drbd_device *device = peer_device->device;
1093        struct digest_info *di;
1094        int digest_size;
1095        void *digest = NULL;
1096        int err, eq = 0;
1097
1098        if (unlikely(cancel)) {
1099                drbd_free_peer_req(device, peer_req);
1100                dec_unacked(device);
1101                return 0;
1102        }
1103
1104        if (get_ldev(device)) {
1105                drbd_rs_complete_io(device, peer_req->i.sector);
1106                put_ldev(device);
1107        }
1108
1109        di = peer_req->digest;
1110
1111        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1112                /* quick hack to try to avoid a race against reconfiguration.
1113                 * a real fix would be much more involved,
1114                 * introducing more locking mechanisms */
1115                if (peer_device->connection->csums_tfm) {
1116                        digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
1117                        D_ASSERT(device, digest_size == di->digest_size);
1118                        digest = kmalloc(digest_size, GFP_NOIO);
1119                }
1120                if (digest) {
1121                        drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1122                        eq = !memcmp(digest, di->digest, digest_size);
1123                        kfree(digest);
1124                }
1125
1126                if (eq) {
1127                        drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1128                        /* rs_same_csums unit is BM_BLOCK_SIZE */
1129                        device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1130                        err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1131                } else {
1132                        inc_rs_pending(device);
1133                        peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1134                        peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1135                        kfree(di);
1136                        err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1137                }
1138        } else {
1139                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1140                if (__ratelimit(&drbd_ratelimit_state))
1141                        drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1142        }
1143
1144        dec_unacked(device);
1145        move_to_net_ee_or_free(device, peer_req);
1146
1147        if (unlikely(err))
1148                drbd_err(device, "drbd_send_block/ack() failed\n");
1149        return err;
1150}
1151
1152int w_e_end_ov_req(struct drbd_work *w, int cancel)
1153{
1154        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1155        struct drbd_peer_device *peer_device = peer_req->peer_device;
1156        struct drbd_device *device = peer_device->device;
1157        sector_t sector = peer_req->i.sector;
1158        unsigned int size = peer_req->i.size;
1159        int digest_size;
1160        void *digest;
1161        int err = 0;
1162
1163        if (unlikely(cancel))
1164                goto out;
1165
1166        digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1167        digest = kmalloc(digest_size, GFP_NOIO);
1168        if (!digest) {
1169                err = 1;        /* terminate the connection in case the allocation failed */
1170                goto out;
1171        }
1172
1173        if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1174                drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1175        else
1176                memset(digest, 0, digest_size);
1177
1178        /* Free e and pages before send.
1179         * In case we block on congestion, we could otherwise run into
1180         * some distributed deadlock, if the other side blocks on
1181         * congestion as well, because our receiver blocks in
1182         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1183        drbd_free_peer_req(device, peer_req);
1184        peer_req = NULL;
1185        inc_rs_pending(device);
1186        err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1187        if (err)
1188                dec_rs_pending(device);
1189        kfree(digest);
1190
1191out:
1192        if (peer_req)
1193                drbd_free_peer_req(device, peer_req);
1194        dec_unacked(device);
1195        return err;
1196}
1197
1198void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1199{
1200        if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1201                device->ov_last_oos_size += size>>9;
1202        } else {
1203                device->ov_last_oos_start = sector;
1204                device->ov_last_oos_size = size>>9;
1205        }
1206        drbd_set_out_of_sync(device, sector, size);
1207}
1208
1209int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1210{
1211        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1212        struct drbd_peer_device *peer_device = peer_req->peer_device;
1213        struct drbd_device *device = peer_device->device;
1214        struct digest_info *di;
1215        void *digest;
1216        sector_t sector = peer_req->i.sector;
1217        unsigned int size = peer_req->i.size;
1218        int digest_size;
1219        int err, eq = 0;
1220        bool stop_sector_reached = false;
1221
1222        if (unlikely(cancel)) {
1223                drbd_free_peer_req(device, peer_req);
1224                dec_unacked(device);
1225                return 0;
1226        }
1227
1228        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1229         * the resync lru has been cleaned up already */
1230        if (get_ldev(device)) {
1231                drbd_rs_complete_io(device, peer_req->i.sector);
1232                put_ldev(device);
1233        }
1234
1235        di = peer_req->digest;
1236
1237        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1238                digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm);
1239                digest = kmalloc(digest_size, GFP_NOIO);
1240                if (digest) {
1241                        drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1242
1243                        D_ASSERT(device, digest_size == di->digest_size);
1244                        eq = !memcmp(digest, di->digest, digest_size);
1245                        kfree(digest);
1246                }
1247        }
1248
1249        /* Free peer_req and pages before send.
1250         * In case we block on congestion, we could otherwise run into
1251         * some distributed deadlock, if the other side blocks on
1252         * congestion as well, because our receiver blocks in
1253         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1254        drbd_free_peer_req(device, peer_req);
1255        if (!eq)
1256                drbd_ov_out_of_sync_found(device, sector, size);
1257        else
1258                ov_out_of_sync_print(device);
1259
1260        err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1261                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1262
1263        dec_unacked(device);
1264
1265        --device->ov_left;
1266
1267        /* let's advance progress step marks only for every other megabyte */
1268        if ((device->ov_left & 0x200) == 0x200)
1269                drbd_advance_rs_marks(device, device->ov_left);
1270
1271        stop_sector_reached = verify_can_do_stop_sector(device) &&
1272                (sector + (size>>9)) >= device->ov_stop_sector;
1273
1274        if (device->ov_left == 0 || stop_sector_reached) {
1275                ov_out_of_sync_print(device);
1276                drbd_resync_finished(device);
1277        }
1278
1279        return err;
1280}
1281
1282/* FIXME
1283 * We need to track the number of pending barrier acks,
1284 * and to be able to wait for them.
1285 * See also comment in drbd_adm_attach before drbd_suspend_io.
1286 */
1287static int drbd_send_barrier(struct drbd_connection *connection)
1288{
1289        struct p_barrier *p;
1290        struct drbd_socket *sock;
1291
1292        sock = &connection->data;
1293        p = conn_prepare_command(connection, sock);
1294        if (!p)
1295                return -EIO;
1296        p->barrier = connection->send.current_epoch_nr;
1297        p->pad = 0;
1298        connection->send.current_epoch_writes = 0;
1299        connection->send.last_sent_barrier_jif = jiffies;
1300
1301        return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1302}
1303
1304int w_send_write_hint(struct drbd_work *w, int cancel)
1305{
1306        struct drbd_device *device =
1307                container_of(w, struct drbd_device, unplug_work);
1308        struct drbd_socket *sock;
1309
1310        if (cancel)
1311                return 0;
1312        sock = &first_peer_device(device)->connection->data;
1313        if (!drbd_prepare_command(first_peer_device(device), sock))
1314                return -EIO;
1315        return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1316}
1317
1318static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1319{
1320        if (!connection->send.seen_any_write_yet) {
1321                connection->send.seen_any_write_yet = true;
1322                connection->send.current_epoch_nr = epoch;
1323                connection->send.current_epoch_writes = 0;
1324                connection->send.last_sent_barrier_jif = jiffies;
1325        }
1326}
1327
1328static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1329{
1330        /* re-init if first write on this connection */
1331        if (!connection->send.seen_any_write_yet)
1332                return;
1333        if (connection->send.current_epoch_nr != epoch) {
1334                if (connection->send.current_epoch_writes)
1335                        drbd_send_barrier(connection);
1336                connection->send.current_epoch_nr = epoch;
1337        }
1338}
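/* Epoch bookkeeping in a nutshell: every mirrored write carries the epoch
 * it belongs to; before the first request of a newer epoch goes out, and
 * only if the previous epoch actually contained writes, a P_BARRIER is
 * sent (drbd_send_barrier() above) so the peer can tell where one write
 * epoch ends and the next one begins.
 */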
1339
1340int w_send_out_of_sync(struct drbd_work *w, int cancel)
1341{
1342        struct drbd_request *req = container_of(w, struct drbd_request, w);
1343        struct drbd_device *device = req->device;
1344        struct drbd_peer_device *const peer_device = first_peer_device(device);
1345        struct drbd_connection *const connection = peer_device->connection;
1346        int err;
1347
1348        if (unlikely(cancel)) {
1349                req_mod(req, SEND_CANCELED);
1350                return 0;
1351        }
1352        req->pre_send_jif = jiffies;
1353
1354        /* this time, no connection->send.current_epoch_writes++;
1355         * If a barrier is sent here, it is the closing barrier for the last
1356         * replicated epoch, before we went into AHEAD mode.
1357         * No more barriers will be sent until we leave AHEAD mode again. */
1358        maybe_send_barrier(connection, req->epoch);
1359
1360        err = drbd_send_out_of_sync(peer_device, req);
1361        req_mod(req, OOS_HANDED_TO_NETWORK);
1362
1363        return err;
1364}
1365
1366/**
1367 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1368 * @w:          work object.
1369 * @cancel:     The connection will be closed anyway
1370 */
1371int w_send_dblock(struct drbd_work *w, int cancel)
1372{
1373        struct drbd_request *req = container_of(w, struct drbd_request, w);
1374        struct drbd_device *device = req->device;
1375        struct drbd_peer_device *const peer_device = first_peer_device(device);
1376        struct drbd_connection *connection = peer_device->connection;
1377        int err;
1378
1379        if (unlikely(cancel)) {
1380                req_mod(req, SEND_CANCELED);
1381                return 0;
1382        }
1383        req->pre_send_jif = jiffies;
1384
1385        re_init_if_first_write(connection, req->epoch);
1386        maybe_send_barrier(connection, req->epoch);
1387        connection->send.current_epoch_writes++;
1388
1389        err = drbd_send_dblock(peer_device, req);
1390        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1391
1392        return err;
1393}
1394
1395/**
1396 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1397 * @w:          work object.
1398 * @cancel:     The connection will be closed anyway
1399 */
1400int w_send_read_req(struct drbd_work *w, int cancel)
1401{
1402        struct drbd_request *req = container_of(w, struct drbd_request, w);
1403        struct drbd_device *device = req->device;
1404        struct drbd_peer_device *const peer_device = first_peer_device(device);
1405        struct drbd_connection *connection = peer_device->connection;
1406        int err;
1407
1408        if (unlikely(cancel)) {
1409                req_mod(req, SEND_CANCELED);
1410                return 0;
1411        }
1412        req->pre_send_jif = jiffies;
1413
1414        /* Even read requests may close a write epoch,
1415         * if one has been opened already. */
1416        maybe_send_barrier(connection, req->epoch);
1417
1418        err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1419                                 (unsigned long)req);
1420
1421        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1422
1423        return err;
1424}
1425
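/* Re-issue the master bio to the local backing device, e.g. after IO was
 * suspended; writes that were accounted in the activity log re-acquire
 * their AL extent first. */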
1426int w_restart_disk_io(struct drbd_work *w, int cancel)
1427{
1428        struct drbd_request *req = container_of(w, struct drbd_request, w);
1429        struct drbd_device *device = req->device;
1430
1431        if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1432                drbd_al_begin_io(device, &req->i);
1433
1434        drbd_req_make_private_bio(req, req->master_bio);
1435        req->private_bio->bi_bdev = device->ldev->backing_bdev;
1436        generic_make_request(req->private_bio);
1437
1438        return 0;
1439}
1440
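/* Walk the resync-after dependency chain of @device.  Returns 0 if any
 * device we (transitively) depend on is currently resyncing or paused,
 * 1 if it is OK to resync now.  The chain ends at the first diskless,
 * missing, or independent (resync-after == -1) minor. */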
1441static int _drbd_may_sync_now(struct drbd_device *device)
1442{
1443        struct drbd_device *odev = device;
1444        int resync_after;
1445
1446        while (1) {
1447                if (!odev->ldev || odev->state.disk == D_DISKLESS)
1448                        return 1;
1449                rcu_read_lock();
1450                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1451                rcu_read_unlock();
1452                if (resync_after == -1)
1453                        return 1;
1454                odev = minor_to_device(resync_after);
1455                if (!odev)
1456                        return 1;
1457                if ((odev->state.conn >= C_SYNC_SOURCE &&
1458                     odev->state.conn <= C_PAUSED_SYNC_T) ||
1459                    odev->state.aftr_isp || odev->state.peer_isp ||
1460                    odev->state.user_isp)
1461                        return 0;
1462        }
1463}
1464
1465/**
1466 * drbd_pause_after() - Pause resync on all devices that may not resync now
1467 * @device:     DRBD device.
1468 *
1469 * Called from process context only (admin command and after_state_ch).
1470 */
1471static bool drbd_pause_after(struct drbd_device *device)
1472{
1473        bool changed = false;
1474        struct drbd_device *odev;
1475        int i;
1476
1477        rcu_read_lock();
1478        idr_for_each_entry(&drbd_devices, odev, i) {
1479                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1480                        continue;
1481                if (!_drbd_may_sync_now(odev) &&
1482                    _drbd_set_state(_NS(odev, aftr_isp, 1),
1483                                    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1484                        changed = true;
1485        }
1486        rcu_read_unlock();
1487
1488        return changed;
1489}
1490
1491/**
1492 * drbd_resume_next() - Resume resync on all devices that may resync now
1493 * @device:     DRBD device.
1494 *
1495 * Called from process context only (admin command and worker).
1496 */
1497static bool drbd_resume_next(struct drbd_device *device)
1498{
1499        bool changed = false;
1500        struct drbd_device *odev;
1501        int i;
1502
1503        rcu_read_lock();
1504        idr_for_each_entry(&drbd_devices, odev, i) {
1505                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1506                        continue;
1507                if (odev->state.aftr_isp) {
1508                        if (_drbd_may_sync_now(odev) &&
1509                            _drbd_set_state(_NS(odev, aftr_isp, 0),
1510                                            CS_HARD, NULL) != SS_NOTHING_TO_DO)
1511                                changed = true;
1512                }
1513        }
1514        rcu_read_unlock();
1515        return changed;
1516}
1517
1518void resume_next_sg(struct drbd_device *device)
1519{
1520        lock_all_resources();
1521        drbd_resume_next(device);
1522        unlock_all_resources();
1523}
1524
1525void suspend_other_sg(struct drbd_device *device)
1526{
1527        lock_all_resources();
1528        drbd_pause_after(device);
1529        unlock_all_resources();
1530}
1531
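/* Validate a proposed resync-after dependency: the referenced minor must be
 * in range, and following the chain starting there must not lead back to
 * @device (no dependency cycles). */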
1532/* caller must lock_all_resources() */
1533enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1534{
1535        struct drbd_device *odev;
1536        int resync_after;
1537
1538        if (o_minor == -1)
1539                return NO_ERROR;
1540        if (o_minor < -1 || o_minor > MINORMASK)
1541                return ERR_RESYNC_AFTER;
1542
1543        /* check for loops */
1544        odev = minor_to_device(o_minor);
1545        while (1) {
1546                if (odev == device)
1547                        return ERR_RESYNC_AFTER_CYCLE;
1548
1549                /* You are free to depend on diskless, non-existing,
1550                 * or not yet/no longer existing minors.
1551                 * We only reject dependency loops.
1552                 * We cannot follow the dependency chain beyond a detached or
1553                 * missing minor.
1554                 */
1555                if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1556                        return NO_ERROR;
1557
1558                rcu_read_lock();
1559                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1560                rcu_read_unlock();
1561                /* dependency chain ends here, no cycles. */
1562                if (resync_after == -1)
1563                        return NO_ERROR;
1564
1565                /* follow the dependency chain */
1566                odev = minor_to_device(resync_after);
1567        }
1568}
1569
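/* A resync-after dependency changed: re-evaluate which devices have to
 * pause and which may resume, iterating until nothing changes anymore. */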
1570/* caller must lock_all_resources() */
1571void drbd_resync_after_changed(struct drbd_device *device)
1572{
1573        int changed;
1574
1575        do {
1576                changed  = drbd_pause_after(device);
1577                changed |= drbd_resume_next(device);
1578        } while (changed);
1579}
1580
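/* Reset the dynamic resync rate controller: clear the in-flight and sector
 * event counters, take a fresh baseline from the backing device's I/O
 * statistics, and empty the fifo plan in place. */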
1581void drbd_rs_controller_reset(struct drbd_device *device)
1582{
1583        struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1584        struct fifo_buffer *plan;
1585
1586        atomic_set(&device->rs_sect_in, 0);
1587        atomic_set(&device->rs_sect_ev, 0);
1588        device->rs_in_flight = 0;
1589        device->rs_last_events =
1590                (int)part_stat_read(&disk->part0, sectors[0]) +
1591                (int)part_stat_read(&disk->part0, sectors[1]);
1592
1593        /* Updating the RCU protected object in place is necessary since
1594           this function gets called from atomic context.
1595           It is valid since all other updates also lead to a completely
1596           empty fifo. */
1597        rcu_read_lock();
1598        plan = rcu_dereference(device->rs_plan_s);
1599        plan->total = 0;
1600        fifo_set(plan, 0);
1601        rcu_read_unlock();
1602}
1603
1604void start_resync_timer_fn(unsigned long data)
1605{
1606        struct drbd_device *device = (struct drbd_device *) data;
1607        drbd_device_post_work(device, RS_START);
1608}
1609
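/* Handle the deferred RS_START work: while acks or resync replies are still
 * pending, retry via the start_resync timer (HZ/10 later); otherwise start
 * as sync source and leave AHEAD mode. */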
1610static void do_start_resync(struct drbd_device *device)
1611{
1612        if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1613                drbd_warn(device, "postponing start_resync ...\n");
1614                device->start_resync_timer.expires = jiffies + HZ/10;
1615                add_timer(&device->start_resync_timer);
1616                return;
1617        }
1618
1619        drbd_start_resync(device, C_SYNC_SOURCE);
1620        clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1621}
1622
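/* Checksum based resync requires protocol >= 89 and a configured checksum
 * transform (csums_tfm).  With csums_after_crash_only set it is used only
 * after a primary crash, otherwise for every resync. */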
1623static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1624{
1625        bool csums_after_crash_only;
1626        rcu_read_lock();
1627        csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1628        rcu_read_unlock();
1629        return connection->agreed_pro_version >= 89 &&          /* supported? */
1630                connection->csums_tfm &&                        /* configured? */
1631                (csums_after_crash_only == 0                    /* use for each resync? */
1632                 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1633}
1634
1635/**
1636 * drbd_start_resync() - Start the resync process
1637 * @device:     DRBD device.
1638 * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1639 *
1640 * This function might bring you directly into one of the
1641 * C_PAUSED_SYNC_* states.
1642 */
1643void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1644{
1645        struct drbd_peer_device *peer_device = first_peer_device(device);
1646        struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1647        union drbd_state ns;
1648        int r;
1649
1650        if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1651                drbd_err(device, "Resync already running!\n");
1652                return;
1653        }
1654
1655        if (!test_bit(B_RS_H_DONE, &device->flags)) {
1656                if (side == C_SYNC_TARGET) {
1657                        /* Since application IO was locked out during C_WF_BITMAP_T and
1658                           C_WF_SYNC_UUID we are still unmodified.  Becoming C_SYNC_TARGET
1659                           will make the data inconsistent, so let the handler veto that first. */
1660                        r = drbd_khelper(device, "before-resync-target");
1661                        r = (r >> 8) & 0xff;
1662                        if (r > 0) {
1663                                drbd_info(device, "before-resync-target handler returned %d, "
1664                                         "dropping connection.\n", r);
1665                                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1666                                return;
1667                        }
1668                } else /* C_SYNC_SOURCE */ {
1669                        r = drbd_khelper(device, "before-resync-source");
1670                        r = (r >> 8) & 0xff;
1671                        if (r > 0) {
1672                                if (r == 3) {
1673                                        drbd_info(device, "before-resync-source handler returned %d, "
1674                                                 "ignoring. Old userland tools?\n", r);
1675                                } else {
1676                                        drbd_info(device, "before-resync-source handler returned %d, "
1677                                                 "dropping connection.\n", r);
1678                                        conn_request_state(connection,
1679                                                           NS(conn, C_DISCONNECTING), CS_HARD);
1680                                        return;
1681                                }
1682                        }
1683                }
1684        }
1685
1686        if (current == connection->worker.task) {
1687                /* The worker should not sleep waiting for state_mutex,
1688                   as that can take a long time */
1689                if (!mutex_trylock(device->state_mutex)) {
1690                        set_bit(B_RS_H_DONE, &device->flags);
1691                        device->start_resync_timer.expires = jiffies + HZ/5;
1692                        add_timer(&device->start_resync_timer);
1693                        return;
1694                }
1695        } else {
1696                mutex_lock(device->state_mutex);
1697        }
1698
1699        lock_all_resources();
1700        clear_bit(B_RS_H_DONE, &device->flags);
1701        /* Did some connection breakage or IO error race with us? */
1702        if (device->state.conn < C_CONNECTED
1703        || !get_ldev_if_state(device, D_NEGOTIATING)) {
1704                unlock_all_resources();
1705                goto out;
1706        }
1707
1708        ns = drbd_read_state(device);
1709
1710        ns.aftr_isp = !_drbd_may_sync_now(device);
1711
1712        ns.conn = side;
1713
1714        if (side == C_SYNC_TARGET)
1715                ns.disk = D_INCONSISTENT;
1716        else /* side == C_SYNC_SOURCE */
1717                ns.pdsk = D_INCONSISTENT;
1718
1719        r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1720        ns = drbd_read_state(device);
1721
1722        if (ns.conn < C_CONNECTED)
1723                r = SS_UNKNOWN_ERROR;
1724
1725        if (r == SS_SUCCESS) {
1726                unsigned long tw = drbd_bm_total_weight(device);
1727                unsigned long now = jiffies;
1728                int i;
1729
1730                device->rs_failed    = 0;
1731                device->rs_paused    = 0;
1732                device->rs_same_csum = 0;
1733                device->rs_last_sect_ev = 0;
1734                device->rs_total     = tw;
1735                device->rs_start     = now;
1736                for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1737                        device->rs_mark_left[i] = tw;
1738                        device->rs_mark_time[i] = now;
1739                }
1740                drbd_pause_after(device);
1741                /* Forget potentially stale cached per resync extent bit-counts.
1742                 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1743                 * disabled, and know the disk state is ok. */
1744                spin_lock(&device->al_lock);
1745                lc_reset(device->resync);
1746                device->resync_locked = 0;
1747                device->resync_wenr = LC_FREE;
1748                spin_unlock(&device->al_lock);
1749        }
1750        unlock_all_resources();
1751
1752        if (r == SS_SUCCESS) {
1753                wake_up(&device->al_wait); /* for lc_reset() above */
1754                /* reset rs_last_bcast when a resync or verify is started,
1755                 * to deal with potential jiffies wrap. */
1756                device->rs_last_bcast = jiffies - HZ;
1757
1758                drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1759                     drbd_conn_str(ns.conn),
1760                     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1761                     (unsigned long) device->rs_total);
1762                if (side == C_SYNC_TARGET) {
1763                        device->bm_resync_fo = 0;
1764                        device->use_csums = use_checksum_based_resync(connection, device);
1765                } else {
1766                        device->use_csums = 0;
1767                }
1768
1769                /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1770                 * with w_send_oos, or the sync target will get confused as to
1771                 * how many bits to resync.  We cannot always do that, because for an
1772                 * empty resync and protocol < 95, we need to do it here, as we call
1773                 * drbd_resync_finished from here in that case.
1774                 * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1775                 * and from after_state_ch otherwise. */
1776                if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1777                        drbd_gen_and_send_sync_uuid(peer_device);
1778
1779                if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1780                        /* This still has a race (about when exactly the peers
1781                         * detect connection loss) that can lead to a full sync
1782                         * on next handshake. In 8.3.9 we fixed this with explicit
1783                         * resync-finished notifications, but the fix
1784                         * introduces a protocol change.  Sleeping for some
1785                         * time longer than the ping interval + timeout on the
1786                         * SyncSource, to give the SyncTarget the chance to
1787                         * detect connection loss, then waiting for a ping
1788                         * response (implicit in drbd_resync_finished) reduces
1789                         * the race considerably, but does not solve it. */
1790                        if (side == C_SYNC_SOURCE) {
1791                                struct net_conf *nc;
1792                                int timeo;
1793
1794                                rcu_read_lock();
1795                                nc = rcu_dereference(connection->net_conf);
1796                                timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1797                                rcu_read_unlock();
1798                                schedule_timeout_interruptible(timeo);
1799                        }
1800                        drbd_resync_finished(device);
1801                }
1802
1803                drbd_rs_controller_reset(device);
1804                /* ns.conn may already be != device->state.conn,
1805                 * we may have been paused in between, or become paused until
1806                 * the timer triggers.
1807                 * No matter, that is handled in resync_timer_fn() */
1808                if (ns.conn == C_SYNC_TARGET)
1809                        mod_timer(&device->resync_timer, jiffies);
1810
1811                drbd_md_sync(device);
1812        }
1813        put_ldev(device);
1814out:
1815        mutex_unlock(device->state_mutex);
1816}
1817
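/* Device work: lazily write out changed bitmap pages and broadcast the sync
 * progress.  If the resync just finished and we are still in a sync state,
 * finish it off here as well. */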
1818static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1819{
1820        struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1821        device->rs_last_bcast = jiffies;
1822
1823        if (!get_ldev(device))
1824                return;
1825
1826        drbd_bm_write_lazy(device, 0);
1827        if (resync_done && is_sync_state(device->state.conn))
1828                drbd_resync_finished(device);
1829
1830        drbd_bcast_event(device, &sib);
1831        /* update timestamp, in case it took a while to write out stuff */
1832        device->rs_last_bcast = jiffies;
1833        put_ldev(device);
1834}
1835
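/* Final stage of detaching: free the resync and activity log LRU caches,
 * release the backing device, clear GOING_DISKLESS and wake up anyone
 * waiting in misc_wait. */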
1836static void drbd_ldev_destroy(struct drbd_device *device)
1837{
1838        lc_destroy(device->resync);
1839        device->resync = NULL;
1840        lc_destroy(device->act_log);
1841        device->act_log = NULL;
1842
1843        __acquire(local);
1844        drbd_backing_dev_free(device, device->ldev);
1845        device->ldev = NULL;
1846        __release(local);
1847
1848        clear_bit(GOING_DISKLESS, &device->flags);
1849        wake_up(&device->misc_wait);
1850}
1851
1852static void go_diskless(struct drbd_device *device)
1853{
1854        D_ASSERT(device, device->state.disk == D_FAILED);
1855        /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1856         * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1857         * the protected members anymore, though, so once put_ldev reaches zero
1858         * again, it will be safe to free them. */
1859
1860        /* Try to write changed bitmap pages; read errors may have just
1861         * set some bits outside the area covered by the activity log.
1862         *
1863         * If we have an IO error during the bitmap writeout,
1864         * we will want a full sync next time, just in case.
1865         * (Do we want a specific meta data flag for this?)
1866         *
1867         * If that does not make it to stable storage either,
1868         * we cannot do anything about that anymore.
1869         *
1870         * We still need to check if both bitmap and ldev are present, we may
1871         * end up here after a failed attach, before ldev was even assigned.
1872         */
1873        if (device->bitmap && device->ldev) {
1874                /* An interrupted resync or similar is allowed to recount bits
1875                 * while we detach.
1876                 * Any modifications would not be expected anymore, though.
1877                 */
1878                if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1879                                        "detach", BM_LOCKED_TEST_ALLOWED)) {
1880                        if (test_bit(WAS_READ_ERROR, &device->flags)) {
1881                                drbd_md_set_flag(device, MDF_FULL_SYNC);
1882                                drbd_md_sync(device);
1883                        }
1884                }
1885        }
1886
1887        drbd_force_state(device, NS(disk, D_DISKLESS));
1888}
1889
1890static int do_md_sync(struct drbd_device *device)
1891{
1892        drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1893        drbd_md_sync(device);
1894        return 0;
1895}
1896
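/* Record which callback the worker is about to run in the per-thread
 * timing-details ring buffer; the next slot is zeroed to mark the current
 * end of the history. */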
1897/* only called from drbd_worker thread, no locking */
1898void __update_timing_details(
1899                struct drbd_thread_timing_details *tdp,
1900                unsigned int *cb_nr,
1901                void *cb,
1902                const char *fn, const unsigned int line)
1903{
1904        unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1905        struct drbd_thread_timing_details *td = tdp + i;
1906
1907        td->start_jif = jiffies;
1908        td->cb_addr = cb;
1909        td->caller_fn = fn;
1910        td->line = line;
1911        td->cb_nr = *cb_nr;
1912
1913        i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1914        td = tdp + i;
1915        memset(td, 0, sizeof(*td));
1916
1917        ++(*cb_nr);
1918}
1919
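/* Dispatch the per-device work bits collected in device->flags to their
 * handlers; RS_PROGRESS and RS_DONE share the bitmap writeout path. */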
1920static void do_device_work(struct drbd_device *device, const unsigned long todo)
1921{
1922        if (test_bit(MD_SYNC, &todo))
1923                do_md_sync(device);
1924        if (test_bit(RS_DONE, &todo) ||
1925            test_bit(RS_PROGRESS, &todo))
1926                update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
1927        if (test_bit(GO_DISKLESS, &todo))
1928                go_diskless(device);
1929        if (test_bit(DESTROY_DISK, &todo))
1930                drbd_ldev_destroy(device);
1931        if (test_bit(RS_START, &todo))
1932                do_start_resync(device);
1933}
1934
1935#define DRBD_DEVICE_WORK_MASK   \
1936        ((1UL << GO_DISKLESS)   \
1937        |(1UL << DESTROY_DISK)  \
1938        |(1UL << MD_SYNC)       \
1939        |(1UL << RS_START)      \
1940        |(1UL << RS_PROGRESS)   \
1941        |(1UL << RS_DONE)       \
1942        )
1943
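/* Atomically fetch and clear all device work bits in one cmpxchg loop, so
 * that bits set concurrently by other contexts are never lost. */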
1944static unsigned long get_work_bits(unsigned long *flags)
1945{
1946        unsigned long old, new;
1947        do {
1948                old = *flags;
1949                new = old & ~DRBD_DEVICE_WORK_MASK;
1950        } while (cmpxchg(flags, old, new) != old);
1951        return old & DRBD_DEVICE_WORK_MASK;
1952}
1953
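/* For each peer device with pending work bits, take a reference and drop
 * the RCU read lock while processing, since the work may sleep. */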
1954static void do_unqueued_work(struct drbd_connection *connection)
1955{
1956        struct drbd_peer_device *peer_device;
1957        int vnr;
1958
1959        rcu_read_lock();
1960        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1961                struct drbd_device *device = peer_device->device;
1962                unsigned long todo = get_work_bits(&device->flags);
1963                if (!todo)
1964                        continue;
1965
1966                kref_get(&device->kref);
1967                rcu_read_unlock();
1968                do_device_work(device, todo);
1969                kref_put(&device->kref, drbd_destroy_device);
1970                rcu_read_lock();
1971        }
1972        rcu_read_unlock();
1973}
1974
1975static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
1976{
1977        spin_lock_irq(&queue->q_lock);
1978        list_splice_tail_init(&queue->q, work_list);
1979        spin_unlock_irq(&queue->q_lock);
1980        return !list_empty(work_list);
1981}
1982
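/* Sender idle path: uncork TCP so already queued data goes out, then wait
 * for new work.  While waiting, send an epoch closing barrier if the
 * transfer log has moved on, and afterwards cork or uncork the socket
 * again according to the (possibly changed) configuration. */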
1983static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
1984{
1985        DEFINE_WAIT(wait);
1986        struct net_conf *nc;
1987        int uncork, cork;
1988
1989        dequeue_work_batch(&connection->sender_work, work_list);
1990        if (!list_empty(work_list))
1991                return;
1992
1993        /* Still nothing to do?
1994         * Maybe we still need to close the current epoch,
1995         * even if no new requests are queued yet.
1996         *
1997         * Also, poke TCP, just in case.
1998         * Then wait for new work (or signal). */
1999        rcu_read_lock();
2000        nc = rcu_dereference(connection->net_conf);
2001        uncork = nc ? nc->tcp_cork : 0;
2002        rcu_read_unlock();
2003        if (uncork) {
2004                mutex_lock(&connection->data.mutex);
2005                if (connection->data.socket)
2006                        drbd_tcp_uncork(connection->data.socket);
2007                mutex_unlock(&connection->data.mutex);
2008        }
2009
2010        for (;;) {
2011                int send_barrier;
2012                prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2013                spin_lock_irq(&connection->resource->req_lock);
2014                spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2015                if (!list_empty(&connection->sender_work.q))
2016                        list_splice_tail_init(&connection->sender_work.q, work_list);
2017                spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2018                if (!list_empty(work_list) || signal_pending(current)) {
2019                        spin_unlock_irq(&connection->resource->req_lock);
2020                        break;
2021                }
2022
2023                /* We found nothing new to do, no to-be-communicated request,
2024                 * no other work item.  We may still need to close the last
2025                 * epoch.  The next incoming request's epoch will be the
2026                 * connection's current transfer log epoch number.  If that is
2027                 * different from the epoch of the last request we communicated,
2028                 * it is safe to send the epoch-separating barrier now.
2029                 */
2030                send_barrier =
2031                        atomic_read(&connection->current_tle_nr) !=
2032                        connection->send.current_epoch_nr;
2033                spin_unlock_irq(&connection->resource->req_lock);
2034
2035                if (send_barrier)
2036                        maybe_send_barrier(connection,
2037                                        connection->send.current_epoch_nr + 1);
2038
2039                if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2040                        break;
2041
2042                /* drbd_send() may have called flush_signals() */
2043                if (get_t_state(&connection->worker) != RUNNING)
2044                        break;
2045
2046                schedule();
2047                /* We may be woken up for things other than new work, too,
2048                 * e.g. if the current epoch got closed,
2049                 * in which case we send the barrier above. */
2050        }
2051        finish_wait(&connection->sender_work.q_wait, &wait);
2052
2053        /* someone may have changed the config while we have been waiting above. */
2054        rcu_read_lock();
2055        nc = rcu_dereference(connection->net_conf);
2056        cork = nc ? nc->tcp_cork : 0;
2057        rcu_read_unlock();
2058        mutex_lock(&connection->data.mutex);
2059        if (connection->data.socket) {
2060                if (cork)
2061                        drbd_tcp_cork(connection->data.socket);
2062                else if (!uncork)
2063                        drbd_tcp_uncork(connection->data.socket);
2064        }
2065        mutex_unlock(&connection->data.mutex);
2066}
2067
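/* Main loop of the per-connection worker thread: wait for work, run queued
 * work callbacks and per-device work bits, and force the connection into
 * C_NETWORK_FAILURE if a callback fails while we are connected.  On
 * shutdown, drain the remaining work with the cancel flag set and clean up
 * all devices of this connection. */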
2068int drbd_worker(struct drbd_thread *thi)
2069{
2070        struct drbd_connection *connection = thi->connection;
2071        struct drbd_work *w = NULL;
2072        struct drbd_peer_device *peer_device;
2073        LIST_HEAD(work_list);
2074        int vnr;
2075
2076        while (get_t_state(thi) == RUNNING) {
2077                drbd_thread_current_set_cpu(thi);
2078
2079                if (list_empty(&work_list)) {
2080                        update_worker_timing_details(connection, wait_for_work);
2081                        wait_for_work(connection, &work_list);
2082                }
2083
2084                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2085                        update_worker_timing_details(connection, do_unqueued_work);
2086                        do_unqueued_work(connection);
2087                }
2088
2089                if (signal_pending(current)) {
2090                        flush_signals(current);
2091                        if (get_t_state(thi) == RUNNING) {
2092                                drbd_warn(connection, "Worker got an unexpected signal\n");
2093                                continue;
2094                        }
2095                        break;
2096                }
2097
2098                if (get_t_state(thi) != RUNNING)
2099                        break;
2100
2101                if (!list_empty(&work_list)) {
2102                        w = list_first_entry(&work_list, struct drbd_work, list);
2103                        list_del_init(&w->list);
2104                        update_worker_timing_details(connection, w->cb);
2105                        if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2106                                continue;
2107                        if (connection->cstate >= C_WF_REPORT_PARAMS)
2108                                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2109                }
2110        }
2111
2112        do {
2113                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2114                        update_worker_timing_details(connection, do_unqueued_work);
2115                        do_unqueued_work(connection);
2116                }
2117                if (!list_empty(&work_list)) {
2118                        w = list_first_entry(&work_list, struct drbd_work, list);
2119                        list_del_init(&w->list);
2120                        update_worker_timing_details(connection, w->cb);
2121                        w->cb(w, 1);
2122                } else
2123                        dequeue_work_batch(&connection->sender_work, &work_list);
2124        } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2125
2126        rcu_read_lock();
2127        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2128                struct drbd_device *device = peer_device->device;
2129                D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2130                kref_get(&device->kref);
2131                rcu_read_unlock();
2132                drbd_device_cleanup(device);
2133                kref_put(&device->kref, drbd_destroy_device);
2134                rcu_read_lock();
2135        }
2136        rcu_read_unlock();
2137
2138        return 0;
2139}
2140