linux/drivers/block/drbd/drbd_worker.c
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3   drbd_worker.c
   4
   5   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   6
   7   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   8   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   9   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
  10
  11
  12*/
  13
  14#include <linux/module.h>
  15#include <linux/drbd.h>
  16#include <linux/sched/signal.h>
  17#include <linux/wait.h>
  18#include <linux/mm.h>
  19#include <linux/memcontrol.h>
  20#include <linux/mm_inline.h>
  21#include <linux/slab.h>
  22#include <linux/random.h>
  23#include <linux/string.h>
  24#include <linux/scatterlist.h>
  25#include <linux/part_stat.h>
  26
  27#include "drbd_int.h"
  28#include "drbd_protocol.h"
  29#include "drbd_req.h"
  30
  31static int make_ov_request(struct drbd_device *, int);
  32static int make_resync_request(struct drbd_device *, int);
  33
  34/* endio handlers:
  35 *   drbd_md_endio (defined here)
  36 *   drbd_request_endio (defined here)
  37 *   drbd_peer_request_endio (defined here)
  38 *   drbd_bm_endio (defined in drbd_bitmap.c)
  39 *
  40 * For all these callbacks, note the following:
  41 * The callbacks will be called in irq context by the IDE drivers,
  42 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  43 * Try to get the locking right :)
  44 *
  45 */
  46
  47/* used for synchronous meta data and bitmap IO
  48 * submitted by drbd_md_sync_page_io()
  49 */
  50void drbd_md_endio(struct bio *bio)
  51{
  52        struct drbd_device *device;
  53
  54        device = bio->bi_private;
  55        device->md_io.error = blk_status_to_errno(bio->bi_status);
  56
  57        /* special case: drbd_md_read() during drbd_adm_attach() */
  58        if (device->ldev)
  59                put_ldev(device);
  60        bio_put(bio);
  61
  62        /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
  63         * to timeout on the lower level device, and eventually detach from it.
  64         * If this io completion runs after that timeout expired, this
  65         * drbd_md_put_buffer() may allow us to finally try and re-attach.
  66         * During normal operation, this only puts that extra reference
  67         * down to 1 again.
  68         * Make sure we first drop the reference, and only then signal
  69         * completion, or we may (in drbd_al_read_log()) cycle so fast into the
  70         * next drbd_md_sync_page_io(), that we trigger the
  71         * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
  72         */
  73        drbd_md_put_buffer(device);
  74        device->md_io.done = 1;
  75        wake_up(&device->misc_wait);
  76}
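     /*
      * Editorial illustration (not part of the driver): the ordering above only
      * matters because the submitting side sleeps until md_io.done is set,
      * roughly along the lines of
      *
      *         wait_event(device->misc_wait, device->md_io.done);
      *
      * If we signalled completion before drbd_md_put_buffer(), that waiter could
      * resume, start the next drbd_md_sync_page_io() while md_io_in_use is still
      * elevated, and trip the ASSERT mentioned in the comment above.
      */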
  77
  78/* reads on behalf of the partner,
  79 * "submitted" by the receiver
  80 */
  81static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
  82{
  83        unsigned long flags = 0;
  84        struct drbd_peer_device *peer_device = peer_req->peer_device;
  85        struct drbd_device *device = peer_device->device;
  86
  87        spin_lock_irqsave(&device->resource->req_lock, flags);
  88        device->read_cnt += peer_req->i.size >> 9;
  89        list_del(&peer_req->w.list);
  90        if (list_empty(&device->read_ee))
  91                wake_up(&device->ee_wait);
  92        if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
  93                __drbd_chk_io_error(device, DRBD_READ_ERROR);
  94        spin_unlock_irqrestore(&device->resource->req_lock, flags);
  95
  96        drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
  97        put_ldev(device);
  98}
  99
 100/* writes on behalf of the partner, or resync writes,
 101 * "submitted" by the receiver, final stage.  */
 102void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 103{
 104        unsigned long flags = 0;
 105        struct drbd_peer_device *peer_device = peer_req->peer_device;
 106        struct drbd_device *device = peer_device->device;
 107        struct drbd_connection *connection = peer_device->connection;
 108        struct drbd_interval i;
 109        int do_wake;
 110        u64 block_id;
 111        int do_al_complete_io;
 112
 113        /* after we moved peer_req to done_ee,
 114         * we may no longer access it,
 115         * it may be freed/reused already!
 116         * (as soon as we release the req_lock) */
 117        i = peer_req->i;
 118        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
 119        block_id = peer_req->block_id;
 120        peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 121
 122        if (peer_req->flags & EE_WAS_ERROR) {
  123                /* In protocol != C, we usually do not send write acks.
  124                 * In case of a write error, send the neg ack anyway. */
 125                if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
 126                        inc_unacked(device);
 127                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
 128        }
 129
 130        spin_lock_irqsave(&device->resource->req_lock, flags);
 131        device->writ_cnt += peer_req->i.size >> 9;
 132        list_move_tail(&peer_req->w.list, &device->done_ee);
 133
  134        /*
  135         * Do not remove from the write_requests tree here: we did not send the
  136         * Ack yet and did not wake possibly waiting conflicting requests.
  137         * Removal from the tree happens in "drbd_process_done_ee", within the
  138         * appropriate dw.cb (e_end_block/e_end_resync_block), or in
  139         * _drbd_clear_done_ee.
  140         */
 141
 142        do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
 143
 144        /* FIXME do we want to detach for failed REQ_OP_DISCARD?
 145         * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
 146        if (peer_req->flags & EE_WAS_ERROR)
 147                __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
 148
 149        if (connection->cstate >= C_WF_REPORT_PARAMS) {
 150                kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
 151                if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
 152                        kref_put(&device->kref, drbd_destroy_device);
 153        }
 154        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 155
 156        if (block_id == ID_SYNCER)
 157                drbd_rs_complete_io(device, i.sector);
 158
 159        if (do_wake)
 160                wake_up(&device->ee_wait);
 161
 162        if (do_al_complete_io)
 163                drbd_al_complete_io(device, &i);
 164
 165        put_ldev(device);
 166}
 167
 168/* writes on behalf of the partner, or resync writes,
 169 * "submitted" by the receiver.
 170 */
 171void drbd_peer_request_endio(struct bio *bio)
 172{
 173        struct drbd_peer_request *peer_req = bio->bi_private;
 174        struct drbd_device *device = peer_req->peer_device->device;
 175        bool is_write = bio_data_dir(bio) == WRITE;
 176        bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
 177                          bio_op(bio) == REQ_OP_DISCARD;
 178
 179        if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
 180                drbd_warn(device, "%s: error=%d s=%llus\n",
 181                                is_write ? (is_discard ? "discard" : "write")
 182                                        : "read", bio->bi_status,
 183                                (unsigned long long)peer_req->i.sector);
 184
 185        if (bio->bi_status)
 186                set_bit(__EE_WAS_ERROR, &peer_req->flags);
 187
 188        bio_put(bio); /* no need for the bio anymore */
 189        if (atomic_dec_and_test(&peer_req->pending_bios)) {
 190                if (is_write)
 191                        drbd_endio_write_sec_final(peer_req);
 192                else
 193                        drbd_endio_read_sec_final(peer_req);
 194        }
 195}
 196
 197static void
 198drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
 199{
 200        panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
 201                device->minor, device->resource->name, device->vnr);
 202}
 203
 204/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 205 */
 206void drbd_request_endio(struct bio *bio)
 207{
 208        unsigned long flags;
 209        struct drbd_request *req = bio->bi_private;
 210        struct drbd_device *device = req->device;
 211        struct bio_and_error m;
 212        enum drbd_req_event what;
 213
 214        /* If this request was aborted locally before,
 215         * but now was completed "successfully",
 216         * chances are that this caused arbitrary data corruption.
 217         *
  218         * "aborting" requests, or force-detaching the disk, is intended for
  219         * completely blocked/hung local backing devices which no longer
  220         * complete requests at all, not even error completions.  In this
  221         * situation, usually a hard-reset and failover is the only way out.
  222         *
  223         * By "aborting", basically faking a local error-completion,
  224         * we allow for a more graceful switchover by cleanly migrating services.
 225         * Still the affected node has to be rebooted "soon".
 226         *
 227         * By completing these requests, we allow the upper layers to re-use
 228         * the associated data pages.
 229         *
 230         * If later the local backing device "recovers", and now DMAs some data
 231         * from disk into the original request pages, in the best case it will
 232         * just put random data into unused pages; but typically it will corrupt
 233         * meanwhile completely unrelated data, causing all sorts of damage.
 234         *
 235         * Which means delayed successful completion,
 236         * especially for READ requests,
 237         * is a reason to panic().
 238         *
 239         * We assume that a delayed *error* completion is OK,
 240         * though we still will complain noisily about it.
 241         */
 242        if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
 243                if (__ratelimit(&drbd_ratelimit_state))
 244                        drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
 245
 246                if (!bio->bi_status)
 247                        drbd_panic_after_delayed_completion_of_aborted_request(device);
 248        }
 249
 250        /* to avoid recursion in __req_mod */
 251        if (unlikely(bio->bi_status)) {
 252                switch (bio_op(bio)) {
 253                case REQ_OP_WRITE_ZEROES:
 254                case REQ_OP_DISCARD:
 255                        if (bio->bi_status == BLK_STS_NOTSUPP)
 256                                what = DISCARD_COMPLETED_NOTSUPP;
 257                        else
 258                                what = DISCARD_COMPLETED_WITH_ERROR;
 259                        break;
 260                case REQ_OP_READ:
 261                        if (bio->bi_opf & REQ_RAHEAD)
 262                                what = READ_AHEAD_COMPLETED_WITH_ERROR;
 263                        else
 264                                what = READ_COMPLETED_WITH_ERROR;
 265                        break;
 266                default:
 267                        what = WRITE_COMPLETED_WITH_ERROR;
 268                        break;
 269                }
 270        } else {
 271                what = COMPLETED_OK;
 272        }
 273
 274        req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
 275        bio_put(bio);
 276
 277        /* not req_mod(), we need irqsave here! */
 278        spin_lock_irqsave(&device->resource->req_lock, flags);
 279        __req_mod(req, what, &m);
 280        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 281        put_ldev(device);
 282
 283        if (m.bio)
 284                complete_master_bio(device, &m);
 285}
 286
 287void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
 288{
 289        SHASH_DESC_ON_STACK(desc, tfm);
 290        struct page *page = peer_req->pages;
 291        struct page *tmp;
 292        unsigned len;
 293        void *src;
 294
 295        desc->tfm = tfm;
 296
 297        crypto_shash_init(desc);
 298
 299        src = kmap_atomic(page);
 300        while ((tmp = page_chain_next(page))) {
 301                /* all but the last page will be fully used */
 302                crypto_shash_update(desc, src, PAGE_SIZE);
 303                kunmap_atomic(src);
 304                page = tmp;
 305                src = kmap_atomic(page);
 306        }
 307        /* and now the last, possibly only partially used page */
 308        len = peer_req->i.size & (PAGE_SIZE - 1);
 309        crypto_shash_update(desc, src, len ?: PAGE_SIZE);
 310        kunmap_atomic(src);
 311
 312        crypto_shash_final(desc, digest);
 313        shash_desc_zero(desc);
 314}
 315
 316void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
 317{
 318        SHASH_DESC_ON_STACK(desc, tfm);
 319        struct bio_vec bvec;
 320        struct bvec_iter iter;
 321
 322        desc->tfm = tfm;
 323
 324        crypto_shash_init(desc);
 325
 326        bio_for_each_segment(bvec, bio, iter) {
 327                u8 *src;
 328
 329                src = kmap_atomic(bvec.bv_page);
 330                crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
 331                kunmap_atomic(src);
 332
 333                /* REQ_OP_WRITE_SAME has only one segment,
 334                 * checksum the payload only once. */
 335                if (bio_op(bio) == REQ_OP_WRITE_SAME)
 336                        break;
 337        }
 338        crypto_shash_final(desc, digest);
 339        shash_desc_zero(desc);
 340}
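     /*
      * Editorial sketch (not part of the driver): both helpers above use the
      * same init/update/final crypto_shash pattern, split into per-page updates
      * only because the payload is a page chain or a bio.  For a flat, virtually
      * contiguous buffer the digest could be produced in a single call; a
      * minimal sketch, assuming "tfm" is an already allocated synchronous hash
      * transform (crypto_shash API from <crypto/hash.h>):
      */
     #if 0
     static void example_csum_buffer(struct crypto_shash *tfm,
                                     const void *buf, unsigned int len, void *digest)
     {
             SHASH_DESC_ON_STACK(desc, tfm);

             desc->tfm = tfm;
             crypto_shash_digest(desc, buf, len, digest);    /* init + update + final */
             shash_desc_zero(desc);
     }
     #endif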
 341
 342/* MAYBE merge common code with w_e_end_ov_req */
 343static int w_e_send_csum(struct drbd_work *w, int cancel)
 344{
 345        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 346        struct drbd_peer_device *peer_device = peer_req->peer_device;
 347        struct drbd_device *device = peer_device->device;
 348        int digest_size;
 349        void *digest;
 350        int err = 0;
 351
 352        if (unlikely(cancel))
 353                goto out;
 354
 355        if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
 356                goto out;
 357
 358        digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
 359        digest = kmalloc(digest_size, GFP_NOIO);
 360        if (digest) {
 361                sector_t sector = peer_req->i.sector;
 362                unsigned int size = peer_req->i.size;
 363                drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
 364                /* Free peer_req and pages before send.
 365                 * In case we block on congestion, we could otherwise run into
 366                 * some distributed deadlock, if the other side blocks on
 367                 * congestion as well, because our receiver blocks in
 368                 * drbd_alloc_pages due to pp_in_use > max_buffers. */
 369                drbd_free_peer_req(device, peer_req);
 370                peer_req = NULL;
 371                inc_rs_pending(device);
 372                err = drbd_send_drequest_csum(peer_device, sector, size,
 373                                              digest, digest_size,
 374                                              P_CSUM_RS_REQUEST);
 375                kfree(digest);
 376        } else {
 377                drbd_err(device, "kmalloc() of digest failed.\n");
 378                err = -ENOMEM;
 379        }
 380
 381out:
 382        if (peer_req)
 383                drbd_free_peer_req(device, peer_req);
 384
 385        if (unlikely(err))
 386                drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
 387        return err;
 388}
 389
 390#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
 391
 392static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
 393{
 394        struct drbd_device *device = peer_device->device;
 395        struct drbd_peer_request *peer_req;
 396
 397        if (!get_ldev(device))
 398                return -EIO;
 399
 400        /* GFP_TRY, because if there is no memory available right now, this may
 401         * be rescheduled for later. It is "only" background resync, after all. */
 402        peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
 403                                       size, size, GFP_TRY);
 404        if (!peer_req)
 405                goto defer;
 406
 407        peer_req->w.cb = w_e_send_csum;
 408        spin_lock_irq(&device->resource->req_lock);
 409        list_add_tail(&peer_req->w.list, &device->read_ee);
 410        spin_unlock_irq(&device->resource->req_lock);
 411
 412        atomic_add(size >> 9, &device->rs_sect_ev);
 413        if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
 414                                     DRBD_FAULT_RS_RD) == 0)
 415                return 0;
 416
 417        /* If it failed because of ENOMEM, retry should help.  If it failed
 418         * because bio_add_page failed (probably broken lower level driver),
 419         * retry may or may not help.
 420         * If it does not, you may need to force disconnect. */
 421        spin_lock_irq(&device->resource->req_lock);
 422        list_del(&peer_req->w.list);
 423        spin_unlock_irq(&device->resource->req_lock);
 424
 425        drbd_free_peer_req(device, peer_req);
 426defer:
 427        put_ldev(device);
 428        return -EAGAIN;
 429}
 430
 431int w_resync_timer(struct drbd_work *w, int cancel)
 432{
 433        struct drbd_device *device =
 434                container_of(w, struct drbd_device, resync_work);
 435
 436        switch (device->state.conn) {
 437        case C_VERIFY_S:
 438                make_ov_request(device, cancel);
 439                break;
 440        case C_SYNC_TARGET:
 441                make_resync_request(device, cancel);
 442                break;
 443        }
 444
 445        return 0;
 446}
 447
 448void resync_timer_fn(struct timer_list *t)
 449{
 450        struct drbd_device *device = from_timer(device, t, resync_timer);
 451
 452        drbd_queue_work_if_unqueued(
 453                &first_peer_device(device)->connection->sender_work,
 454                &device->resync_work);
 455}
 456
 457static void fifo_set(struct fifo_buffer *fb, int value)
 458{
 459        int i;
 460
 461        for (i = 0; i < fb->size; i++)
 462                fb->values[i] = value;
 463}
 464
 465static int fifo_push(struct fifo_buffer *fb, int value)
 466{
 467        int ov;
 468
 469        ov = fb->values[fb->head_index];
 470        fb->values[fb->head_index++] = value;
 471
 472        if (fb->head_index >= fb->size)
 473                fb->head_index = 0;
 474
 475        return ov;
 476}
 477
 478static void fifo_add_val(struct fifo_buffer *fb, int value)
 479{
 480        int i;
 481
 482        for (i = 0; i < fb->size; i++)
 483                fb->values[i] += value;
 484}
 485
 486struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
 487{
 488        struct fifo_buffer *fb;
 489
 490        fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
 491        if (!fb)
 492                return NULL;
 493
 494        fb->head_index = 0;
 495        fb->size = fifo_size;
 496        fb->total = 0;
 497
 498        return fb;
 499}
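     /*
      * Editorial sketch (not part of the driver): how the helpers above work
      * together.  fifo_add_val() spreads a correction over all planned steps,
      * fifo_push() consumes the oldest step while queueing a new one, and the
      * resync controller below keeps plan->total consistent by hand:
      */
     #if 0
     static void example_fifo_usage(void)
     {
             struct fifo_buffer *plan = fifo_alloc(3);       /* values = {0, 0, 0} */
             int curr;

             if (!plan)
                     return;
             fifo_add_val(plan, 4);          /* values = {4, 4, 4} */
             plan->total += 4 * 3;
             curr = fifo_push(plan, 0);      /* returns 4, values = {0, 4, 4} */
             plan->total -= curr;
             kfree(plan);
     }
     #endif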
 500
 501static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
 502{
 503        struct disk_conf *dc;
 504        unsigned int want;     /* The number of sectors we want in-flight */
 505        int req_sect; /* Number of sectors to request in this turn */
 506        int correction; /* Number of sectors more we need in-flight */
 507        int cps; /* correction per invocation of drbd_rs_controller() */
 508        int steps; /* Number of time steps to plan ahead */
 509        int curr_corr;
 510        int max_sect;
 511        struct fifo_buffer *plan;
 512
 513        dc = rcu_dereference(device->ldev->disk_conf);
 514        plan = rcu_dereference(device->rs_plan_s);
 515
 516        steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 517
 518        if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
 519                want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
 520        } else { /* normal path */
 521                want = dc->c_fill_target ? dc->c_fill_target :
 522                        sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
 523        }
 524
 525        correction = want - device->rs_in_flight - plan->total;
 526
 527        /* Plan ahead */
 528        cps = correction / steps;
 529        fifo_add_val(plan, cps);
 530        plan->total += cps * steps;
 531
 532        /* What we do in this step */
 533        curr_corr = fifo_push(plan, 0);
 534        plan->total -= curr_corr;
 535
 536        req_sect = sect_in + curr_corr;
 537        if (req_sect < 0)
 538                req_sect = 0;
 539
 540        max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
 541        if (req_sect > max_sect)
 542                req_sect = max_sect;
 543
 544        /*
 545        drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 546                 sect_in, device->rs_in_flight, want, correction,
 547                 steps, cps, device->rs_planed, curr_corr, req_sect);
 548        */
 549
 550        return req_sect;
 551}
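     /*
      * Editorial worked example (not part of the driver), assuming SLEEP_TIME is
      * HZ/10 (100 ms) and the plan FIFO starts out all zero: with c_fill_target
      * = 0, c_delay_target = 10 (tenths of a second) and steps = 10, suppose 800
      * sectors came in during the last cycle (sect_in) and 1600 are in flight.
      * Then want = 800 * 10 = 8000 sectors, correction = 8000 - 1600 - 0 = 6400,
      * cps = 640, curr_corr = 640, and req_sect = 800 + 640 = 1440 sectors
      * (~720 KiB) are requested during this 100 ms step, subject to the
      * c_max_rate clamp.  Repeated every cycle, this walks the request size
      * toward the point where roughly "want" sectors stay in flight.
      */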
 552
 553static int drbd_rs_number_requests(struct drbd_device *device)
 554{
 555        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 556        int number, mxb;
 557
 558        sect_in = atomic_xchg(&device->rs_sect_in, 0);
 559        device->rs_in_flight -= sect_in;
 560
 561        rcu_read_lock();
 562        mxb = drbd_get_max_buffers(device) / 2;
 563        if (rcu_dereference(device->rs_plan_s)->size) {
 564                number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
 565                device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 566        } else {
 567                device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
 568                number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 569        }
 570        rcu_read_unlock();
 571
 572        /* Don't have more than "max-buffers"/2 in-flight.
 573         * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
 574         * potentially causing a distributed deadlock on congestion during
 575         * online-verify or (checksum-based) resync, if max-buffers,
 576         * socket buffer sizes and resync rate settings are mis-configured. */
 577
 578        /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
 579         * mxb (as used here, and in drbd_alloc_pages on the peer) is
 580         * "number of pages" (typically also 4k),
 581         * but "rs_in_flight" is in "sectors" (512 Byte). */
 582        if (mxb - device->rs_in_flight/8 < number)
 583                number = mxb - device->rs_in_flight/8;
 584
 585        return number;
 586}
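     /*
      * Editorial worked example (not part of the driver) for the unit juggling
      * above: with max-buffers = 2048, mxb = 1024 (4 KiB pages).  If 4096
      * sectors are in flight, rs_in_flight/8 = 512 resync blocks of
      * BM_BLOCK_SIZE (4 KiB), so "number" is capped at 1024 - 512 = 512
      * requests for this cycle.
      */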
 587
 588static int make_resync_request(struct drbd_device *const device, int cancel)
 589{
 590        struct drbd_peer_device *const peer_device = first_peer_device(device);
 591        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
 592        unsigned long bit;
 593        sector_t sector;
 594        const sector_t capacity = get_capacity(device->vdisk);
 595        int max_bio_size;
 596        int number, rollback_i, size;
 597        int align, requeue = 0;
 598        int i = 0;
 599        int discard_granularity = 0;
 600
 601        if (unlikely(cancel))
 602                return 0;
 603
 604        if (device->rs_total == 0) {
 605                /* empty resync? */
 606                drbd_resync_finished(device);
 607                return 0;
 608        }
 609
 610        if (!get_ldev(device)) {
  611                /* Since we only need to access device->rsync, a
  612                   get_ldev_if_state(device, D_FAILED) would be sufficient,
  613                   but continuing the resync with a broken disk makes no
  614                   sense at all */
 615                drbd_err(device, "Disk broke down during resync!\n");
 616                return 0;
 617        }
 618
 619        if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
 620                rcu_read_lock();
 621                discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
 622                rcu_read_unlock();
 623        }
 624
 625        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
 626        number = drbd_rs_number_requests(device);
 627        if (number <= 0)
 628                goto requeue;
 629
 630        for (i = 0; i < number; i++) {
 631                /* Stop generating RS requests when half of the send buffer is filled,
 632                 * but notify TCP that we'd like to have more space. */
 633                mutex_lock(&connection->data.mutex);
 634                if (connection->data.socket) {
 635                        struct sock *sk = connection->data.socket->sk;
 636                        int queued = sk->sk_wmem_queued;
 637                        int sndbuf = sk->sk_sndbuf;
 638                        if (queued > sndbuf / 2) {
 639                                requeue = 1;
 640                                if (sk->sk_socket)
 641                                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 642                        }
 643                } else
 644                        requeue = 1;
 645                mutex_unlock(&connection->data.mutex);
 646                if (requeue)
 647                        goto requeue;
 648
 649next_sector:
 650                size = BM_BLOCK_SIZE;
 651                bit  = drbd_bm_find_next(device, device->bm_resync_fo);
 652
 653                if (bit == DRBD_END_OF_BITMAP) {
 654                        device->bm_resync_fo = drbd_bm_bits(device);
 655                        put_ldev(device);
 656                        return 0;
 657                }
 658
 659                sector = BM_BIT_TO_SECT(bit);
 660
 661                if (drbd_try_rs_begin_io(device, sector)) {
 662                        device->bm_resync_fo = bit;
 663                        goto requeue;
 664                }
 665                device->bm_resync_fo = bit + 1;
 666
 667                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
 668                        drbd_rs_complete_io(device, sector);
 669                        goto next_sector;
 670                }
 671
 672#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 673                /* try to find some adjacent bits.
  674                 * we stop if we already have the maximum req size.
 675                 *
 676                 * Additionally always align bigger requests, in order to
 677                 * be prepared for all stripe sizes of software RAIDs.
 678                 */
 679                align = 1;
 680                rollback_i = i;
 681                while (i < number) {
 682                        if (size + BM_BLOCK_SIZE > max_bio_size)
 683                                break;
 684
 685                        /* Be always aligned */
 686                        if (sector & ((1<<(align+3))-1))
 687                                break;
 688
 689                        if (discard_granularity && size == discard_granularity)
 690                                break;
 691
 692                        /* do not cross extent boundaries */
 693                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 694                                break;
 695                        /* now, is it actually dirty, after all?
 696                         * caution, drbd_bm_test_bit is tri-state for some
 697                         * obscure reason; ( b == 0 ) would get the out-of-band
 698                         * only accidentally right because of the "oddly sized"
 699                         * adjustment below */
 700                        if (drbd_bm_test_bit(device, bit+1) != 1)
 701                                break;
 702                        bit++;
 703                        size += BM_BLOCK_SIZE;
 704                        if ((BM_BLOCK_SIZE << align) <= size)
 705                                align++;
 706                        i++;
 707                }
 708                /* if we merged some,
 709                 * reset the offset to start the next drbd_bm_find_next from */
 710                if (size > BM_BLOCK_SIZE)
 711                        device->bm_resync_fo = bit + 1;
 712#endif
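                     /*
                      * Editorial note on the alignment check above: the mask
                      * (1 << (align + 3)) - 1 is in 512-byte sectors, i.e. it
                      * requires the start sector to be aligned to
                      * BM_BLOCK_SIZE << align.  Since "align" is bumped each
                      * time the request size crosses the next power of two,
                      * bigger merged requests must start on correspondingly
                      * bigger boundaries: to grow past a single 4 KiB block
                      * (align == 1), the start sector must already be
                      * 8 KiB (16 sector) aligned.
                      */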
 713
 714                /* adjust very last sectors, in case we are oddly sized */
 715                if (sector + (size>>9) > capacity)
 716                        size = (capacity-sector)<<9;
 717
 718                if (device->use_csums) {
 719                        switch (read_for_csum(peer_device, sector, size)) {
 720                        case -EIO: /* Disk failure */
 721                                put_ldev(device);
 722                                return -EIO;
 723                        case -EAGAIN: /* allocation failed, or ldev busy */
 724                                drbd_rs_complete_io(device, sector);
 725                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
 726                                i = rollback_i;
 727                                goto requeue;
 728                        case 0:
 729                                /* everything ok */
 730                                break;
 731                        default:
 732                                BUG();
 733                        }
 734                } else {
 735                        int err;
 736
 737                        inc_rs_pending(device);
 738                        err = drbd_send_drequest(peer_device,
 739                                                 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
 740                                                 sector, size, ID_SYNCER);
 741                        if (err) {
 742                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
 743                                dec_rs_pending(device);
 744                                put_ldev(device);
 745                                return err;
 746                        }
 747                }
 748        }
 749
 750        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
  751                /* The last syncer _request_ was sent,
  752                 * but the P_RS_DATA_REPLY has not been received yet.  Sync
  753                 * will end (and the next sync group will resume) as soon as we
  754                 * receive the last resync data block and the last bit is cleared.
  755                 * Until then, resync "work" is "inactive" ...
  756                 */
 757                put_ldev(device);
 758                return 0;
 759        }
 760
 761 requeue:
 762        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 763        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 764        put_ldev(device);
 765        return 0;
 766}
 767
 768static int make_ov_request(struct drbd_device *device, int cancel)
 769{
 770        int number, i, size;
 771        sector_t sector;
 772        const sector_t capacity = get_capacity(device->vdisk);
 773        bool stop_sector_reached = false;
 774
 775        if (unlikely(cancel))
 776                return 1;
 777
 778        number = drbd_rs_number_requests(device);
 779
 780        sector = device->ov_position;
 781        for (i = 0; i < number; i++) {
 782                if (sector >= capacity)
 783                        return 1;
 784
 785                /* We check for "finished" only in the reply path:
 786                 * w_e_end_ov_reply().
 787                 * We need to send at least one request out. */
 788                stop_sector_reached = i > 0
 789                        && verify_can_do_stop_sector(device)
 790                        && sector >= device->ov_stop_sector;
 791                if (stop_sector_reached)
 792                        break;
 793
 794                size = BM_BLOCK_SIZE;
 795
 796                if (drbd_try_rs_begin_io(device, sector)) {
 797                        device->ov_position = sector;
 798                        goto requeue;
 799                }
 800
 801                if (sector + (size>>9) > capacity)
 802                        size = (capacity-sector)<<9;
 803
 804                inc_rs_pending(device);
 805                if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
 806                        dec_rs_pending(device);
 807                        return 0;
 808                }
 809                sector += BM_SECT_PER_BIT;
 810        }
 811        device->ov_position = sector;
 812
 813 requeue:
 814        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 815        if (i == 0 || !stop_sector_reached)
 816                mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 817        return 1;
 818}
 819
 820int w_ov_finished(struct drbd_work *w, int cancel)
 821{
 822        struct drbd_device_work *dw =
 823                container_of(w, struct drbd_device_work, w);
 824        struct drbd_device *device = dw->device;
 825        kfree(dw);
 826        ov_out_of_sync_print(device);
 827        drbd_resync_finished(device);
 828
 829        return 0;
 830}
 831
 832static int w_resync_finished(struct drbd_work *w, int cancel)
 833{
 834        struct drbd_device_work *dw =
 835                container_of(w, struct drbd_device_work, w);
 836        struct drbd_device *device = dw->device;
 837        kfree(dw);
 838
 839        drbd_resync_finished(device);
 840
 841        return 0;
 842}
 843
 844static void ping_peer(struct drbd_device *device)
 845{
 846        struct drbd_connection *connection = first_peer_device(device)->connection;
 847
 848        clear_bit(GOT_PING_ACK, &connection->flags);
 849        request_ping(connection);
 850        wait_event(connection->ping_wait,
 851                   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
 852}
 853
 854int drbd_resync_finished(struct drbd_device *device)
 855{
 856        struct drbd_connection *connection = first_peer_device(device)->connection;
 857        unsigned long db, dt, dbdt;
 858        unsigned long n_oos;
 859        union drbd_state os, ns;
 860        struct drbd_device_work *dw;
 861        char *khelper_cmd = NULL;
 862        int verify_done = 0;
 863
  864        /* Remove all elements from the resync LRU. Since future actions
  865         * might set bits in the (main) bitmap, the entries in the
  866         * resync LRU would otherwise be wrong. */
 867        if (drbd_rs_del_all(device)) {
  868                /* In case this is not possible right now, most probably because
  869                 * there are P_RS_DATA_REPLY packets lingering on the worker's
  870                 * queue (or the read operations for those packets have not
  871                 * finished yet), retry in 100 ms. */
 872
 873                schedule_timeout_interruptible(HZ / 10);
 874                dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
 875                if (dw) {
 876                        dw->w.cb = w_resync_finished;
 877                        dw->device = device;
 878                        drbd_queue_work(&connection->sender_work, &dw->w);
 879                        return 1;
 880                }
  881                drbd_err(device, "Warning: failed to drbd_rs_del_all() and to kmalloc(dw).\n");
 882        }
 883
 884        dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
 885        if (dt <= 0)
 886                dt = 1;
 887
 888        db = device->rs_total;
  889        /* adjust for verify start and stop sectors, respectively the reached position */
 890        if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
 891                db -= device->ov_left;
 892
 893        dbdt = Bit2KB(db/dt);
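             /*
              * Editorial note: dt is whole seconds of active resync time and db
              * is out-of-sync bitmap bits (BM_BLOCK_SIZE, i.e. 4 KiB each), so
              * Bit2KB(db/dt) is the average throughput in KiB/s reported below.
              */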
 894        device->rs_paused /= HZ;
 895
 896        if (!get_ldev(device))
 897                goto out;
 898
 899        ping_peer(device);
 900
 901        spin_lock_irq(&device->resource->req_lock);
 902        os = drbd_read_state(device);
 903
 904        verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 905
 906        /* This protects us against multiple calls (that can happen in the presence
 907           of application IO), and against connectivity loss just before we arrive here. */
 908        if (os.conn <= C_CONNECTED)
 909                goto out_unlock;
 910
 911        ns = os;
 912        ns.conn = C_CONNECTED;
 913
 914        drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 915             verify_done ? "Online verify" : "Resync",
 916             dt + device->rs_paused, device->rs_paused, dbdt);
 917
 918        n_oos = drbd_bm_total_weight(device);
 919
 920        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 921                if (n_oos) {
 922                        drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
 923                              n_oos, Bit2KB(1));
 924                        khelper_cmd = "out-of-sync";
 925                }
 926        } else {
 927                D_ASSERT(device, (n_oos - device->rs_failed) == 0);
 928
 929                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 930                        khelper_cmd = "after-resync-target";
 931
 932                if (device->use_csums && device->rs_total) {
 933                        const unsigned long s = device->rs_same_csum;
 934                        const unsigned long t = device->rs_total;
 935                        const int ratio =
 936                                (t == 0)     ? 0 :
 937                        (t < 100000) ? ((s*100)/t) : (s/(t/100));
 938                        drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
 939                             "transferred %luK total %luK\n",
 940                             ratio,
 941                             Bit2KB(device->rs_same_csum),
 942                             Bit2KB(device->rs_total - device->rs_same_csum),
 943                             Bit2KB(device->rs_total));
 944                }
 945        }
 946
 947        if (device->rs_failed) {
 948                drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
 949
 950                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 951                        ns.disk = D_INCONSISTENT;
 952                        ns.pdsk = D_UP_TO_DATE;
 953                } else {
 954                        ns.disk = D_UP_TO_DATE;
 955                        ns.pdsk = D_INCONSISTENT;
 956                }
 957        } else {
 958                ns.disk = D_UP_TO_DATE;
 959                ns.pdsk = D_UP_TO_DATE;
 960
 961                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 962                        if (device->p_uuid) {
 963                                int i;
 964                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 965                                        _drbd_uuid_set(device, i, device->p_uuid[i]);
 966                                drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
 967                                _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
 968                        } else {
 969                                drbd_err(device, "device->p_uuid is NULL! BUG\n");
 970                        }
 971                }
 972
 973                if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 974                        /* for verify runs, we don't update uuids here,
 975                         * so there would be nothing to report. */
 976                        drbd_uuid_set_bm(device, 0UL);
 977                        drbd_print_uuids(device, "updated UUIDs");
 978                        if (device->p_uuid) {
 979                                /* Now the two UUID sets are equal, update what we
 980                                 * know of the peer. */
 981                                int i;
 982                                for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 983                                        device->p_uuid[i] = device->ldev->md.uuid[i];
 984                        }
 985                }
 986        }
 987
 988        _drbd_set_state(device, ns, CS_VERBOSE, NULL);
 989out_unlock:
 990        spin_unlock_irq(&device->resource->req_lock);
 991
 992        /* If we have been sync source, and have an effective fencing-policy,
 993         * once *all* volumes are back in sync, call "unfence". */
 994        if (os.conn == C_SYNC_SOURCE) {
 995                enum drbd_disk_state disk_state = D_MASK;
 996                enum drbd_disk_state pdsk_state = D_MASK;
 997                enum drbd_fencing_p fp = FP_DONT_CARE;
 998
 999                rcu_read_lock();
1000                fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1001                if (fp != FP_DONT_CARE) {
1002                        struct drbd_peer_device *peer_device;
1003                        int vnr;
1004                        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1005                                struct drbd_device *device = peer_device->device;
1006                                disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1007                                pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1008                        }
1009                }
1010                rcu_read_unlock();
1011                if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1012                        conn_khelper(connection, "unfence-peer");
1013        }
1014
1015        put_ldev(device);
1016out:
1017        device->rs_total  = 0;
1018        device->rs_failed = 0;
1019        device->rs_paused = 0;
1020
1021        /* reset start sector, if we reached end of device */
1022        if (verify_done && device->ov_left == 0)
1023                device->ov_start_sector = 0;
1024
1025        drbd_md_sync(device);
1026
1027        if (khelper_cmd)
1028                drbd_khelper(device, khelper_cmd);
1029
1030        return 1;
1031}
1032
1033/* helper */
1034static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1035{
1036        if (drbd_peer_req_has_active_page(peer_req)) {
1037                /* This might happen if sendpage() has not finished */
1038                int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1039                atomic_add(i, &device->pp_in_use_by_net);
1040                atomic_sub(i, &device->pp_in_use);
1041                spin_lock_irq(&device->resource->req_lock);
1042                list_add_tail(&peer_req->w.list, &device->net_ee);
1043                spin_unlock_irq(&device->resource->req_lock);
1044                wake_up(&drbd_pp_wait);
1045        } else
1046                drbd_free_peer_req(device, peer_req);
1047}
1048
1049/**
1050 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1051 * @w:          work object.
 1052 * @cancel:     The connection will be closed anyway
1053 */
1054int w_e_end_data_req(struct drbd_work *w, int cancel)
1055{
1056        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1057        struct drbd_peer_device *peer_device = peer_req->peer_device;
1058        struct drbd_device *device = peer_device->device;
1059        int err;
1060
1061        if (unlikely(cancel)) {
1062                drbd_free_peer_req(device, peer_req);
1063                dec_unacked(device);
1064                return 0;
1065        }
1066
1067        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1068                err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1069        } else {
1070                if (__ratelimit(&drbd_ratelimit_state))
1071                        drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1072                            (unsigned long long)peer_req->i.sector);
1073
1074                err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1075        }
1076
1077        dec_unacked(device);
1078
1079        move_to_net_ee_or_free(device, peer_req);
1080
1081        if (unlikely(err))
1082                drbd_err(device, "drbd_send_block() failed\n");
1083        return err;
1084}
1085
1086static bool all_zero(struct drbd_peer_request *peer_req)
1087{
1088        struct page *page = peer_req->pages;
1089        unsigned int len = peer_req->i.size;
1090
1091        page_chain_for_each(page) {
1092                unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1093                unsigned int i, words = l / sizeof(long);
1094                unsigned long *d;
1095
1096                d = kmap_atomic(page);
1097                for (i = 0; i < words; i++) {
1098                        if (d[i]) {
1099                                kunmap_atomic(d);
1100                                return false;
1101                        }
1102                }
1103                kunmap_atomic(d);
1104                len -= l;
1105        }
1106
1107        return true;
1108}
1109
1110/**
1111 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1112 * @w:          work object.
 1113 * @cancel:     The connection will be closed anyway
1114 */
1115int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1116{
1117        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1118        struct drbd_peer_device *peer_device = peer_req->peer_device;
1119        struct drbd_device *device = peer_device->device;
1120        int err;
1121
1122        if (unlikely(cancel)) {
1123                drbd_free_peer_req(device, peer_req);
1124                dec_unacked(device);
1125                return 0;
1126        }
1127
1128        if (get_ldev_if_state(device, D_FAILED)) {
1129                drbd_rs_complete_io(device, peer_req->i.sector);
1130                put_ldev(device);
1131        }
1132
1133        if (device->state.conn == C_AHEAD) {
1134                err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1135        } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1136                if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1137                        inc_rs_pending(device);
1138                        if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1139                                err = drbd_send_rs_deallocated(peer_device, peer_req);
1140                        else
1141                                err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1142                } else {
1143                        if (__ratelimit(&drbd_ratelimit_state))
1144                                drbd_err(device, "Not sending RSDataReply, "
1145                                    "partner DISKLESS!\n");
1146                        err = 0;
1147                }
1148        } else {
1149                if (__ratelimit(&drbd_ratelimit_state))
1150                        drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1151                            (unsigned long long)peer_req->i.sector);
1152
1153                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1154
1155                /* update resync data with failure */
1156                drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1157        }
1158
1159        dec_unacked(device);
1160
1161        move_to_net_ee_or_free(device, peer_req);
1162
1163        if (unlikely(err))
1164                drbd_err(device, "drbd_send_block() failed\n");
1165        return err;
1166}
1167
1168int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1169{
1170        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1171        struct drbd_peer_device *peer_device = peer_req->peer_device;
1172        struct drbd_device *device = peer_device->device;
1173        struct digest_info *di;
1174        int digest_size;
1175        void *digest = NULL;
1176        int err, eq = 0;
1177
1178        if (unlikely(cancel)) {
1179                drbd_free_peer_req(device, peer_req);
1180                dec_unacked(device);
1181                return 0;
1182        }
1183
1184        if (get_ldev(device)) {
1185                drbd_rs_complete_io(device, peer_req->i.sector);
1186                put_ldev(device);
1187        }
1188
1189        di = peer_req->digest;
1190
1191        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1192                /* quick hack to try to avoid a race against reconfiguration.
1193                 * a real fix would be much more involved,
1194                 * introducing more locking mechanisms */
1195                if (peer_device->connection->csums_tfm) {
1196                        digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1197                        D_ASSERT(device, digest_size == di->digest_size);
1198                        digest = kmalloc(digest_size, GFP_NOIO);
1199                }
1200                if (digest) {
1201                        drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1202                        eq = !memcmp(digest, di->digest, digest_size);
1203                        kfree(digest);
1204                }
1205
1206                if (eq) {
1207                        drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1208                        /* rs_same_csums unit is BM_BLOCK_SIZE */
1209                        device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1210                        err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1211                } else {
1212                        inc_rs_pending(device);
1213                        peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1214                        peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1215                        kfree(di);
1216                        err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1217                }
1218        } else {
1219                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1220                if (__ratelimit(&drbd_ratelimit_state))
1221                        drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1222        }
1223
1224        dec_unacked(device);
1225        move_to_net_ee_or_free(device, peer_req);
1226
1227        if (unlikely(err))
1228                drbd_err(device, "drbd_send_block/ack() failed\n");
1229        return err;
1230}
1231
1232int w_e_end_ov_req(struct drbd_work *w, int cancel)
1233{
1234        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1235        struct drbd_peer_device *peer_device = peer_req->peer_device;
1236        struct drbd_device *device = peer_device->device;
1237        sector_t sector = peer_req->i.sector;
1238        unsigned int size = peer_req->i.size;
1239        int digest_size;
1240        void *digest;
1241        int err = 0;
1242
1243        if (unlikely(cancel))
1244                goto out;
1245
1246        digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1247        digest = kmalloc(digest_size, GFP_NOIO);
1248        if (!digest) {
1249                err = 1;        /* terminate the connection in case the allocation failed */
1250                goto out;
1251        }
1252
1253        if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1254                drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1255        else
1256                memset(digest, 0, digest_size);
1257
1258        /* Free e and pages before send.
1259         * In case we block on congestion, we could otherwise run into
1260         * some distributed deadlock, if the other side blocks on
1261         * congestion as well, because our receiver blocks in
1262         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1263        drbd_free_peer_req(device, peer_req);
1264        peer_req = NULL;
1265        inc_rs_pending(device);
1266        err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1267        if (err)
1268                dec_rs_pending(device);
1269        kfree(digest);
1270
1271out:
1272        if (peer_req)
1273                drbd_free_peer_req(device, peer_req);
1274        dec_unacked(device);
1275        return err;
1276}
1277
1278void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1279{
1280        if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1281                device->ov_last_oos_size += size>>9;
1282        } else {
1283                device->ov_last_oos_start = sector;
1284                device->ov_last_oos_size = size>>9;
1285        }
1286        drbd_set_out_of_sync(device, sector, size);
1287}
1288
1289int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1290{
1291        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1292        struct drbd_peer_device *peer_device = peer_req->peer_device;
1293        struct drbd_device *device = peer_device->device;
1294        struct digest_info *di;
1295        void *digest;
1296        sector_t sector = peer_req->i.sector;
1297        unsigned int size = peer_req->i.size;
1298        int digest_size;
1299        int err, eq = 0;
1300        bool stop_sector_reached = false;
1301
1302        if (unlikely(cancel)) {
1303                drbd_free_peer_req(device, peer_req);
1304                dec_unacked(device);
1305                return 0;
1306        }
1307
1308        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1309         * the resync lru has been cleaned up already */
1310        if (get_ldev(device)) {
1311                drbd_rs_complete_io(device, peer_req->i.sector);
1312                put_ldev(device);
1313        }
1314
1315        di = peer_req->digest;
1316
1317        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1318                digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1319                digest = kmalloc(digest_size, GFP_NOIO);
1320                if (digest) {
1321                        drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1322
1323                        D_ASSERT(device, digest_size == di->digest_size);
1324                        eq = !memcmp(digest, di->digest, digest_size);
1325                        kfree(digest);
1326                }
1327        }
1328
1329        /* Free peer_req and pages before send.
1330         * In case we block on congestion, we could otherwise run into
1331         * some distributed deadlock, if the other side blocks on
1332         * congestion as well, because our receiver blocks in
1333         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1334        drbd_free_peer_req(device, peer_req);
1335        if (!eq)
1336                drbd_ov_out_of_sync_found(device, sector, size);
1337        else
1338                ov_out_of_sync_print(device);
1339
1340        err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1341                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1342
1343        dec_unacked(device);
1344
1345        --device->ov_left;
1346
 1347        /* let's advance progress step marks only for every other megabyte (ov_left is in 4 KiB bitmap bits, so 0x200 bits == 2 MiB) */
1348        if ((device->ov_left & 0x200) == 0x200)
1349                drbd_advance_rs_marks(device, device->ov_left);
1350
1351        stop_sector_reached = verify_can_do_stop_sector(device) &&
1352                (sector + (size>>9)) >= device->ov_stop_sector;
1353
1354        if (device->ov_left == 0 || stop_sector_reached) {
1355                ov_out_of_sync_print(device);
1356                drbd_resync_finished(device);
1357        }
1358
1359        return err;
1360}
1361
1362/* FIXME
1363 * We need to track the number of pending barrier acks,
1364 * and to be able to wait for them.
1365 * See also comment in drbd_adm_attach before drbd_suspend_io.
1366 */
1367static int drbd_send_barrier(struct drbd_connection *connection)
1368{
1369        struct p_barrier *p;
1370        struct drbd_socket *sock;
1371
1372        sock = &connection->data;
1373        p = conn_prepare_command(connection, sock);
1374        if (!p)
1375                return -EIO;
1376        p->barrier = connection->send.current_epoch_nr;
1377        p->pad = 0;
1378        connection->send.current_epoch_writes = 0;
1379        connection->send.last_sent_barrier_jif = jiffies;
1380
1381        return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1382}
1383
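/* Send P_UNPLUG_REMOTE on the data socket, hinting the peer to submit
 * any requests it may still have queued for its backing device. */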
1384static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1385{
1386        struct drbd_socket *sock = &pd->connection->data;
1387        if (!drbd_prepare_command(pd, sock))
1388                return -EIO;
1389        return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1390}
1391
1392int w_send_write_hint(struct drbd_work *w, int cancel)
1393{
1394        struct drbd_device *device =
1395                container_of(w, struct drbd_device, unplug_work);
1396
1397        if (cancel)
1398                return 0;
1399        return pd_send_unplug_remote(first_peer_device(device));
1400}
1401
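/* Initialize the per-connection epoch bookkeeping with the epoch of the
 * first write we get to send on this connection. */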
1402static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1403{
1404        if (!connection->send.seen_any_write_yet) {
1405                connection->send.seen_any_write_yet = true;
1406                connection->send.current_epoch_nr = epoch;
1407                connection->send.current_epoch_writes = 0;
1408                connection->send.last_sent_barrier_jif = jiffies;
1409        }
1410}
1411
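/* If @epoch differs from the epoch we sent last, close the previous epoch
 * with a P_BARRIER, but only if it actually contained any writes. */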
1412static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1413{
1414        /* nothing to close yet if we have not seen any write on this connection */
1415        if (!connection->send.seen_any_write_yet)
1416                return;
1417        if (connection->send.current_epoch_nr != epoch) {
1418                if (connection->send.current_epoch_writes)
1419                        drbd_send_barrier(connection);
1420                connection->send.current_epoch_nr = epoch;
1421        }
1422}
1423
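/* Worker callback used while in AHEAD mode: instead of mirroring the write,
 * tell the peer via P_OUT_OF_SYNC which blocks it has to resync later. */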
1424int w_send_out_of_sync(struct drbd_work *w, int cancel)
1425{
1426        struct drbd_request *req = container_of(w, struct drbd_request, w);
1427        struct drbd_device *device = req->device;
1428        struct drbd_peer_device *const peer_device = first_peer_device(device);
1429        struct drbd_connection *const connection = peer_device->connection;
1430        int err;
1431
1432        if (unlikely(cancel)) {
1433                req_mod(req, SEND_CANCELED);
1434                return 0;
1435        }
1436        req->pre_send_jif = jiffies;
1437
1438        /* this time, no connection->send.current_epoch_writes++;
1439         * If it was sent, it was the closing barrier for the last
1440         * replicated epoch, before we went into AHEAD mode.
1441         * No more barriers will be sent, until we leave AHEAD mode again. */
1442        maybe_send_barrier(connection, req->epoch);
1443
1444        err = drbd_send_out_of_sync(peer_device, req);
1445        req_mod(req, OOS_HANDED_TO_NETWORK);
1446
1447        return err;
1448}
1449
1450/**
1451 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1452 * @w:          work object.
1453 * @cancel:     The connection will be closed anyways
1454 */
1455int w_send_dblock(struct drbd_work *w, int cancel)
1456{
1457        struct drbd_request *req = container_of(w, struct drbd_request, w);
1458        struct drbd_device *device = req->device;
1459        struct drbd_peer_device *const peer_device = first_peer_device(device);
1460        struct drbd_connection *connection = peer_device->connection;
1461        bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1462        int err;
1463
1464        if (unlikely(cancel)) {
1465                req_mod(req, SEND_CANCELED);
1466                return 0;
1467        }
1468        req->pre_send_jif = jiffies;
1469
1470        re_init_if_first_write(connection, req->epoch);
1471        maybe_send_barrier(connection, req->epoch);
1472        connection->send.current_epoch_writes++;
1473
1474        err = drbd_send_dblock(peer_device, req);
1475        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1476
1477        if (do_send_unplug && !err)
1478                pd_send_unplug_remote(peer_device);
1479
1480        return err;
1481}
1482
1483/**
1484 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1485 * @w:          work object.
1486 * @cancel:     The connection will be closed anyways
1487 */
1488int w_send_read_req(struct drbd_work *w, int cancel)
1489{
1490        struct drbd_request *req = container_of(w, struct drbd_request, w);
1491        struct drbd_device *device = req->device;
1492        struct drbd_peer_device *const peer_device = first_peer_device(device);
1493        struct drbd_connection *connection = peer_device->connection;
1494        bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1495        int err;
1496
1497        if (unlikely(cancel)) {
1498                req_mod(req, SEND_CANCELED);
1499                return 0;
1500        }
1501        req->pre_send_jif = jiffies;
1502
1503        /* Even read requests may close a write epoch,
1504         * if there has been one. */
1505        maybe_send_barrier(connection, req->epoch);
1506
1507        err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1508                                 (unsigned long)req);
1509
1510        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1511
1512        if (do_send_unplug && !err)
1513                pd_send_unplug_remote(peer_device);
1514
1515        return err;
1516}
1517
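/* Worker callback to resubmit a request to the local backing device:
 * re-reference the activity log extent for writes, then clone the master
 * bio and send it down to our own disk again. */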
1518int w_restart_disk_io(struct drbd_work *w, int cancel)
1519{
1520        struct drbd_request *req = container_of(w, struct drbd_request, w);
1521        struct drbd_device *device = req->device;
1522
1523        if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1524                drbd_al_begin_io(device, &req->i);
1525
1526        req->private_bio = bio_clone_fast(req->master_bio, GFP_NOIO,
1527                                          &drbd_io_bio_set);
1528        bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1529        req->private_bio->bi_private = req;
1530        req->private_bio->bi_end_io = drbd_request_endio;
1531        submit_bio_noacct(req->private_bio);
1532
1533        return 0;
1534}
1535
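/* Follow the resync-after dependency chain of @device.
 * Returns 0 if some device further up the chain is currently resyncing
 * or paused, 1 otherwise. */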
1536static int _drbd_may_sync_now(struct drbd_device *device)
1537{
1538        struct drbd_device *odev = device;
1539        int resync_after;
1540
1541        while (1) {
1542                if (!odev->ldev || odev->state.disk == D_DISKLESS)
1543                        return 1;
1544                rcu_read_lock();
1545                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1546                rcu_read_unlock();
1547                if (resync_after == -1)
1548                        return 1;
1549                odev = minor_to_device(resync_after);
1550                if (!odev)
1551                        return 1;
1552                if ((odev->state.conn >= C_SYNC_SOURCE &&
1553                     odev->state.conn <= C_PAUSED_SYNC_T) ||
1554                    odev->state.aftr_isp || odev->state.peer_isp ||
1555                    odev->state.user_isp)
1556                        return 0;
1557        }
1558}
1559
1560/**
1561 * drbd_pause_after() - Pause resync on all devices that may not resync now
1562 * @device:     DRBD device.
1563 *
1564 * Called from process context only (admin command and after_state_ch).
1565 */
1566static bool drbd_pause_after(struct drbd_device *device)
1567{
1568        bool changed = false;
1569        struct drbd_device *odev;
1570        int i;
1571
1572        rcu_read_lock();
1573        idr_for_each_entry(&drbd_devices, odev, i) {
1574                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1575                        continue;
1576                if (!_drbd_may_sync_now(odev) &&
1577                    _drbd_set_state(_NS(odev, aftr_isp, 1),
1578                                    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1579                        changed = true;
1580        }
1581        rcu_read_unlock();
1582
1583        return changed;
1584}
1585
1586/**
1587 * drbd_resume_next() - Resume resync on all devices that may resync now
1588 * @device:     DRBD device.
1589 *
1590 * Called from process context only (admin command and worker).
1591 */
1592static bool drbd_resume_next(struct drbd_device *device)
1593{
1594        bool changed = false;
1595        struct drbd_device *odev;
1596        int i;
1597
1598        rcu_read_lock();
1599        idr_for_each_entry(&drbd_devices, odev, i) {
1600                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1601                        continue;
1602                if (odev->state.aftr_isp) {
1603                        if (_drbd_may_sync_now(odev) &&
1604                            _drbd_set_state(_NS(odev, aftr_isp, 0),
1605                                            CS_HARD, NULL) != SS_NOTHING_TO_DO)
1606                                changed = true;
1607                }
1608        }
1609        rcu_read_unlock();
1610        return changed;
1611}
1612
1613void resume_next_sg(struct drbd_device *device)
1614{
1615        lock_all_resources();
1616        drbd_resume_next(device);
1617        unlock_all_resources();
1618}
1619
1620void suspend_other_sg(struct drbd_device *device)
1621{
1622        lock_all_resources();
1623        drbd_pause_after(device);
1624        unlock_all_resources();
1625}
1626
1627/* caller must lock_all_resources() */
1628enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1629{
1630        struct drbd_device *odev;
1631        int resync_after;
1632
1633        if (o_minor == -1)
1634                return NO_ERROR;
1635        if (o_minor < -1 || o_minor > MINORMASK)
1636                return ERR_RESYNC_AFTER;
1637
1638        /* check for loops */
1639        odev = minor_to_device(o_minor);
1640        while (1) {
1641                if (odev == device)
1642                        return ERR_RESYNC_AFTER_CYCLE;
1643
1644                /* You are free to depend on diskless, non-existing,
1645                 * or not yet/no longer existing minors.
1646                 * We only reject dependency loops.
1647                 * We cannot follow the dependency chain beyond a detached or
1648                 * missing minor.
1649                 */
1650                if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1651                        return NO_ERROR;
1652
1653                rcu_read_lock();
1654                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1655                rcu_read_unlock();
1656                /* dependency chain ends here, no cycles. */
1657                if (resync_after == -1)
1658                        return NO_ERROR;
1659
1660                /* follow the dependency chain */
1661                odev = minor_to_device(resync_after);
1662        }
1663}
1664
1665/* caller must lock_all_resources() */
1666void drbd_resync_after_changed(struct drbd_device *device)
1667{
1668        int changed;
1669
1670        do {
1671                changed  = drbd_pause_after(device);
1672                changed |= drbd_resume_next(device);
1673        } while (changed);
1674}
1675
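/* Reset the input of the resync speed controller: clear the sector and
 * event counters and empty the fifo of the current plan. */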
1676void drbd_rs_controller_reset(struct drbd_device *device)
1677{
1678        struct gendisk *disk = device->ldev->backing_bdev->bd_disk;
1679        struct fifo_buffer *plan;
1680
1681        atomic_set(&device->rs_sect_in, 0);
1682        atomic_set(&device->rs_sect_ev, 0);
1683        device->rs_in_flight = 0;
1684        device->rs_last_events =
1685                (int)part_stat_read_accum(disk->part0, sectors);
1686
1687        /* Updating the RCU protected object in place is necessary since
1688           this function gets called from atomic context.
1689           It is valid since all other updates also lead to a completely
1690           empty fifo */
1691        rcu_read_lock();
1692        plan = rcu_dereference(device->rs_plan_s);
1693        plan->total = 0;
1694        fifo_set(plan, 0);
1695        rcu_read_unlock();
1696}
1697
1698void start_resync_timer_fn(struct timer_list *t)
1699{
1700        struct drbd_device *device = from_timer(device, t, start_resync_timer);
1701        drbd_device_post_work(device, RS_START);
1702}
1703
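/* Start the resync as sync source, unless we still wait for acks from the
 * peer; in that case re-arm the timer and try again in 100ms. */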
1704static void do_start_resync(struct drbd_device *device)
1705{
1706        if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1707                drbd_warn(device, "postponing start_resync ...\n");
1708                device->start_resync_timer.expires = jiffies + HZ/10;
1709                add_timer(&device->start_resync_timer);
1710                return;
1711        }
1712
1713        drbd_start_resync(device, C_SYNC_SOURCE);
1714        clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1715}
1716
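/* Checksum based resync requires protocol 89 or newer and a configured
 * csums algorithm; it may be limited to resyncs after a primary crash. */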
1717static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1718{
1719        bool csums_after_crash_only;
1720        rcu_read_lock();
1721        csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1722        rcu_read_unlock();
1723        return connection->agreed_pro_version >= 89 &&          /* supported? */
1724                connection->csums_tfm &&                        /* configured? */
1725                (csums_after_crash_only == false                /* use for each resync? */
1726                 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1727}
1728
1729/**
1730 * drbd_start_resync() - Start the resync process
1731 * @device:     DRBD device.
1732 * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1733 *
1734 * This function might bring you directly into one of the
1735 * C_PAUSED_SYNC_* states.
1736 */
1737void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1738{
1739        struct drbd_peer_device *peer_device = first_peer_device(device);
1740        struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1741        union drbd_state ns;
1742        int r;
1743
1744        if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1745                drbd_err(device, "Resync already running!\n");
1746                return;
1747        }
1748
1749        if (!connection) {
1750                drbd_err(device, "No connection to peer, aborting!\n");
1751                return;
1752        }
1753
1754        if (!test_bit(B_RS_H_DONE, &device->flags)) {
1755                if (side == C_SYNC_TARGET) {
1756                        /* Since application IO was locked out during C_WF_BITMAP_T and
1757                           C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1758                           we run the before-resync-target handler to decide whether we may make the data inconsistent. */
1759                        r = drbd_khelper(device, "before-resync-target");
1760                        r = (r >> 8) & 0xff;
1761                        if (r > 0) {
1762                                drbd_info(device, "before-resync-target handler returned %d, "
1763                                         "dropping connection.\n", r);
1764                                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1765                                return;
1766                        }
1767                } else /* C_SYNC_SOURCE */ {
1768                        r = drbd_khelper(device, "before-resync-source");
1769                        r = (r >> 8) & 0xff;
1770                        if (r > 0) {
1771                                if (r == 3) {
1772                                        drbd_info(device, "before-resync-source handler returned %d, "
1773                                                 "ignoring. Old userland tools?\n", r);
1774                                } else {
1775                                        drbd_info(device, "before-resync-source handler returned %d, "
1776                                                 "dropping connection.\n", r);
1777                                        conn_request_state(connection,
1778                                                           NS(conn, C_DISCONNECTING), CS_HARD);
1779                                        return;
1780                                }
1781                        }
1782                }
1783        }
1784
1785        if (current == connection->worker.task) {
1786                /* The worker should not sleep waiting for state_mutex,
1787                   as that can take a long time */
1788                if (!mutex_trylock(device->state_mutex)) {
1789                        set_bit(B_RS_H_DONE, &device->flags);
1790                        device->start_resync_timer.expires = jiffies + HZ/5;
1791                        add_timer(&device->start_resync_timer);
1792                        return;
1793                }
1794        } else {
1795                mutex_lock(device->state_mutex);
1796        }
1797
1798        lock_all_resources();
1799        clear_bit(B_RS_H_DONE, &device->flags);
1800        /* Did some connection breakage or IO error race with us? */
1801        if (device->state.conn < C_CONNECTED ||
1802            !get_ldev_if_state(device, D_NEGOTIATING)) {
1803                unlock_all_resources();
1804                goto out;
1805        }
1806
1807        ns = drbd_read_state(device);
1808
1809        ns.aftr_isp = !_drbd_may_sync_now(device);
1810
1811        ns.conn = side;
1812
1813        if (side == C_SYNC_TARGET)
1814                ns.disk = D_INCONSISTENT;
1815        else /* side == C_SYNC_SOURCE */
1816                ns.pdsk = D_INCONSISTENT;
1817
1818        r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1819        ns = drbd_read_state(device);
1820
1821        if (ns.conn < C_CONNECTED)
1822                r = SS_UNKNOWN_ERROR;
1823
1824        if (r == SS_SUCCESS) {
1825                unsigned long tw = drbd_bm_total_weight(device);
1826                unsigned long now = jiffies;
1827                int i;
1828
1829                device->rs_failed    = 0;
1830                device->rs_paused    = 0;
1831                device->rs_same_csum = 0;
1832                device->rs_last_sect_ev = 0;
1833                device->rs_total     = tw;
1834                device->rs_start     = now;
1835                for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1836                        device->rs_mark_left[i] = tw;
1837                        device->rs_mark_time[i] = now;
1838                }
1839                drbd_pause_after(device);
1840                /* Forget potentially stale cached per resync extent bit-counts.
1841                 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1842                 * disabled, and know the disk state is ok. */
1843                spin_lock(&device->al_lock);
1844                lc_reset(device->resync);
1845                device->resync_locked = 0;
1846                device->resync_wenr = LC_FREE;
1847                spin_unlock(&device->al_lock);
1848        }
1849        unlock_all_resources();
1850
1851        if (r == SS_SUCCESS) {
1852                wake_up(&device->al_wait); /* for lc_reset() above */
1853                /* reset rs_last_bcast when a resync or verify is started,
1854                 * to deal with potential jiffies wrap. */
1855                device->rs_last_bcast = jiffies - HZ;
1856
1857                drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1858                     drbd_conn_str(ns.conn),
1859                     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1860                     (unsigned long) device->rs_total);
1861                if (side == C_SYNC_TARGET) {
1862                        device->bm_resync_fo = 0;
1863                        device->use_csums = use_checksum_based_resync(connection, device);
1864                } else {
1865                        device->use_csums = false;
1866                }
1867
1868                /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1869                 * with w_send_oos, or the sync target will get confused as to
1870                 * how many bits to resync.  We cannot do that always, because for an
1871                 * empty resync and protocol < 95, we need to do it here, as we call
1872                 * drbd_resync_finished from here in that case.
1873                 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1874                 * and from after_state_ch otherwise. */
1875                if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1876                        drbd_gen_and_send_sync_uuid(peer_device);
1877
1878                if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1879                        /* This still has a race (about when exactly the peers
1880                         * detect connection loss) that can lead to a full sync
1881                         * on next handshake. In 8.3.9 we fixed this with explicit
1882                         * resync-finished notifications, but the fix
1883                         * introduces a protocol change.  Sleeping for some
1884                         * time longer than the ping interval + timeout on the
1885                         * SyncSource, to give the SyncTarget the chance to
1886                         * detect connection loss, then waiting for a ping
1887                         * response (implicit in drbd_resync_finished) reduces
1888                         * the race considerably, but does not solve it. */
1889                        if (side == C_SYNC_SOURCE) {
1890                                struct net_conf *nc;
1891                                int timeo;
1892
1893                                rcu_read_lock();
1894                                nc = rcu_dereference(connection->net_conf);
1895                                timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1896                                rcu_read_unlock();
1897                                schedule_timeout_interruptible(timeo);
1898                        }
1899                        drbd_resync_finished(device);
1900                }
1901
1902                drbd_rs_controller_reset(device);
1903                /* ns.conn may already be != device->state.conn,
1904                 * we may have been paused in between, or become paused until
1905                 * the timer triggers.
1906                 * No matter, that is handled in resync_timer_fn() */
1907                if (ns.conn == C_SYNC_TARGET)
1908                        mod_timer(&device->resync_timer, jiffies);
1909
1910                drbd_md_sync(device);
1911        }
1912        put_ldev(device);
1913out:
1914        mutex_unlock(device->state_mutex);
1915}
1916
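/* Lazily write out changed bitmap pages, broadcast the sync progress,
 * and finish the resync if it just completed. */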
1917static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1918{
1919        struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1920        device->rs_last_bcast = jiffies;
1921
1922        if (!get_ldev(device))
1923                return;
1924
1925        drbd_bm_write_lazy(device, 0);
1926        if (resync_done && is_sync_state(device->state.conn))
1927                drbd_resync_finished(device);
1928
1929        drbd_bcast_event(device, &sib);
1930        /* update timestamp, in case it took a while to write out stuff */
1931        device->rs_last_bcast = jiffies;
1932        put_ldev(device);
1933}
1934
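/* Final part of detaching: free the resync extent and activity log caches
 * as well as the backing device, then wake up anyone waiting for that. */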
1935static void drbd_ldev_destroy(struct drbd_device *device)
1936{
1937        lc_destroy(device->resync);
1938        device->resync = NULL;
1939        lc_destroy(device->act_log);
1940        device->act_log = NULL;
1941
1942        __acquire(local);
1943        drbd_backing_dev_free(device, device->ldev);
1944        device->ldev = NULL;
1945        __release(local);
1946
1947        clear_bit(GOING_DISKLESS, &device->flags);
1948        wake_up(&device->misc_wait);
1949}
1950
1951static void go_diskless(struct drbd_device *device)
1952{
1953        D_ASSERT(device, device->state.disk == D_FAILED);
1954        /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1955         * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1956         * the protected members anymore, though, so once put_ldev reaches zero
1957         * again, it will be safe to free them. */
1958
1959        /* Try to write changed bitmap pages, read errors may have just
1960         * set some bits outside the area covered by the activity log.
1961         *
1962         * If we have an IO error during the bitmap writeout,
1963         * we will want a full sync next time, just in case.
1964         * (Do we want a specific meta data flag for this?)
1965         *
1966         * If that does not make it to stable storage either,
1967         * we cannot do anything about that anymore.
1968         *
1969         * We still need to check if both bitmap and ldev are present, we may
1970         * end up here after a failed attach, before ldev was even assigned.
1971         */
1972        if (device->bitmap && device->ldev) {
1973                /* An interrupted resync or similar is allowed to recount bits
1974                 * while we detach.
1975                 * Any modifications would not be expected anymore, though.
1976                 */
1977                if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1978                                        "detach", BM_LOCKED_TEST_ALLOWED)) {
1979                        if (test_bit(WAS_READ_ERROR, &device->flags)) {
1980                                drbd_md_set_flag(device, MDF_FULL_SYNC);
1981                                drbd_md_sync(device);
1982                        }
1983                }
1984        }
1985
1986        drbd_force_state(device, NS(disk, D_DISKLESS));
1987}
1988
1989static int do_md_sync(struct drbd_device *device)
1990{
1991        drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1992        drbd_md_sync(device);
1993        return 0;
1994}
1995
1996/* only called from drbd_worker thread, no locking */
1997void __update_timing_details(
1998                struct drbd_thread_timing_details *tdp,
1999                unsigned int *cb_nr,
2000                void *cb,
2001                const char *fn, const unsigned int line)
2002{
2003        unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2004        struct drbd_thread_timing_details *td = tdp + i;
2005
2006        td->start_jif = jiffies;
2007        td->cb_addr = cb;
2008        td->caller_fn = fn;
2009        td->line = line;
2010        td->cb_nr = *cb_nr;
2011
2012        i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2013        td = tdp + i;
2014        memset(td, 0, sizeof(*td));
2015
2016        ++(*cb_nr);
2017}
2018
2019static void do_device_work(struct drbd_device *device, const unsigned long todo)
2020{
2021        if (test_bit(MD_SYNC, &todo))
2022                do_md_sync(device);
2023        if (test_bit(RS_DONE, &todo) ||
2024            test_bit(RS_PROGRESS, &todo))
2025                update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2026        if (test_bit(GO_DISKLESS, &todo))
2027                go_diskless(device);
2028        if (test_bit(DESTROY_DISK, &todo))
2029                drbd_ldev_destroy(device);
2030        if (test_bit(RS_START, &todo))
2031                do_start_resync(device);
2032}
2033
2034#define DRBD_DEVICE_WORK_MASK   \
2035        ((1UL << GO_DISKLESS)   \
2036        |(1UL << DESTROY_DISK)  \
2037        |(1UL << MD_SYNC)       \
2038        |(1UL << RS_START)      \
2039        |(1UL << RS_PROGRESS)   \
2040        |(1UL << RS_DONE)       \
2041        )
2042
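/* Atomically fetch and clear the device work bits from @flags,
 * so each posted work type is handled exactly once. */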
2043static unsigned long get_work_bits(unsigned long *flags)
2044{
2045        unsigned long old, new;
2046        do {
2047                old = *flags;
2048                new = old & ~DRBD_DEVICE_WORK_MASK;
2049        } while (cmpxchg(flags, old, new) != old);
2050        return old & DRBD_DEVICE_WORK_MASK;
2051}
2052
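/* Handle per-device work that was posted via device->flags
 * (see drbd_device_post_work()) rather than via the sender work queue. */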
2053static void do_unqueued_work(struct drbd_connection *connection)
2054{
2055        struct drbd_peer_device *peer_device;
2056        int vnr;
2057
2058        rcu_read_lock();
2059        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2060                struct drbd_device *device = peer_device->device;
2061                unsigned long todo = get_work_bits(&device->flags);
2062                if (!todo)
2063                        continue;
2064
2065                kref_get(&device->kref);
2066                rcu_read_unlock();
2067                do_device_work(device, todo);
2068                kref_put(&device->kref, drbd_destroy_device);
2069                rcu_read_lock();
2070        }
2071        rcu_read_unlock();
2072}
2073
2074static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2075{
2076        spin_lock_irq(&queue->q_lock);
2077        list_splice_tail_init(&queue->q, work_list);
2078        spin_unlock_irq(&queue->q_lock);
2079        return !list_empty(work_list);
2080}
2081
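/* Grab a batch of sender work. If there is none, uncork the data socket,
 * close the current epoch with a barrier if needed, and sleep until new
 * work arrives, device work is pending, or the thread is asked to stop. */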
2082static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2083{
2084        DEFINE_WAIT(wait);
2085        struct net_conf *nc;
2086        int uncork, cork;
2087
2088        dequeue_work_batch(&connection->sender_work, work_list);
2089        if (!list_empty(work_list))
2090                return;
2091
2092        /* Still nothing to do?
2093         * Maybe we still need to close the current epoch,
2094         * even if no new requests are queued yet.
2095         *
2096         * Also, poke TCP, just in case.
2097         * Then wait for new work (or signal). */
2098        rcu_read_lock();
2099        nc = rcu_dereference(connection->net_conf);
2100        uncork = nc ? nc->tcp_cork : 0;
2101        rcu_read_unlock();
2102        if (uncork) {
2103                mutex_lock(&connection->data.mutex);
2104                if (connection->data.socket)
2105                        tcp_sock_set_cork(connection->data.socket->sk, false);
2106                mutex_unlock(&connection->data.mutex);
2107        }
2108
2109        for (;;) {
2110                int send_barrier;
2111                prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2112                spin_lock_irq(&connection->resource->req_lock);
2113                spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2114                if (!list_empty(&connection->sender_work.q))
2115                        list_splice_tail_init(&connection->sender_work.q, work_list);
2116                spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2117                if (!list_empty(work_list) || signal_pending(current)) {
2118                        spin_unlock_irq(&connection->resource->req_lock);
2119                        break;
2120                }
2121
2122                /* We found nothing new to do, no to-be-communicated request,
2123                 * no other work item.  We may still need to close the last
2124                 * epoch.  Next incoming request epoch will be connection ->
2125                 * current transfer log epoch number.  If that is different
2126                 * from the epoch of the last request we communicated, it is
2127                 * safe to send the epoch separating barrier now.
2128                 */
2129                send_barrier =
2130                        atomic_read(&connection->current_tle_nr) !=
2131                        connection->send.current_epoch_nr;
2132                spin_unlock_irq(&connection->resource->req_lock);
2133
2134                if (send_barrier)
2135                        maybe_send_barrier(connection,
2136                                        connection->send.current_epoch_nr + 1);
2137
2138                if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2139                        break;
2140
2141                /* drbd_send() may have called flush_signals() */
2142                if (get_t_state(&connection->worker) != RUNNING)
2143                        break;
2144
2145                schedule();
2146                /* may be woken up for things other than new work, too,
2147                 * e.g. if the current epoch got closed.
2148                 * In that case we send the barrier above. */
2149        }
2150        finish_wait(&connection->sender_work.q_wait, &wait);
2151
2152        /* someone may have changed the config while we have been waiting above. */
2153        rcu_read_lock();
2154        nc = rcu_dereference(connection->net_conf);
2155        cork = nc ? nc->tcp_cork : 0;
2156        rcu_read_unlock();
2157        mutex_lock(&connection->data.mutex);
2158        if (connection->data.socket) {
2159                if (cork)
2160                        tcp_sock_set_cork(connection->data.socket->sk, true);
2161                else if (!uncork)
2162                        tcp_sock_set_cork(connection->data.socket->sk, false);
2163        }
2164        mutex_unlock(&connection->data.mutex);
2165}
2166
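/* Main loop of the per-connection worker thread: processes the sender work
 * queue and the per-device work bits until the thread is told to stop, then
 * drains the remaining work and cleans up the devices. */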
2167int drbd_worker(struct drbd_thread *thi)
2168{
2169        struct drbd_connection *connection = thi->connection;
2170        struct drbd_work *w = NULL;
2171        struct drbd_peer_device *peer_device;
2172        LIST_HEAD(work_list);
2173        int vnr;
2174
2175        while (get_t_state(thi) == RUNNING) {
2176                drbd_thread_current_set_cpu(thi);
2177
2178                if (list_empty(&work_list)) {
2179                        update_worker_timing_details(connection, wait_for_work);
2180                        wait_for_work(connection, &work_list);
2181                }
2182
2183                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2184                        update_worker_timing_details(connection, do_unqueued_work);
2185                        do_unqueued_work(connection);
2186                }
2187
2188                if (signal_pending(current)) {
2189                        flush_signals(current);
2190                        if (get_t_state(thi) == RUNNING) {
2191                                drbd_warn(connection, "Worker got an unexpected signal\n");
2192                                continue;
2193                        }
2194                        break;
2195                }
2196
2197                if (get_t_state(thi) != RUNNING)
2198                        break;
2199
2200                if (!list_empty(&work_list)) {
2201                        w = list_first_entry(&work_list, struct drbd_work, list);
2202                        list_del_init(&w->list);
2203                        update_worker_timing_details(connection, w->cb);
2204                        if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2205                                continue;
2206                        if (connection->cstate >= C_WF_REPORT_PARAMS)
2207                                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2208                }
2209        }
2210
2211        do {
2212                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2213                        update_worker_timing_details(connection, do_unqueued_work);
2214                        do_unqueued_work(connection);
2215                }
2216                if (!list_empty(&work_list)) {
2217                        w = list_first_entry(&work_list, struct drbd_work, list);
2218                        list_del_init(&w->list);
2219                        update_worker_timing_details(connection, w->cb);
2220                        w->cb(w, 1);
2221                } else
2222                        dequeue_work_batch(&connection->sender_work, &work_list);
2223        } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2224
2225        rcu_read_lock();
2226        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2227                struct drbd_device *device = peer_device->device;
2228                D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2229                kref_get(&device->kref);
2230                rcu_read_unlock();
2231                drbd_device_cleanup(device);
2232                kref_put(&device->kref, drbd_destroy_device);
2233                rcu_read_lock();
2234        }
2235        rcu_read_unlock();
2236
2237        return 0;
2238}
2239