linux/drivers/block/drbd/drbd_worker.c
/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

*/

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched/signal.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);

/* endio handlers:
 *   drbd_md_endio (defined here)
 *   drbd_request_endio (defined here)
 *   drbd_peer_request_endio (defined here)
 *   drbd_bm_endio (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_endio(struct bio *bio)
{
        struct drbd_device *device;

        device = bio->bi_private;
        device->md_io.error = blk_status_to_errno(bio->bi_status);

        /* special case: drbd_md_read() during drbd_adm_attach() */
        if (device->ldev)
                put_ldev(device);
        bio_put(bio);

        /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
         * to timeout on the lower level device, and eventually detach from it.
         * If this io completion runs after that timeout expired, this
         * drbd_md_put_buffer() may allow us to finally try and re-attach.
         * During normal operation, this only puts that extra reference
         * down to 1 again.
         * Make sure we first drop the reference, and only then signal
         * completion, or we may (in drbd_al_read_log()) cycle so fast into the
         * next drbd_md_sync_page_io(), that we trigger the
         * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
         */
        drbd_md_put_buffer(device);
        device->md_io.done = 1;
        wake_up(&device->misc_wait);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
        unsigned long flags = 0;
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;

        spin_lock_irqsave(&device->resource->req_lock, flags);
        device->read_cnt += peer_req->i.size >> 9;
        list_del(&peer_req->w.list);
        if (list_empty(&device->read_ee))
                wake_up(&device->ee_wait);
        if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
                __drbd_chk_io_error(device, DRBD_READ_ERROR);
        spin_unlock_irqrestore(&device->resource->req_lock, flags);

        drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
        put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
        unsigned long flags = 0;
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        struct drbd_connection *connection = peer_device->connection;
        struct drbd_interval i;
        int do_wake;
        u64 block_id;
        int do_al_complete_io;

        /* after we moved peer_req to done_ee,
         * we may no longer access it,
         * it may be freed/reused already!
         * (as soon as we release the req_lock) */
        i = peer_req->i;
        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
        block_id = peer_req->block_id;
        peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;

        if (peer_req->flags & EE_WAS_ERROR) {
                /* In protocol != C, we usually do not send write acks.
                 * In case of a write error, send the neg ack anyways. */
                if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
                        inc_unacked(device);
                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
        }

        spin_lock_irqsave(&device->resource->req_lock, flags);
        device->writ_cnt += peer_req->i.size >> 9;
        list_move_tail(&peer_req->w.list, &device->done_ee);

        /*
         * Do not remove from the write_requests tree here: we did not send the
         * Ack yet and did not wake possibly waiting conflicting requests.
         * Removed from the tree from "drbd_process_done_ee" within the
         * appropriate dw.cb (e_end_block/e_end_resync_block) or from
         * _drbd_clear_done_ee.
         */

        do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

        /* FIXME do we want to detach for failed REQ_DISCARD?
         * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
        if (peer_req->flags & EE_WAS_ERROR)
                __drbd_chk_io_error(device, DRBD_WRITE_ERROR);

        if (connection->cstate >= C_WF_REPORT_PARAMS) {
                kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
                if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
                        kref_put(&device->kref, drbd_destroy_device);
        }
        spin_unlock_irqrestore(&device->resource->req_lock, flags);

        if (block_id == ID_SYNCER)
                drbd_rs_complete_io(device, i.sector);

        if (do_wake)
                wake_up(&device->ee_wait);

        if (do_al_complete_io)
                drbd_al_complete_io(device, &i);

        put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_peer_request_endio(struct bio *bio)
{
        struct drbd_peer_request *peer_req = bio->bi_private;
        struct drbd_device *device = peer_req->peer_device->device;
        bool is_write = bio_data_dir(bio) == WRITE;
        bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
                          bio_op(bio) == REQ_OP_DISCARD;

        if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
                drbd_warn(device, "%s: error=%d s=%llus\n",
                                is_write ? (is_discard ? "discard" : "write")
                                        : "read", bio->bi_status,
                                (unsigned long long)peer_req->i.sector);

        if (bio->bi_status)
                set_bit(__EE_WAS_ERROR, &peer_req->flags);

        bio_put(bio); /* no need for the bio anymore */
        if (atomic_dec_and_test(&peer_req->pending_bios)) {
                if (is_write)
                        drbd_endio_write_sec_final(peer_req);
                else
                        drbd_endio_read_sec_final(peer_req);
        }
}

static void
drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
{
        panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
                device->minor, device->resource->name, device->vnr);
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_request_endio(struct bio *bio)
{
        unsigned long flags;
        struct drbd_request *req = bio->bi_private;
        struct drbd_device *device = req->device;
        struct bio_and_error m;
        enum drbd_req_event what;

        /* If this request was aborted locally before,
         * but now was completed "successfully",
         * chances are that this caused arbitrary data corruption.
         *
         * "aborting" requests, or force-detaching the disk, is intended for
         * completely blocked/hung local backing devices which no longer
         * complete requests at all, not even error completions.  In this
         * situation, usually a hard-reset and failover is the only way out.
         *
         * By "aborting", basically faking a local error-completion,
         * we allow for a more graceful switchover by cleanly migrating services.
         * Still the affected node has to be rebooted "soon".
         *
         * By completing these requests, we allow the upper layers to re-use
         * the associated data pages.
         *
         * If later the local backing device "recovers", and now DMAs some data
         * from disk into the original request pages, in the best case it will
         * just put random data into unused pages; but typically it will corrupt
         * meanwhile completely unrelated data, causing all sorts of damage.
         *
         * Which means delayed successful completion,
         * especially for READ requests,
         * is a reason to panic().
         *
         * We assume that a delayed *error* completion is OK,
         * though we still will complain noisily about it.
         */
        if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
                if (__ratelimit(&drbd_ratelimit_state))
                        drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");

                if (!bio->bi_status)
                        drbd_panic_after_delayed_completion_of_aborted_request(device);
        }

        /* to avoid recursion in __req_mod */
        if (unlikely(bio->bi_status)) {
                switch (bio_op(bio)) {
                case REQ_OP_WRITE_ZEROES:
                case REQ_OP_DISCARD:
                        if (bio->bi_status == BLK_STS_NOTSUPP)
                                what = DISCARD_COMPLETED_NOTSUPP;
                        else
                                what = DISCARD_COMPLETED_WITH_ERROR;
                        break;
                case REQ_OP_READ:
                        if (bio->bi_opf & REQ_RAHEAD)
                                what = READ_AHEAD_COMPLETED_WITH_ERROR;
                        else
                                what = READ_COMPLETED_WITH_ERROR;
                        break;
                default:
                        what = WRITE_COMPLETED_WITH_ERROR;
                        break;
                }
        } else {
                what = COMPLETED_OK;
        }

        req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
        bio_put(bio);

        /* not req_mod(), we need irqsave here! */
        spin_lock_irqsave(&device->resource->req_lock, flags);
        __req_mod(req, what, &m);
        spin_unlock_irqrestore(&device->resource->req_lock, flags);
        put_ldev(device);

        if (m.bio)
                complete_master_bio(device, &m);
}

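/* Checksum helpers used by checksum-based resync (csums_tfm) and online
 * verify (verify_tfm).  drbd_csum_ee() hashes the page chain of a peer
 * request, drbd_csum_bio() hashes the segments of a bio; both feed the data
 * piecewise into the crypto ahash transform and store the final digest in
 * *digest, which the callers size via crypto_ahash_digestsize().
 */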
void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
{
        AHASH_REQUEST_ON_STACK(req, tfm);
        struct scatterlist sg;
        struct page *page = peer_req->pages;
        struct page *tmp;
        unsigned len;

        ahash_request_set_tfm(req, tfm);
        ahash_request_set_callback(req, 0, NULL, NULL);

        sg_init_table(&sg, 1);
        crypto_ahash_init(req);

        while ((tmp = page_chain_next(page))) {
                /* all but the last page will be fully used */
                sg_set_page(&sg, page, PAGE_SIZE, 0);
                ahash_request_set_crypt(req, &sg, NULL, sg.length);
                crypto_ahash_update(req);
                page = tmp;
        }
        /* and now the last, possibly only partially used page */
        len = peer_req->i.size & (PAGE_SIZE - 1);
        sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
        ahash_request_set_crypt(req, &sg, digest, sg.length);
        crypto_ahash_finup(req);
        ahash_request_zero(req);
}

void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
{
        AHASH_REQUEST_ON_STACK(req, tfm);
        struct scatterlist sg;
        struct bio_vec bvec;
        struct bvec_iter iter;

        ahash_request_set_tfm(req, tfm);
        ahash_request_set_callback(req, 0, NULL, NULL);

        sg_init_table(&sg, 1);
        crypto_ahash_init(req);

        bio_for_each_segment(bvec, bio, iter) {
                sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
                ahash_request_set_crypt(req, &sg, NULL, sg.length);
                crypto_ahash_update(req);
                /* REQ_OP_WRITE_SAME has only one segment,
                 * checksum the payload only once. */
                if (bio_op(bio) == REQ_OP_WRITE_SAME)
                        break;
        }
        ahash_request_set_crypt(req, NULL, digest, 0);
        crypto_ahash_final(req);
        ahash_request_zero(req);
}

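/* Worker callback on the sync target for checksum-based resync: the block
 * was read locally by read_for_csum(); instead of asking the peer to ship
 * the data unconditionally, only its checksum is sent (P_CSUM_RS_REQUEST).
 * The sync source compares it against its own copy and answers with
 * P_RS_IS_IN_SYNC or with the full data, see w_e_end_csum_rs_req() below. */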
/* MAYBE merge common code with w_e_end_ov_req */
static int w_e_send_csum(struct drbd_work *w, int cancel)
{
        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        int digest_size;
        void *digest;
        int err = 0;

        if (unlikely(cancel))
                goto out;

        if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
                goto out;

        digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
        digest = kmalloc(digest_size, GFP_NOIO);
        if (digest) {
                sector_t sector = peer_req->i.sector;
                unsigned int size = peer_req->i.size;
                drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
                /* Free peer_req and pages before send.
                 * In case we block on congestion, we could otherwise run into
                 * some distributed deadlock, if the other side blocks on
                 * congestion as well, because our receiver blocks in
                 * drbd_alloc_pages due to pp_in_use > max_buffers. */
                drbd_free_peer_req(device, peer_req);
                peer_req = NULL;
                inc_rs_pending(device);
                err = drbd_send_drequest_csum(peer_device, sector, size,
                                              digest, digest_size,
                                              P_CSUM_RS_REQUEST);
                kfree(digest);
        } else {
                drbd_err(device, "kmalloc() of digest failed.\n");
                err = -ENOMEM;
        }

out:
        if (peer_req)
                drbd_free_peer_req(device, peer_req);

        if (unlikely(err))
                drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
        return err;
}

#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
{
        struct drbd_device *device = peer_device->device;
        struct drbd_peer_request *peer_req;

        if (!get_ldev(device))
                return -EIO;

        /* GFP_TRY, because if there is no memory available right now, this may
         * be rescheduled for later. It is "only" background resync, after all. */
        peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
                                       size, size, GFP_TRY);
        if (!peer_req)
                goto defer;

        peer_req->w.cb = w_e_send_csum;
        spin_lock_irq(&device->resource->req_lock);
        list_add_tail(&peer_req->w.list, &device->read_ee);
        spin_unlock_irq(&device->resource->req_lock);

        atomic_add(size >> 9, &device->rs_sect_ev);
        if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
                                     DRBD_FAULT_RS_RD) == 0)
                return 0;

        /* If it failed because of ENOMEM, retry should help.  If it failed
         * because bio_add_page failed (probably broken lower level driver),
         * retry may or may not help.
         * If it does not, you may need to force disconnect. */
        spin_lock_irq(&device->resource->req_lock);
        list_del(&peer_req->w.list);
        spin_unlock_irq(&device->resource->req_lock);

        drbd_free_peer_req(device, peer_req);
defer:
        put_ldev(device);
        return -EAGAIN;
}

int w_resync_timer(struct drbd_work *w, int cancel)
{
        struct drbd_device *device =
                container_of(w, struct drbd_device, resync_work);

        switch (device->state.conn) {
        case C_VERIFY_S:
                make_ov_request(device, cancel);
                break;
        case C_SYNC_TARGET:
                make_resync_request(device, cancel);
                break;
        }

        return 0;
}

void resync_timer_fn(struct timer_list *t)
{
        struct drbd_device *device = from_timer(device, t, resync_timer);

        drbd_queue_work_if_unqueued(
                &first_peer_device(device)->connection->sender_work,
                &device->resync_work);
}

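/* The fifo_buffer below is the planning ring of the dynamic resync-speed
 * controller (drbd_rs_controller): one slot per future SLEEP_TIME step.
 * fifo_add_val() spreads a correction evenly over all planned steps,
 * fifo_push() advances the ring and returns the correction planned for the
 * current step, and fb->total tracks the sum of all still-planned sectors. */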
static void fifo_set(struct fifo_buffer *fb, int value)
{
        int i;

        for (i = 0; i < fb->size; i++)
                fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
        int ov;

        ov = fb->values[fb->head_index];
        fb->values[fb->head_index++] = value;

        if (fb->head_index >= fb->size)
                fb->head_index = 0;

        return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
        int i;

        for (i = 0; i < fb->size; i++)
                fb->values[i] += value;
}

struct fifo_buffer *fifo_alloc(int fifo_size)
{
        struct fifo_buffer *fb;

        fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
        if (!fb)
                return NULL;

        fb->head_index = 0;
        fb->size = fifo_size;
        fb->total = 0;

        return fb;
}

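/* Dynamic resync-speed controller.  Called once per SLEEP_TIME step with
 * sect_in = sectors of resync data that came in since the last call.  It
 * computes how many sectors to request next so that the amount in flight
 * approaches the configured fill target (c-fill-target) respectively delay
 * target (c-delay-target), smoothing the correction over the c-plan-ahead
 * window kept in device->rs_plan_s.
 *
 * Purely illustrative numbers: with want = 1000 sectors, rs_in_flight = 600
 * and plan->total = 100, the correction is 300 sectors; with steps = 10 the
 * controller adds cps = 30 sectors to each planned step and requests
 * sect_in + curr_corr sectors in this step, capped at c-max-rate. */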
static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
{
        struct disk_conf *dc;
        unsigned int want;     /* The number of sectors we want in-flight */
        int req_sect; /* Number of sectors to request in this turn */
        int correction; /* Number of sectors more we need in-flight */
        int cps; /* correction per invocation of drbd_rs_controller() */
        int steps; /* Number of time steps to plan ahead */
        int curr_corr;
        int max_sect;
        struct fifo_buffer *plan;

        dc = rcu_dereference(device->ldev->disk_conf);
        plan = rcu_dereference(device->rs_plan_s);

        steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

        if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
                want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
        } else { /* normal path */
                want = dc->c_fill_target ? dc->c_fill_target :
                        sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
        }

        correction = want - device->rs_in_flight - plan->total;

        /* Plan ahead */
        cps = correction / steps;
        fifo_add_val(plan, cps);
        plan->total += cps * steps;

        /* What we do in this step */
        curr_corr = fifo_push(plan, 0);
        plan->total -= curr_corr;

        req_sect = sect_in + curr_corr;
        if (req_sect < 0)
                req_sect = 0;

        max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
        if (req_sect > max_sect)
                req_sect = max_sect;

        /*
        drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
                 sect_in, device->rs_in_flight, want, correction,
                 steps, cps, device->rs_planed, curr_corr, req_sect);
        */

        return req_sect;
}

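/* Translate the controller's answer (in 512-byte sectors) into a number of
 * resync requests of BM_BLOCK_SIZE (4 KiB) each, and record the resulting
 * rate in device->c_sync_rate in KiB/s.  Illustrative only: a controller
 * value of 800 sectors per SLEEP_TIME (HZ/10) step corresponds to
 * 800 >> 3 = 100 requests and a c_sync_rate of about 4000 KiB/s. */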
static int drbd_rs_number_requests(struct drbd_device *device)
{
        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
        int number, mxb;

        sect_in = atomic_xchg(&device->rs_sect_in, 0);
        device->rs_in_flight -= sect_in;

        rcu_read_lock();
        mxb = drbd_get_max_buffers(device) / 2;
        if (rcu_dereference(device->rs_plan_s)->size) {
                number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
                device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
        } else {
                device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
                number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
        }
        rcu_read_unlock();

        /* Don't have more than "max-buffers"/2 in-flight.
         * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
         * potentially causing a distributed deadlock on congestion during
         * online-verify or (checksum-based) resync, if max-buffers,
         * socket buffer sizes and resync rate settings are mis-configured. */

        /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
         * mxb (as used here, and in drbd_alloc_pages on the peer) is
         * "number of pages" (typically also 4k),
         * but "rs_in_flight" is in "sectors" (512 Byte). */
        if (mxb - device->rs_in_flight/8 < number)
                number = mxb - device->rs_in_flight/8;

        return number;
}

static int make_resync_request(struct drbd_device *const device, int cancel)
{
        struct drbd_peer_device *const peer_device = first_peer_device(device);
        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
        unsigned long bit;
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        int max_bio_size;
        int number, rollback_i, size;
        int align, requeue = 0;
        int i = 0;
        int discard_granularity = 0;

        if (unlikely(cancel))
                return 0;

        if (device->rs_total == 0) {
                /* empty resync? */
                drbd_resync_finished(device);
                return 0;
        }

        if (!get_ldev(device)) {
                /* Since we only need to access device->rsync, a
                   get_ldev_if_state(device, D_FAILED) would be sufficient, but
                   continuing resync with a broken disk makes no sense at
                   all */
                drbd_err(device, "Disk broke down during resync!\n");
                return 0;
        }

        if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
                rcu_read_lock();
                discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
                rcu_read_unlock();
        }

        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
        number = drbd_rs_number_requests(device);
        if (number <= 0)
                goto requeue;

        for (i = 0; i < number; i++) {
                /* Stop generating RS requests when half of the send buffer is filled,
                 * but notify TCP that we'd like to have more space. */
                mutex_lock(&connection->data.mutex);
                if (connection->data.socket) {
                        struct sock *sk = connection->data.socket->sk;
                        int queued = sk->sk_wmem_queued;
                        int sndbuf = sk->sk_sndbuf;
                        if (queued > sndbuf / 2) {
                                requeue = 1;
                                if (sk->sk_socket)
                                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
                        }
                } else
                        requeue = 1;
                mutex_unlock(&connection->data.mutex);
                if (requeue)
                        goto requeue;

next_sector:
                size = BM_BLOCK_SIZE;
                bit  = drbd_bm_find_next(device, device->bm_resync_fo);

                if (bit == DRBD_END_OF_BITMAP) {
                        device->bm_resync_fo = drbd_bm_bits(device);
                        put_ldev(device);
                        return 0;
                }

                sector = BM_BIT_TO_SECT(bit);

                if (drbd_try_rs_begin_io(device, sector)) {
                        device->bm_resync_fo = bit;
                        goto requeue;
                }
                device->bm_resync_fo = bit + 1;

                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
                        drbd_rs_complete_io(device, sector);
                        goto next_sector;
                }

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
                /* try to find some adjacent bits.
                 * we stop if we have already the maximum req size.
                 *
                 * Additionally always align bigger requests, in order to
                 * be prepared for all stripe sizes of software RAIDs.
                 */
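                /* The merge loop below only grows a request while its start
                 * sector stays naturally aligned to the grown size: align
                 * starts at 1, so the first merge needs an 8 KiB aligned
                 * start (sector & 15 == 0), the next a 16 KiB aligned start,
                 * and so on.  E.g. a request starting at sector 16 can grow
                 * to 8 KiB but no further, while one starting at sector 0
                 * may keep growing until max_bio_size or another stop
                 * condition is hit. */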
                align = 1;
                rollback_i = i;
                while (i < number) {
                        if (size + BM_BLOCK_SIZE > max_bio_size)
                                break;

                        /* Be always aligned */
                        if (sector & ((1<<(align+3))-1))
                                break;

                        if (discard_granularity && size == discard_granularity)
                                break;

                        /* do not cross extent boundaries */
                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
                                break;
                        /* now, is it actually dirty, after all?
                         * caution, drbd_bm_test_bit is tri-state for some
                         * obscure reason; ( b == 0 ) would get the out-of-band
                         * only accidentally right because of the "oddly sized"
                         * adjustment below */
                        if (drbd_bm_test_bit(device, bit+1) != 1)
                                break;
                        bit++;
                        size += BM_BLOCK_SIZE;
                        if ((BM_BLOCK_SIZE << align) <= size)
                                align++;
                        i++;
                }
                /* if we merged some,
                 * reset the offset to start the next drbd_bm_find_next from */
                if (size > BM_BLOCK_SIZE)
                        device->bm_resync_fo = bit + 1;
#endif

                /* adjust very last sectors, in case we are oddly sized */
                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;

                if (device->use_csums) {
                        switch (read_for_csum(peer_device, sector, size)) {
                        case -EIO: /* Disk failure */
                                put_ldev(device);
                                return -EIO;
                        case -EAGAIN: /* allocation failed, or ldev busy */
                                drbd_rs_complete_io(device, sector);
                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
                                i = rollback_i;
                                goto requeue;
                        case 0:
                                /* everything ok */
                                break;
                        default:
                                BUG();
                        }
                } else {
                        int err;

                        inc_rs_pending(device);
                        err = drbd_send_drequest(peer_device,
                                                 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
                                                 sector, size, ID_SYNCER);
                        if (err) {
                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
                                dec_rs_pending(device);
                                put_ldev(device);
                                return err;
                        }
                }
        }

        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
                /* last syncer _request_ was sent,
                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
                 * next sync group will resume), as soon as we receive the last
                 * resync data block, and the last bit is cleared.
                 * until then resync "work" is "inactive" ...
                 */
                put_ldev(device);
                return 0;
        }

 requeue:
        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
        put_ldev(device);
        return 0;
}

static int make_ov_request(struct drbd_device *device, int cancel)
{
        int number, i, size;
        sector_t sector;
        const sector_t capacity = drbd_get_capacity(device->this_bdev);
        bool stop_sector_reached = false;

        if (unlikely(cancel))
                return 1;

        number = drbd_rs_number_requests(device);

        sector = device->ov_position;
        for (i = 0; i < number; i++) {
                if (sector >= capacity)
                        return 1;

                /* We check for "finished" only in the reply path:
                 * w_e_end_ov_reply().
                 * We need to send at least one request out. */
                stop_sector_reached = i > 0
                        && verify_can_do_stop_sector(device)
                        && sector >= device->ov_stop_sector;
                if (stop_sector_reached)
                        break;

                size = BM_BLOCK_SIZE;

                if (drbd_try_rs_begin_io(device, sector)) {
                        device->ov_position = sector;
                        goto requeue;
                }

                if (sector + (size>>9) > capacity)
                        size = (capacity-sector)<<9;

                inc_rs_pending(device);
                if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
                        dec_rs_pending(device);
                        return 0;
                }
                sector += BM_SECT_PER_BIT;
        }
        device->ov_position = sector;

 requeue:
        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
        if (i == 0 || !stop_sector_reached)
                mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
        return 1;
}

int w_ov_finished(struct drbd_work *w, int cancel)
{
        struct drbd_device_work *dw =
                container_of(w, struct drbd_device_work, w);
        struct drbd_device *device = dw->device;
        kfree(dw);
        ov_out_of_sync_print(device);
        drbd_resync_finished(device);

        return 0;
}

static int w_resync_finished(struct drbd_work *w, int cancel)
{
        struct drbd_device_work *dw =
                container_of(w, struct drbd_device_work, w);
        struct drbd_device *device = dw->device;
        kfree(dw);

        drbd_resync_finished(device);

        return 0;
}

static void ping_peer(struct drbd_device *device)
{
        struct drbd_connection *connection = first_peer_device(device)->connection;

        clear_bit(GOT_PING_ACK, &connection->flags);
        request_ping(connection);
        wait_event(connection->ping_wait,
                   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
}

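/* Called when a resync or online verify run ends: report the achieved
 * throughput, ping the peer (ping_peer()) so it has caught up with us,
 * transition the connection state back to C_CONNECTED, update the UUID set
 * after a successful sync, and finally kick off any configured handler
 * (out-of-sync, after-resync-target, unfence-peer). */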
int drbd_resync_finished(struct drbd_device *device)
{
        struct drbd_connection *connection = first_peer_device(device)->connection;
        unsigned long db, dt, dbdt;
        unsigned long n_oos;
        union drbd_state os, ns;
        struct drbd_device_work *dw;
        char *khelper_cmd = NULL;
        int verify_done = 0;

        /* Remove all elements from the resync LRU. Since future actions
         * might set bits in the (main) bitmap, then the entries in the
         * resync LRU would be wrong. */
        if (drbd_rs_del_all(device)) {
                /* In case this is not possible now, most probably because
                 * there are P_RS_DATA_REPLY Packets lingering on the worker's
                 * queue (or even the read operations for those packets
                 * is not finished by now).   Retry in 100ms. */

                schedule_timeout_interruptible(HZ / 10);
                dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
                if (dw) {
                        dw->w.cb = w_resync_finished;
                        dw->device = device;
                        drbd_queue_work(&connection->sender_work, &dw->w);
                        return 1;
                }
                drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
        }

        dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
        if (dt <= 0)
                dt = 1;

        db = device->rs_total;
        /* adjust for verify start and stop sectors, respective reached position */
        if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
                db -= device->ov_left;

        dbdt = Bit2KB(db/dt);
        device->rs_paused /= HZ;

        if (!get_ldev(device))
                goto out;

        ping_peer(device);

        spin_lock_irq(&device->resource->req_lock);
        os = drbd_read_state(device);

        verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);

        /* This protects us against multiple calls (that can happen in the presence
           of application IO), and against connectivity loss just before we arrive here. */
        if (os.conn <= C_CONNECTED)
                goto out_unlock;

        ns = os;
        ns.conn = C_CONNECTED;

        drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
             verify_done ? "Online verify" : "Resync",
             dt + device->rs_paused, device->rs_paused, dbdt);

        n_oos = drbd_bm_total_weight(device);

        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
                if (n_oos) {
                        drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
                              n_oos, Bit2KB(1));
                        khelper_cmd = "out-of-sync";
                }
        } else {
                D_ASSERT(device, (n_oos - device->rs_failed) == 0);

                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
                        khelper_cmd = "after-resync-target";

                if (device->use_csums && device->rs_total) {
                        const unsigned long s = device->rs_same_csum;
                        const unsigned long t = device->rs_total;
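                        /* Report how much data the checksum comparison saved.
                         * Both branches approximate s * 100 / t; the second
                         * form divides t first so that s * 100 cannot
                         * overflow an unsigned long on 32-bit. */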
                        const int ratio =
                                (t == 0)     ? 0 :
                                (t < 100000) ? ((s*100)/t) : (s/(t/100));
                        drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
                             "transferred %luK total %luK\n",
                             ratio,
                             Bit2KB(device->rs_same_csum),
                             Bit2KB(device->rs_total - device->rs_same_csum),
                             Bit2KB(device->rs_total));
                }
        }

        if (device->rs_failed) {
                drbd_info(device, "            %lu failed blocks\n", device->rs_failed);

                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
                        ns.disk = D_INCONSISTENT;
                        ns.pdsk = D_UP_TO_DATE;
                } else {
                        ns.disk = D_UP_TO_DATE;
                        ns.pdsk = D_INCONSISTENT;
                }
        } else {
                ns.disk = D_UP_TO_DATE;
                ns.pdsk = D_UP_TO_DATE;

                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
                        if (device->p_uuid) {
                                int i;
                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
                                        _drbd_uuid_set(device, i, device->p_uuid[i]);
                                drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
                                _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
                        } else {
                                drbd_err(device, "device->p_uuid is NULL! BUG\n");
                        }
                }

                if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
                        /* for verify runs, we don't update uuids here,
                         * so there would be nothing to report. */
                        drbd_uuid_set_bm(device, 0UL);
                        drbd_print_uuids(device, "updated UUIDs");
                        if (device->p_uuid) {
                                /* Now the two UUID sets are equal, update what we
                                 * know of the peer. */
                                int i;
                                for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
                                        device->p_uuid[i] = device->ldev->md.uuid[i];
                        }
                }
        }

        _drbd_set_state(device, ns, CS_VERBOSE, NULL);
out_unlock:
        spin_unlock_irq(&device->resource->req_lock);

        /* If we have been sync source, and have an effective fencing-policy,
         * once *all* volumes are back in sync, call "unfence". */
        if (os.conn == C_SYNC_SOURCE) {
                enum drbd_disk_state disk_state = D_MASK;
                enum drbd_disk_state pdsk_state = D_MASK;
                enum drbd_fencing_p fp = FP_DONT_CARE;

                rcu_read_lock();
                fp = rcu_dereference(device->ldev->disk_conf)->fencing;
                if (fp != FP_DONT_CARE) {
                        struct drbd_peer_device *peer_device;
                        int vnr;
                        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
                                struct drbd_device *device = peer_device->device;
                                disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
                                pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
                        }
                }
                rcu_read_unlock();
                if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
                        conn_khelper(connection, "unfence-peer");
        }

        put_ldev(device);
out:
        device->rs_total  = 0;
        device->rs_failed = 0;
        device->rs_paused = 0;

        /* reset start sector, if we reached end of device */
        if (verify_done && device->ov_left == 0)
                device->ov_start_sector = 0;

        drbd_md_sync(device);

        if (khelper_cmd)
                drbd_khelper(device, khelper_cmd);

        return 1;
}

/* helper: if the network layer may still hold a reference on the pages of a
 * finished peer request (sendpage() not done yet), park it on net_ee instead
 * of freeing it right away */
static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
{
        if (drbd_peer_req_has_active_page(peer_req)) {
                /* This might happen if sendpage() has not finished */
                int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
                atomic_add(i, &device->pp_in_use_by_net);
                atomic_sub(i, &device->pp_in_use);
                spin_lock_irq(&device->resource->req_lock);
                list_add_tail(&peer_req->w.list, &device->net_ee);
                spin_unlock_irq(&device->resource->req_lock);
                wake_up(&drbd_pp_wait);
        } else
                drbd_free_peer_req(device, peer_req);
}

/**
 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
 * @w:          work object.
 * @cancel:     The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_work *w, int cancel)
{
        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
        struct drbd_peer_device *peer_device = peer_req->peer_device;
        struct drbd_device *device = peer_device->device;
        int err;

        if (unlikely(cancel)) {
                drbd_free_peer_req(device, peer_req);
                dec_unacked(device);
                return 0;
        }

        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
                err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
        } else {
                if (__ratelimit(&drbd_ratelimit_state))
                        drbd_err(device, "Sending NegDReply. sector=%llus.\n",
                            (unsigned long long)peer_req->i.sector);

                err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
        }

        dec_unacked(device);

        move_to_net_ee_or_free(device, peer_req);

        if (unlikely(err))
                drbd_err(device, "drbd_send_block() failed\n");
        return err;
}

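/* Scan the payload of a peer request and return true if it is all zeroes.
 * Used for thin-provisioning aware resync (EE_RS_THIN_REQ): an all-zero
 * block is acknowledged with P_RS_DEALLOCATED instead of shipping the data,
 * see w_e_end_rsdata_req() below. */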
1098static bool all_zero(struct drbd_peer_request *peer_req)
1099{
1100        struct page *page = peer_req->pages;
1101        unsigned int len = peer_req->i.size;
1102
1103        page_chain_for_each(page) {
1104                unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1105                unsigned int i, words = l / sizeof(long);
1106                unsigned long *d;
1107
1108                d = kmap_atomic(page);
1109                for (i = 0; i < words; i++) {
1110                        if (d[i]) {
1111                                kunmap_atomic(d);
1112                                return false;
1113                        }
1114                }
1115                kunmap_atomic(d);
1116                len -= l;
1117        }
1118
1119        return true;
1120}
1121
1122/**
1123 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1124 * @w:          work object.
1125 * @cancel:     The connection will be closed anyways
1126 */
1127int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1128{
1129        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1130        struct drbd_peer_device *peer_device = peer_req->peer_device;
1131        struct drbd_device *device = peer_device->device;
1132        int err;
1133
1134        if (unlikely(cancel)) {
1135                drbd_free_peer_req(device, peer_req);
1136                dec_unacked(device);
1137                return 0;
1138        }
1139
1140        if (get_ldev_if_state(device, D_FAILED)) {
1141                drbd_rs_complete_io(device, peer_req->i.sector);
1142                put_ldev(device);
1143        }
1144
1145        if (device->state.conn == C_AHEAD) {
1146                err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1147        } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1148                if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1149                        inc_rs_pending(device);
1150                        if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1151                                err = drbd_send_rs_deallocated(peer_device, peer_req);
1152                        else
1153                                err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1154                } else {
1155                        if (__ratelimit(&drbd_ratelimit_state))
1156                                drbd_err(device, "Not sending RSDataReply, "
1157                                    "partner DISKLESS!\n");
1158                        err = 0;
1159                }
1160        } else {
1161                if (__ratelimit(&drbd_ratelimit_state))
1162                        drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1163                            (unsigned long long)peer_req->i.sector);
1164
1165                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1166
1167                /* update resync data with failure */
1168                drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1169        }
1170
1171        dec_unacked(device);
1172
1173        move_to_net_ee_or_free(device, peer_req);
1174
1175        if (unlikely(err))
1176                drbd_err(device, "drbd_send_block() failed\n");
1177        return err;
1178}
1179
1180int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1181{
1182        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1183        struct drbd_peer_device *peer_device = peer_req->peer_device;
1184        struct drbd_device *device = peer_device->device;
1185        struct digest_info *di;
1186        int digest_size;
1187        void *digest = NULL;
1188        int err, eq = 0;
1189
1190        if (unlikely(cancel)) {
1191                drbd_free_peer_req(device, peer_req);
1192                dec_unacked(device);
1193                return 0;
1194        }
1195
1196        if (get_ldev(device)) {
1197                drbd_rs_complete_io(device, peer_req->i.sector);
1198                put_ldev(device);
1199        }
1200
1201        di = peer_req->digest;
1202
1203        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1204                /* quick hack to try to avoid a race against reconfiguration.
1205                 * a real fix would be much more involved,
1206                 * introducing more locking mechanisms */
1207                if (peer_device->connection->csums_tfm) {
1208                        digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1209                        D_ASSERT(device, digest_size == di->digest_size);
1210                        digest = kmalloc(digest_size, GFP_NOIO);
1211                }
1212                if (digest) {
1213                        drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1214                        eq = !memcmp(digest, di->digest, digest_size);
1215                        kfree(digest);
1216                }
1217
1218                if (eq) {
1219                        drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1220                        /* rs_same_csums unit is BM_BLOCK_SIZE */
1221                        device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1222                        err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1223                } else {
1224                        inc_rs_pending(device);
1225                        peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1226                        peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1227                        kfree(di);
1228                        err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1229                }
1230        } else {
1231                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1232                if (__ratelimit(&drbd_ratelimit_state))
1233                        drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1234        }
1235
1236        dec_unacked(device);
1237        move_to_net_ee_or_free(device, peer_req);
1238
1239        if (unlikely(err))
1240                drbd_err(device, "drbd_send_block/ack() failed\n");
1241        return err;
1242}
1243
1244int w_e_end_ov_req(struct drbd_work *w, int cancel)
1245{
1246        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1247        struct drbd_peer_device *peer_device = peer_req->peer_device;
1248        struct drbd_device *device = peer_device->device;
1249        sector_t sector = peer_req->i.sector;
1250        unsigned int size = peer_req->i.size;
1251        int digest_size;
1252        void *digest;
1253        int err = 0;
1254
1255        if (unlikely(cancel))
1256                goto out;
1257
1258        digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1259        digest = kmalloc(digest_size, GFP_NOIO);
1260        if (!digest) {
1261                err = 1;        /* terminate the connection in case the allocation failed */
1262                goto out;
1263        }
1264
1265        if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1266                drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1267        else
1268                memset(digest, 0, digest_size);
1269
1270        /* Free e and pages before send.
1271         * In case we block on congestion, we could otherwise run into
1272         * some distributed deadlock, if the other side blocks on
1273         * congestion as well, because our receiver blocks in
1274         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1275        drbd_free_peer_req(device, peer_req);
1276        peer_req = NULL;
1277        inc_rs_pending(device);
1278        err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1279        if (err)
1280                dec_rs_pending(device);
1281        kfree(digest);
1282
1283out:
1284        if (peer_req)
1285                drbd_free_peer_req(device, peer_req);
1286        dec_unacked(device);
1287        return err;
1288}
1289
1290void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1291{
1292        if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1293                device->ov_last_oos_size += size>>9;
1294        } else {
1295                device->ov_last_oos_start = sector;
1296                device->ov_last_oos_size = size>>9;
1297        }
1298        drbd_set_out_of_sync(device, sector, size);
1299}
1300
1301int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1302{
1303        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1304        struct drbd_peer_device *peer_device = peer_req->peer_device;
1305        struct drbd_device *device = peer_device->device;
1306        struct digest_info *di;
1307        void *digest;
1308        sector_t sector = peer_req->i.sector;
1309        unsigned int size = peer_req->i.size;
1310        int digest_size;
1311        int err, eq = 0;
1312        bool stop_sector_reached = false;
1313
1314        if (unlikely(cancel)) {
1315                drbd_free_peer_req(device, peer_req);
1316                dec_unacked(device);
1317                return 0;
1318        }
1319
1320        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1321         * the resync lru has been cleaned up already */
1322        if (get_ldev(device)) {
1323                drbd_rs_complete_io(device, peer_req->i.sector);
1324                put_ldev(device);
1325        }
1326
1327        di = peer_req->digest;
1328
1329        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1330                digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1331                digest = kmalloc(digest_size, GFP_NOIO);
1332                if (digest) {
1333                        drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1334
1335                        D_ASSERT(device, digest_size == di->digest_size);
1336                        eq = !memcmp(digest, di->digest, digest_size);
1337                        kfree(digest);
1338                }
1339        }
1340
1341        /* Free peer_req and pages before send.
1342         * In case we block on congestion, we could otherwise run into
1343         * some distributed deadlock, if the other side blocks on
1344         * congestion as well, because our receiver blocks in
1345         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1346        drbd_free_peer_req(device, peer_req);
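            /* A mismatch extends (or starts) the current out-of-sync run;
             * a match ends it, so report the run accumulated so far. */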
1347        if (!eq)
1348                drbd_ov_out_of_sync_found(device, sector, size);
1349        else
1350                ov_out_of_sync_print(device);
1351
1352        err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1353                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1354
1355        dec_unacked(device);
1356
1357        --device->ov_left;
1358
1359        /* let's advance progress step marks only for every other megabyte */
1360        if ((device->ov_left & 0x200) == 0x200)
1361                drbd_advance_rs_marks(device, device->ov_left);
1362
1363        stop_sector_reached = verify_can_do_stop_sector(device) &&
1364                (sector + (size>>9)) >= device->ov_stop_sector;
1365
1366        if (device->ov_left == 0 || stop_sector_reached) {
1367                ov_out_of_sync_print(device);
1368                drbd_resync_finished(device);
1369        }
1370
1371        return err;
1372}
1373
1374/* FIXME
1375 * We need to track the number of pending barrier acks,
1376 * and to be able to wait for them.
1377 * See also comment in drbd_adm_attach before drbd_suspend_io.
1378 */
1379static int drbd_send_barrier(struct drbd_connection *connection)
1380{
1381        struct p_barrier *p;
1382        struct drbd_socket *sock;
1383
1384        sock = &connection->data;
1385        p = conn_prepare_command(connection, sock);
1386        if (!p)
1387                return -EIO;
1388        p->barrier = connection->send.current_epoch_nr;
1389        p->pad = 0;
1390        connection->send.current_epoch_writes = 0;
1391        connection->send.last_sent_barrier_jif = jiffies;
1392
1393        return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1394}
1395
1396static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1397{
1398        struct drbd_socket *sock = &pd->connection->data;
1399        if (!drbd_prepare_command(pd, sock))
1400                return -EIO;
1401        return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1402}
1403
1404int w_send_write_hint(struct drbd_work *w, int cancel)
1405{
1406        struct drbd_device *device =
1407                container_of(w, struct drbd_device, unplug_work);
1408
1409        if (cancel)
1410                return 0;
1411        return pd_send_unplug_remote(first_peer_device(device));
1412}
1413
1414static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1415{
1416        if (!connection->send.seen_any_write_yet) {
1417                connection->send.seen_any_write_yet = true;
1418                connection->send.current_epoch_nr = epoch;
1419                connection->send.current_epoch_writes = 0;
1420                connection->send.last_sent_barrier_jif = jiffies;
1421        }
1422}
1423
1424static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1425{
1426        /* nothing to close yet: no write has been seen on this connection */
1427        if (!connection->send.seen_any_write_yet)
1428                return;
1429        if (connection->send.current_epoch_nr != epoch) {
1430                if (connection->send.current_epoch_writes)
1431                        drbd_send_barrier(connection);
1432                connection->send.current_epoch_nr = epoch;
1433        }
1434}
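    /*
     * Example (hypothetical epoch numbers): three writes were sent in epoch 7,
     * then a request tagged with epoch 8 shows up.  current_epoch_nr (7)
     * differs and current_epoch_writes is non-zero, so a P_BARRIER closing
     * epoch 7 goes out before the new request and current_epoch_nr becomes 8.
     * An epoch without writes is closed silently, and nothing is sent before
     * the very first write on a connection.
     */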
1435
1436int w_send_out_of_sync(struct drbd_work *w, int cancel)
1437{
1438        struct drbd_request *req = container_of(w, struct drbd_request, w);
1439        struct drbd_device *device = req->device;
1440        struct drbd_peer_device *const peer_device = first_peer_device(device);
1441        struct drbd_connection *const connection = peer_device->connection;
1442        int err;
1443
1444        if (unlikely(cancel)) {
1445                req_mod(req, SEND_CANCELED);
1446                return 0;
1447        }
1448        req->pre_send_jif = jiffies;
1449
1450        /* this time, no connection->send.current_epoch_writes++;
1451         * If it was sent, it was the closing barrier for the last
1452         * replicated epoch, before we went into AHEAD mode.
1453         * No more barriers will be sent, until we leave AHEAD mode again. */
1454        maybe_send_barrier(connection, req->epoch);
1455
1456        err = drbd_send_out_of_sync(peer_device, req);
1457        req_mod(req, OOS_HANDED_TO_NETWORK);
1458
1459        return err;
1460}
1461
1462/**
1463 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1464 * @w:          work object.
1465 * @cancel:     The connection will be closed anyway
1466 */
1467int w_send_dblock(struct drbd_work *w, int cancel)
1468{
1469        struct drbd_request *req = container_of(w, struct drbd_request, w);
1470        struct drbd_device *device = req->device;
1471        struct drbd_peer_device *const peer_device = first_peer_device(device);
1472        struct drbd_connection *connection = peer_device->connection;
1473        bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1474        int err;
1475
1476        if (unlikely(cancel)) {
1477                req_mod(req, SEND_CANCELED);
1478                return 0;
1479        }
1480        req->pre_send_jif = jiffies;
1481
1482        re_init_if_first_write(connection, req->epoch);
1483        maybe_send_barrier(connection, req->epoch);
1484        connection->send.current_epoch_writes++;
1485
1486        err = drbd_send_dblock(peer_device, req);
1487        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1488
1489        if (do_send_unplug && !err)
1490                pd_send_unplug_remote(peer_device);
1491
1492        return err;
1493}
1494
1495/**
1496 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1497 * @w:          work object.
1498 * @cancel:     The connection will be closed anyway
1499 */
1500int w_send_read_req(struct drbd_work *w, int cancel)
1501{
1502        struct drbd_request *req = container_of(w, struct drbd_request, w);
1503        struct drbd_device *device = req->device;
1504        struct drbd_peer_device *const peer_device = first_peer_device(device);
1505        struct drbd_connection *connection = peer_device->connection;
1506        bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1507        int err;
1508
1509        if (unlikely(cancel)) {
1510                req_mod(req, SEND_CANCELED);
1511                return 0;
1512        }
1513        req->pre_send_jif = jiffies;
1514
1515        /* Even read requests may close a write epoch,
1516         * if any writes happened in it already. */
1517        maybe_send_barrier(connection, req->epoch);
1518
1519        err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1520                                 (unsigned long)req);
1521
1522        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1523
1524        if (do_send_unplug && !err)
1525                pd_send_unplug_remote(peer_device);
1526
1527        return err;
1528}
1529
1530int w_restart_disk_io(struct drbd_work *w, int cancel)
1531{
1532        struct drbd_request *req = container_of(w, struct drbd_request, w);
1533        struct drbd_device *device = req->device;
1534
1535        if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1536                drbd_al_begin_io(device, &req->i);
1537
1538        drbd_req_make_private_bio(req, req->master_bio);
1539        bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1540        generic_make_request(req->private_bio);
1541
1542        return 0;
1543}
1544
1545static int _drbd_may_sync_now(struct drbd_device *device)
1546{
1547        struct drbd_device *odev = device;
1548        int resync_after;
1549
1550        while (1) {
1551                if (!odev->ldev || odev->state.disk == D_DISKLESS)
1552                        return 1;
1553                rcu_read_lock();
1554                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1555                rcu_read_unlock();
1556                if (resync_after == -1)
1557                        return 1;
1558                odev = minor_to_device(resync_after);
1559                if (!odev)
1560                        return 1;
1561                if ((odev->state.conn >= C_SYNC_SOURCE &&
1562                     odev->state.conn <= C_PAUSED_SYNC_T) ||
1563                    odev->state.aftr_isp || odev->state.peer_isp ||
1564                    odev->state.user_isp)
1565                        return 0;
1566        }
1567}
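    /*
     * Example (hypothetical minors): with drbd2 set to resync-after=1 and
     * drbd1 set to resync-after=0, the walk above lets drbd2 resync only
     * while neither drbd1 nor drbd0 is itself syncing or paused via one of
     * the *_isp flags.  The walk stops early at resync_after == -1 or at a
     * diskless/missing minor, which never blocks the resync.
     */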
1568
1569/**
1570 * drbd_pause_after() - Pause resync on all devices that may not resync now
1571 * @device:     DRBD device.
1572 *
1573 * Called from process context only (admin command and after_state_ch).
1574 */
1575static bool drbd_pause_after(struct drbd_device *device)
1576{
1577        bool changed = false;
1578        struct drbd_device *odev;
1579        int i;
1580
1581        rcu_read_lock();
1582        idr_for_each_entry(&drbd_devices, odev, i) {
1583                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1584                        continue;
1585                if (!_drbd_may_sync_now(odev) &&
1586                    _drbd_set_state(_NS(odev, aftr_isp, 1),
1587                                    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1588                        changed = true;
1589        }
1590        rcu_read_unlock();
1591
1592        return changed;
1593}
1594
1595/**
1596 * drbd_resume_next() - Resume resync on all devices that may resync now
1597 * @device:     DRBD device.
1598 *
1599 * Called from process context only (admin command and worker).
1600 */
1601static bool drbd_resume_next(struct drbd_device *device)
1602{
1603        bool changed = false;
1604        struct drbd_device *odev;
1605        int i;
1606
1607        rcu_read_lock();
1608        idr_for_each_entry(&drbd_devices, odev, i) {
1609                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1610                        continue;
1611                if (odev->state.aftr_isp) {
1612                        if (_drbd_may_sync_now(odev) &&
1613                            _drbd_set_state(_NS(odev, aftr_isp, 0),
1614                                            CS_HARD, NULL) != SS_NOTHING_TO_DO)
1615                                changed = true;
1616                }
1617        }
1618        rcu_read_unlock();
1619        return changed;
1620}
1621
1622void resume_next_sg(struct drbd_device *device)
1623{
1624        lock_all_resources();
1625        drbd_resume_next(device);
1626        unlock_all_resources();
1627}
1628
1629void suspend_other_sg(struct drbd_device *device)
1630{
1631        lock_all_resources();
1632        drbd_pause_after(device);
1633        unlock_all_resources();
1634}
1635
1636/* caller must lock_all_resources() */
1637enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1638{
1639        struct drbd_device *odev;
1640        int resync_after;
1641
1642        if (o_minor == -1)
1643                return NO_ERROR;
1644        if (o_minor < -1 || o_minor > MINORMASK)
1645                return ERR_RESYNC_AFTER;
1646
1647        /* check for loops */
1648        odev = minor_to_device(o_minor);
1649        while (1) {
1650                if (odev == device)
1651                        return ERR_RESYNC_AFTER_CYCLE;
1652
1653                /* You are free to depend on diskless, non-existing,
1654                 * or not yet/no longer existing minors.
1655                 * We only reject dependency loops.
1656                 * We cannot follow the dependency chain beyond a detached or
1657                 * missing minor.
1658                 */
1659                if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1660                        return NO_ERROR;
1661
1662                rcu_read_lock();
1663                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1664                rcu_read_unlock();
1665                /* dependency chain ends here, no cycles. */
1666                if (resync_after == -1)
1667                        return NO_ERROR;
1668
1669                /* follow the dependency chain */
1670                odev = minor_to_device(resync_after);
1671        }
1672}
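    /*
     * Example (hypothetical minors): if drbd2 already has resync-after=0,
     * then setting resync-after=2 on drbd0 walks 2 -> 0, finds the device
     * itself and is rejected with ERR_RESYNC_AFTER_CYCLE.  Any chain that
     * ends in -1, a detached disk or a missing minor is accepted.
     */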
1673
1674/* caller must lock_all_resources() */
1675void drbd_resync_after_changed(struct drbd_device *device)
1676{
1677        int changed;
1678
1679        do {
1680                changed  = drbd_pause_after(device);
1681                changed |= drbd_resume_next(device);
1682        } while (changed);
1683}
1684
1685void drbd_rs_controller_reset(struct drbd_device *device)
1686{
1687        struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1688        struct fifo_buffer *plan;
1689
1690        atomic_set(&device->rs_sect_in, 0);
1691        atomic_set(&device->rs_sect_ev, 0);
1692        device->rs_in_flight = 0;
1693        device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);
1694
1695        /* Updating the RCU-protected object in place is necessary since
1696           this function gets called from atomic context.
1697           It is valid since all other updates also lead to a completely
1698           empty fifo. */
1699        rcu_read_lock();
1700        plan = rcu_dereference(device->rs_plan_s);
1701        plan->total = 0;
1702        fifo_set(plan, 0);
1703        rcu_read_unlock();
1704}
1705
1706void start_resync_timer_fn(struct timer_list *t)
1707{
1708        struct drbd_device *device = from_timer(device, t, start_resync_timer);
1709        drbd_device_post_work(device, RS_START);
1710}
1711
1712static void do_start_resync(struct drbd_device *device)
1713{
1714        if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1715                drbd_warn(device, "postponing start_resync ...\n");
1716                device->start_resync_timer.expires = jiffies + HZ/10;
1717                add_timer(&device->start_resync_timer);
1718                return;
1719        }
1720
1721        drbd_start_resync(device, C_SYNC_SOURCE);
1722        clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1723}
1724
1725static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1726{
1727        bool csums_after_crash_only;
1728        rcu_read_lock();
1729        csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1730        rcu_read_unlock();
1731        return connection->agreed_pro_version >= 89 &&          /* supported? */
1732                connection->csums_tfm &&                        /* configured? */
1733                (csums_after_crash_only == false                /* use for each resync? */
1734                 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1735}
1736
1737/**
1738 * drbd_start_resync() - Start the resync process
1739 * @device:     DRBD device.
1740 * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1741 *
1742 * This function might bring you directly into one of the
1743 * C_PAUSED_SYNC_* states.
1744 */
1745void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1746{
1747        struct drbd_peer_device *peer_device = first_peer_device(device);
1748        struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1749        union drbd_state ns;
1750        int r;
1751
1752        if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1753                drbd_err(device, "Resync already running!\n");
1754                return;
1755        }
1756
1757        if (!connection) {
1758                drbd_err(device, "No connection to peer, aborting!\n");
1759                return;
1760        }
1761
1762        if (!test_bit(B_RS_H_DONE, &device->flags)) {
1763                if (side == C_SYNC_TARGET) {
1764                        /* Since application IO was locked out during C_WF_BITMAP_T and
1765                           C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1766                           (which marks our disk Inconsistent), ask the before-resync-target handler. */
1767                        r = drbd_khelper(device, "before-resync-target");
1768                        r = (r >> 8) & 0xff;
1769                        if (r > 0) {
1770                                drbd_info(device, "before-resync-target handler returned %d, "
1771                                         "dropping connection.\n", r);
1772                                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1773                                return;
1774                        }
1775                } else /* C_SYNC_SOURCE */ {
1776                        r = drbd_khelper(device, "before-resync-source");
1777                        r = (r >> 8) & 0xff;
1778                        if (r > 0) {
1779                                if (r == 3) {
1780                                        drbd_info(device, "before-resync-source handler returned %d, "
1781                                                 "ignoring. Old userland tools?\n", r);
1782                                } else {
1783                                        drbd_info(device, "before-resync-source handler returned %d, "
1784                                                 "dropping connection.\n", r);
1785                                        conn_request_state(connection,
1786                                                           NS(conn, C_DISCONNECTING), CS_HARD);
1787                                        return;
1788                                }
1789                        }
1790                }
1791        }
1792
1793        if (current == connection->worker.task) {
1794                /* The worker should not sleep waiting for state_mutex;
1795                   that can take long. */
1796                if (!mutex_trylock(device->state_mutex)) {
1797                        set_bit(B_RS_H_DONE, &device->flags);
1798                        device->start_resync_timer.expires = jiffies + HZ/5;
1799                        add_timer(&device->start_resync_timer);
1800                        return;
1801                }
1802        } else {
1803                mutex_lock(device->state_mutex);
1804        }
1805
1806        lock_all_resources();
1807        clear_bit(B_RS_H_DONE, &device->flags);
1808        /* Did some connection breakage or IO error race with us? */
1809        if (device->state.conn < C_CONNECTED ||
1810            !get_ldev_if_state(device, D_NEGOTIATING)) {
1811                unlock_all_resources();
1812                goto out;
1813        }
1814
1815        ns = drbd_read_state(device);
1816
1817        ns.aftr_isp = !_drbd_may_sync_now(device);
1818
1819        ns.conn = side;
1820
1821        if (side == C_SYNC_TARGET)
1822                ns.disk = D_INCONSISTENT;
1823        else /* side == C_SYNC_SOURCE */
1824                ns.pdsk = D_INCONSISTENT;
1825
1826        r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1827        ns = drbd_read_state(device);
1828
1829        if (ns.conn < C_CONNECTED)
1830                r = SS_UNKNOWN_ERROR;
1831
1832        if (r == SS_SUCCESS) {
1833                unsigned long tw = drbd_bm_total_weight(device);
1834                unsigned long now = jiffies;
1835                int i;
1836
1837                device->rs_failed    = 0;
1838                device->rs_paused    = 0;
1839                device->rs_same_csum = 0;
1840                device->rs_last_sect_ev = 0;
1841                device->rs_total     = tw;
1842                device->rs_start     = now;
1843                for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1844                        device->rs_mark_left[i] = tw;
1845                        device->rs_mark_time[i] = now;
1846                }
1847                drbd_pause_after(device);
1848                /* Forget potentially stale cached per resync extent bit-counts.
1849                 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1850                 * disabled, and know the disk state is ok. */
1851                spin_lock(&device->al_lock);
1852                lc_reset(device->resync);
1853                device->resync_locked = 0;
1854                device->resync_wenr = LC_FREE;
1855                spin_unlock(&device->al_lock);
1856        }
1857        unlock_all_resources();
1858
1859        if (r == SS_SUCCESS) {
1860                wake_up(&device->al_wait); /* for lc_reset() above */
1861                /* reset rs_last_bcast when a resync or verify is started,
1862                 * to deal with potential jiffies wrap. */
1863                device->rs_last_bcast = jiffies - HZ;
1864
1865                drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1866                     drbd_conn_str(ns.conn),
1867                     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1868                     (unsigned long) device->rs_total);
1869                if (side == C_SYNC_TARGET) {
1870                        device->bm_resync_fo = 0;
1871                        device->use_csums = use_checksum_based_resync(connection, device);
1872                } else {
1873                        device->use_csums = false;
1874                }
1875
1876                /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1877                 * with w_send_oos, or the sync target will get confused as to
1878                 * how many bits to resync.  We cannot always do that, because for an
1879                 * empty resync and protocol < 95, we need to do it here, as we call
1880                 * drbd_resync_finished from here in that case.
1881                 * We call drbd_gen_and_send_sync_uuid() here for protocol < 96,
1882                 * and from after_state_ch otherwise. */
1883                if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1884                        drbd_gen_and_send_sync_uuid(peer_device);
1885
1886                if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1887                        /* This still has a race (about when exactly the peers
1888                         * detect connection loss) that can lead to a full sync
1889                         * on next handshake. In 8.3.9 we fixed this with explicit
1890                         * resync-finished notifications, but the fix
1891                         * introduces a protocol change.  Sleeping for some
1892                         * time longer than the ping interval + timeout on the
1893                         * SyncSource, to give the SyncTarget the chance to
1894                         * detect connection loss, then waiting for a ping
1895                         * response (implicit in drbd_resync_finished) reduces
1896                         * the race considerably, but does not solve it. */
1897                        if (side == C_SYNC_SOURCE) {
1898                                struct net_conf *nc;
1899                                int timeo;
1900
1901                                rcu_read_lock();
1902                                nc = rcu_dereference(connection->net_conf);
1903                                timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1904                                rcu_read_unlock();
1905                                schedule_timeout_interruptible(timeo);
1906                        }
1907                        drbd_resync_finished(device);
1908                }
1909
1910                drbd_rs_controller_reset(device);
1911                /* ns.conn may already be != device->state.conn,
1912                 * we may have been paused in between, or become paused until
1913                 * the timer triggers.
1914                 * No matter, that is handled in resync_timer_fn() */
1915                if (ns.conn == C_SYNC_TARGET)
1916                        mod_timer(&device->resync_timer, jiffies);
1917
1918                drbd_md_sync(device);
1919        }
1920        put_ldev(device);
1921out:
1922        mutex_unlock(device->state_mutex);
1923}
1924
1925static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1926{
1927        struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1928        device->rs_last_bcast = jiffies;
1929
1930        if (!get_ldev(device))
1931                return;
1932
1933        drbd_bm_write_lazy(device, 0);
1934        if (resync_done && is_sync_state(device->state.conn))
1935                drbd_resync_finished(device);
1936
1937        drbd_bcast_event(device, &sib);
1938        /* update timestamp, in case it took a while to write out stuff */
1939        device->rs_last_bcast = jiffies;
1940        put_ldev(device);
1941}
1942
1943static void drbd_ldev_destroy(struct drbd_device *device)
1944{
1945        lc_destroy(device->resync);
1946        device->resync = NULL;
1947        lc_destroy(device->act_log);
1948        device->act_log = NULL;
1949
1950        __acquire(local);
1951        drbd_backing_dev_free(device, device->ldev);
1952        device->ldev = NULL;
1953        __release(local);
1954
1955        clear_bit(GOING_DISKLESS, &device->flags);
1956        wake_up(&device->misc_wait);
1957}
1958
1959static void go_diskless(struct drbd_device *device)
1960{
1961        D_ASSERT(device, device->state.disk == D_FAILED);
1962        /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1963         * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1964         * the protected members anymore, though, so once put_ldev reaches zero
1965         * again, it will be safe to free them. */
1966
1967        /* Try to write changed bitmap pages; read errors may have just
1968         * set some bits outside the area covered by the activity log.
1969         *
1970         * If we have an IO error during the bitmap writeout,
1971         * we will want a full sync next time, just in case.
1972         * (Do we want a specific meta data flag for this?)
1973         *
1974         * If that does not make it to stable storage either,
1975         * we cannot do anything about that anymore.
1976         *
1977         * We still need to check if both bitmap and ldev are present, we may
1978         * end up here after a failed attach, before ldev was even assigned.
1979         */
1980        if (device->bitmap && device->ldev) {
1981                /* An interrupted resync or similar is allowed to recount bits
1982                 * while we detach.
1983                 * No further modifications are expected at this point, though.
1984                 */
1985                if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1986                                        "detach", BM_LOCKED_TEST_ALLOWED)) {
1987                        if (test_bit(WAS_READ_ERROR, &device->flags)) {
1988                                drbd_md_set_flag(device, MDF_FULL_SYNC);
1989                                drbd_md_sync(device);
1990                        }
1991                }
1992        }
1993
1994        drbd_force_state(device, NS(disk, D_DISKLESS));
1995}
1996
1997static int do_md_sync(struct drbd_device *device)
1998{
1999        drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
2000        drbd_md_sync(device);
2001        return 0;
2002}
2003
2004/* only called from drbd_worker thread, no locking */
2005void __update_timing_details(
2006                struct drbd_thread_timing_details *tdp,
2007                unsigned int *cb_nr,
2008                void *cb,
2009                const char *fn, const unsigned int line)
2010{
2011        unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
2012        struct drbd_thread_timing_details *td = tdp + i;
2013
2014        td->start_jif = jiffies;
2015        td->cb_addr = cb;
2016        td->caller_fn = fn;
2017        td->line = line;
2018        td->cb_nr = *cb_nr;
2019
2020        i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2021        td = tdp + i;
2022        memset(td, 0, sizeof(*td));
2023
2024        ++(*cb_nr);
2025}
2026
2027static void do_device_work(struct drbd_device *device, const unsigned long todo)
2028{
2029        if (test_bit(MD_SYNC, &todo))
2030                do_md_sync(device);
2031        if (test_bit(RS_DONE, &todo) ||
2032            test_bit(RS_PROGRESS, &todo))
2033                update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2034        if (test_bit(GO_DISKLESS, &todo))
2035                go_diskless(device);
2036        if (test_bit(DESTROY_DISK, &todo))
2037                drbd_ldev_destroy(device);
2038        if (test_bit(RS_START, &todo))
2039                do_start_resync(device);
2040}
2041
2042#define DRBD_DEVICE_WORK_MASK   \
2043        ((1UL << GO_DISKLESS)   \
2044        |(1UL << DESTROY_DISK)  \
2045        |(1UL << MD_SYNC)       \
2046        |(1UL << RS_START)      \
2047        |(1UL << RS_PROGRESS)   \
2048        |(1UL << RS_DONE)       \
2049        )
2050
2051static unsigned long get_work_bits(unsigned long *flags)
2052{
2053        unsigned long old, new;
2054        do {
2055                old = *flags;
2056                new = old & ~DRBD_DEVICE_WORK_MASK;
2057        } while (cmpxchg(flags, old, new) != old);
2058        return old & DRBD_DEVICE_WORK_MASK;
2059}
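    /*
     * Example: if *flags holds RS_START, RS_DONE and some unrelated device
     * flags, get_work_bits() returns (1UL << RS_START) | (1UL << RS_DONE) and
     * leaves only the unrelated flags set.  Should another context set a work
     * bit between the read and the cmpxchg, the cmpxchg fails and the loop
     * retries with the fresh value, so no queued work bit is ever lost.
     */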
2060
2061static void do_unqueued_work(struct drbd_connection *connection)
2062{
2063        struct drbd_peer_device *peer_device;
2064        int vnr;
2065
2066        rcu_read_lock();
2067        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2068                struct drbd_device *device = peer_device->device;
2069                unsigned long todo = get_work_bits(&device->flags);
2070                if (!todo)
2071                        continue;
2072
2073                kref_get(&device->kref);
2074                rcu_read_unlock();
2075                do_device_work(device, todo);
2076                kref_put(&device->kref, drbd_destroy_device);
2077                rcu_read_lock();
2078        }
2079        rcu_read_unlock();
2080}
2081
2082static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2083{
2084        spin_lock_irq(&queue->q_lock);
2085        list_splice_tail_init(&queue->q, work_list);
2086        spin_unlock_irq(&queue->q_lock);
2087        return !list_empty(work_list);
2088}
2089
2090static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2091{
2092        DEFINE_WAIT(wait);
2093        struct net_conf *nc;
2094        int uncork, cork;
2095
2096        dequeue_work_batch(&connection->sender_work, work_list);
2097        if (!list_empty(work_list))
2098                return;
2099
2100        /* Still nothing to do?
2101         * Maybe we still need to close the current epoch,
2102         * even if no new requests are queued yet.
2103         *
2104         * Also, poke TCP, just in case.
2105         * Then wait for new work (or signal). */
2106        rcu_read_lock();
2107        nc = rcu_dereference(connection->net_conf);
2108        uncork = nc ? nc->tcp_cork : 0;
2109        rcu_read_unlock();
2110        if (uncork) {
2111                mutex_lock(&connection->data.mutex);
2112                if (connection->data.socket)
2113                        drbd_tcp_uncork(connection->data.socket);
2114                mutex_unlock(&connection->data.mutex);
2115        }
2116
2117        for (;;) {
2118                int send_barrier;
2119                prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2120                spin_lock_irq(&connection->resource->req_lock);
2121                spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2122                if (!list_empty(&connection->sender_work.q))
2123                        list_splice_tail_init(&connection->sender_work.q, work_list);
2124                spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2125                if (!list_empty(work_list) || signal_pending(current)) {
2126                        spin_unlock_irq(&connection->resource->req_lock);
2127                        break;
2128                }
2129
2130                /* We found nothing new to do, no to-be-communicated request,
2131                 * no other work item.  We may still need to close the last
2132                 * epoch.  The next incoming request will get the connection's
2133                 * current transfer log epoch number.  If that is different
2134                 * from the epoch of the last request we communicated, it is
2135                 * safe to send the epoch-separating barrier now.
2136                 */
2137                send_barrier =
2138                        atomic_read(&connection->current_tle_nr) !=
2139                        connection->send.current_epoch_nr;
2140                spin_unlock_irq(&connection->resource->req_lock);
2141
2142                if (send_barrier)
2143                        maybe_send_barrier(connection,
2144                                        connection->send.current_epoch_nr + 1);
2145
2146                if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2147                        break;
2148
2149                /* drbd_send() may have called flush_signals() */
2150                if (get_t_state(&connection->worker) != RUNNING)
2151                        break;
2152
2153                schedule();
2154                /* We may be woken up for things other than new work, too,
2155                 * e.g. if the current epoch got closed.
2156                 * In that case we send the barrier above. */
2157        }
2158        finish_wait(&connection->sender_work.q_wait, &wait);
2159
2160        /* someone may have changed the config while we were waiting above. */
2161        rcu_read_lock();
2162        nc = rcu_dereference(connection->net_conf);
2163        cork = nc ? nc->tcp_cork : 0;
2164        rcu_read_unlock();
2165        mutex_lock(&connection->data.mutex);
2166        if (connection->data.socket) {
2167                if (cork)
2168                        drbd_tcp_cork(connection->data.socket);
2169                else if (!uncork)
2170                        drbd_tcp_uncork(connection->data.socket);
2171        }
2172        mutex_unlock(&connection->data.mutex);
2173}
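    /*
     * Illustrative sketch (not part of the driver): the cork/uncork handling
     * above is plain output batching.  A hypothetical userspace analogue,
     * with have_more_packets()/next_packet()/next_len() made up:
     *
     *        setsockopt(fd, IPPROTO_TCP, TCP_CORK, &(int){1}, sizeof(int));
     *        while (have_more_packets())
     *                send(fd, next_packet(), next_len(), 0);
     *        setsockopt(fd, IPPROTO_TCP, TCP_CORK, &(int){0}, sizeof(int));
     *
     * While a backlog exists the socket stays corked so small packets
     * coalesce; before going idle the worker uncorks so nothing lingers in
     * the send buffer waiting for data that may never arrive.
     */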
2174
2175int drbd_worker(struct drbd_thread *thi)
2176{
2177        struct drbd_connection *connection = thi->connection;
2178        struct drbd_work *w = NULL;
2179        struct drbd_peer_device *peer_device;
2180        LIST_HEAD(work_list);
2181        int vnr;
2182
2183        while (get_t_state(thi) == RUNNING) {
2184                drbd_thread_current_set_cpu(thi);
2185
2186                if (list_empty(&work_list)) {
2187                        update_worker_timing_details(connection, wait_for_work);
2188                        wait_for_work(connection, &work_list);
2189                }
2190
2191                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2192                        update_worker_timing_details(connection, do_unqueued_work);
2193                        do_unqueued_work(connection);
2194                }
2195
2196                if (signal_pending(current)) {
2197                        flush_signals(current);
2198                        if (get_t_state(thi) == RUNNING) {
2199                                drbd_warn(connection, "Worker got an unexpected signal\n");
2200                                continue;
2201                        }
2202                        break;
2203                }
2204
2205                if (get_t_state(thi) != RUNNING)
2206                        break;
2207
2208                if (!list_empty(&work_list)) {
2209                        w = list_first_entry(&work_list, struct drbd_work, list);
2210                        list_del_init(&w->list);
2211                        update_worker_timing_details(connection, w->cb);
2212                        if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2213                                continue;
2214                        if (connection->cstate >= C_WF_REPORT_PARAMS)
2215                                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2216                }
2217        }
2218
2219        do {
2220                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2221                        update_worker_timing_details(connection, do_unqueued_work);
2222                        do_unqueued_work(connection);
2223                }
2224                if (!list_empty(&work_list)) {
2225                        w = list_first_entry(&work_list, struct drbd_work, list);
2226                        list_del_init(&w->list);
2227                        update_worker_timing_details(connection, w->cb);
2228                        w->cb(w, 1);
2229                } else
2230                        dequeue_work_batch(&connection->sender_work, &work_list);
2231        } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2232
2233        rcu_read_lock();
2234        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2235                struct drbd_device *device = peer_device->device;
2236                D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2237                kref_get(&device->kref);
2238                rcu_read_unlock();
2239                drbd_device_cleanup(device);
2240                kref_put(&device->kref, drbd_destroy_device);
2241                rcu_read_lock();
2242        }
2243        rcu_read_unlock();
2244
2245        return 0;
2246}
2247