linux/drivers/block/drbd/drbd_worker.c
<<
>>
Prefs
   1// SPDX-License-Identifier: GPL-2.0-or-later
   2/*
   3   drbd_worker.c
   4
   5   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   6
   7   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   8   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   9   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
  10
  11
  12*/
  13
  14#include <linux/module.h>
  15#include <linux/drbd.h>
  16#include <linux/sched/signal.h>
  17#include <linux/wait.h>
  18#include <linux/mm.h>
  19#include <linux/memcontrol.h>
  20#include <linux/mm_inline.h>
  21#include <linux/slab.h>
  22#include <linux/random.h>
  23#include <linux/string.h>
  24#include <linux/scatterlist.h>
  25
  26#include "drbd_int.h"
  27#include "drbd_protocol.h"
  28#include "drbd_req.h"
  29
  30static int make_ov_request(struct drbd_device *, int);
  31static int make_resync_request(struct drbd_device *, int);
  32
  33/* endio handlers:
  34 *   drbd_md_endio (defined here)
  35 *   drbd_request_endio (defined here)
  36 *   drbd_peer_request_endio (defined here)
  37 *   drbd_bm_endio (defined in drbd_bitmap.c)
  38 *
  39 * For all these callbacks, note the following:
  40 * The callbacks will be called in irq context by the IDE drivers,
  41 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  42 * Try to get the locking right :)
  43 *
  44 */
  45
  46/* used for synchronous meta data and bitmap IO
  47 * submitted by drbd_md_sync_page_io()
  48 */
  49void drbd_md_endio(struct bio *bio)
  50{
  51        struct drbd_device *device;
  52
  53        device = bio->bi_private;
  54        device->md_io.error = blk_status_to_errno(bio->bi_status);
  55
  56        /* special case: drbd_md_read() during drbd_adm_attach() */
  57        if (device->ldev)
  58                put_ldev(device);
  59        bio_put(bio);
  60
  61        /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
  62         * to timeout on the lower level device, and eventually detach from it.
  63         * If this io completion runs after that timeout expired, this
  64         * drbd_md_put_buffer() may allow us to finally try and re-attach.
  65         * During normal operation, this only puts that extra reference
  66         * down to 1 again.
  67         * Make sure we first drop the reference, and only then signal
  68         * completion, or we may (in drbd_al_read_log()) cycle so fast into the
  69         * next drbd_md_sync_page_io(), that we trigger the
  70         * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
  71         */
  72        drbd_md_put_buffer(device);
  73        device->md_io.done = 1;
  74        wake_up(&device->misc_wait);
  75}
  76
  77/* reads on behalf of the partner,
  78 * "submitted" by the receiver
  79 */
  80static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
  81{
  82        unsigned long flags = 0;
  83        struct drbd_peer_device *peer_device = peer_req->peer_device;
  84        struct drbd_device *device = peer_device->device;
  85
  86        spin_lock_irqsave(&device->resource->req_lock, flags);
  87        device->read_cnt += peer_req->i.size >> 9;
  88        list_del(&peer_req->w.list);
  89        if (list_empty(&device->read_ee))
  90                wake_up(&device->ee_wait);
  91        if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
  92                __drbd_chk_io_error(device, DRBD_READ_ERROR);
  93        spin_unlock_irqrestore(&device->resource->req_lock, flags);
  94
  95        drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
  96        put_ldev(device);
  97}
  98
  99/* writes on behalf of the partner, or resync writes,
 100 * "submitted" by the receiver, final stage.  */
 101void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 102{
 103        unsigned long flags = 0;
 104        struct drbd_peer_device *peer_device = peer_req->peer_device;
 105        struct drbd_device *device = peer_device->device;
 106        struct drbd_connection *connection = peer_device->connection;
 107        struct drbd_interval i;
 108        int do_wake;
 109        u64 block_id;
 110        int do_al_complete_io;
 111
 112        /* after we moved peer_req to done_ee,
 113         * we may no longer access it,
 114         * it may be freed/reused already!
 115         * (as soon as we release the req_lock) */
 116        i = peer_req->i;
 117        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
 118        block_id = peer_req->block_id;
 119        peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 120
 121        if (peer_req->flags & EE_WAS_ERROR) {
 122                /* In protocol != C, we usually do not send write acks.
 123                 * In case of a write error, send the neg ack anyways. */
 124                if (!__test_and_set_bit(__EE_SEND_WRITE_ACK, &peer_req->flags))
 125                        inc_unacked(device);
 126                drbd_set_out_of_sync(device, peer_req->i.sector, peer_req->i.size);
 127        }
 128
 129        spin_lock_irqsave(&device->resource->req_lock, flags);
 130        device->writ_cnt += peer_req->i.size >> 9;
 131        list_move_tail(&peer_req->w.list, &device->done_ee);
 132
 133        /*
 134         * Do not remove from the write_requests tree here: we did not send the
 135         * Ack yet and did not wake possibly waiting conflicting requests.
 136         * Removed from the tree from "drbd_process_done_ee" within the
 137         * appropriate dw.cb (e_end_block/e_end_resync_block) or from
 138         * _drbd_clear_done_ee.
 139         */
 140
 141        do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
 142
 143        /* FIXME do we want to detach for failed REQ_OP_DISCARD?
 144         * ((peer_req->flags & (EE_WAS_ERROR|EE_TRIM)) == EE_WAS_ERROR) */
 145        if (peer_req->flags & EE_WAS_ERROR)
 146                __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
 147
 148        if (connection->cstate >= C_WF_REPORT_PARAMS) {
 149                kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
 150                if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
 151                        kref_put(&device->kref, drbd_destroy_device);
 152        }
 153        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 154
 155        if (block_id == ID_SYNCER)
 156                drbd_rs_complete_io(device, i.sector);
 157
 158        if (do_wake)
 159                wake_up(&device->ee_wait);
 160
 161        if (do_al_complete_io)
 162                drbd_al_complete_io(device, &i);
 163
 164        put_ldev(device);
 165}
 166
 167/* writes on behalf of the partner, or resync writes,
 168 * "submitted" by the receiver.
 169 */
 170void drbd_peer_request_endio(struct bio *bio)
 171{
 172        struct drbd_peer_request *peer_req = bio->bi_private;
 173        struct drbd_device *device = peer_req->peer_device->device;
 174        bool is_write = bio_data_dir(bio) == WRITE;
 175        bool is_discard = bio_op(bio) == REQ_OP_WRITE_ZEROES ||
 176                          bio_op(bio) == REQ_OP_DISCARD;
 177
 178        if (bio->bi_status && __ratelimit(&drbd_ratelimit_state))
 179                drbd_warn(device, "%s: error=%d s=%llus\n",
 180                                is_write ? (is_discard ? "discard" : "write")
 181                                        : "read", bio->bi_status,
 182                                (unsigned long long)peer_req->i.sector);
 183
 184        if (bio->bi_status)
 185                set_bit(__EE_WAS_ERROR, &peer_req->flags);
 186
 187        bio_put(bio); /* no need for the bio anymore */
 188        if (atomic_dec_and_test(&peer_req->pending_bios)) {
 189                if (is_write)
 190                        drbd_endio_write_sec_final(peer_req);
 191                else
 192                        drbd_endio_read_sec_final(peer_req);
 193        }
 194}
 195
 196static void
 197drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
 198{
 199        panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
 200                device->minor, device->resource->name, device->vnr);
 201}
 202
 203/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 204 */
 205void drbd_request_endio(struct bio *bio)
 206{
 207        unsigned long flags;
 208        struct drbd_request *req = bio->bi_private;
 209        struct drbd_device *device = req->device;
 210        struct bio_and_error m;
 211        enum drbd_req_event what;
 212
 213        /* If this request was aborted locally before,
 214         * but now was completed "successfully",
 215         * chances are that this caused arbitrary data corruption.
 216         *
 217         * "aborting" requests, or force-detaching the disk, is intended for
 218         * completely blocked/hung local backing devices which do no longer
 219         * complete requests at all, not even do error completions.  In this
 220         * situation, usually a hard-reset and failover is the only way out.
 221         *
 222         * By "aborting", basically faking a local error-completion,
 223         * we allow for a more graceful swichover by cleanly migrating services.
 224         * Still the affected node has to be rebooted "soon".
 225         *
 226         * By completing these requests, we allow the upper layers to re-use
 227         * the associated data pages.
 228         *
 229         * If later the local backing device "recovers", and now DMAs some data
 230         * from disk into the original request pages, in the best case it will
 231         * just put random data into unused pages; but typically it will corrupt
 232         * meanwhile completely unrelated data, causing all sorts of damage.
 233         *
 234         * Which means delayed successful completion,
 235         * especially for READ requests,
 236         * is a reason to panic().
 237         *
 238         * We assume that a delayed *error* completion is OK,
 239         * though we still will complain noisily about it.
 240         */
 241        if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
 242                if (__ratelimit(&drbd_ratelimit_state))
 243                        drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
 244
 245                if (!bio->bi_status)
 246                        drbd_panic_after_delayed_completion_of_aborted_request(device);
 247        }
 248
 249        /* to avoid recursion in __req_mod */
 250        if (unlikely(bio->bi_status)) {
 251                switch (bio_op(bio)) {
 252                case REQ_OP_WRITE_ZEROES:
 253                case REQ_OP_DISCARD:
 254                        if (bio->bi_status == BLK_STS_NOTSUPP)
 255                                what = DISCARD_COMPLETED_NOTSUPP;
 256                        else
 257                                what = DISCARD_COMPLETED_WITH_ERROR;
 258                        break;
 259                case REQ_OP_READ:
 260                        if (bio->bi_opf & REQ_RAHEAD)
 261                                what = READ_AHEAD_COMPLETED_WITH_ERROR;
 262                        else
 263                                what = READ_COMPLETED_WITH_ERROR;
 264                        break;
 265                default:
 266                        what = WRITE_COMPLETED_WITH_ERROR;
 267                        break;
 268                }
 269        } else {
 270                what = COMPLETED_OK;
 271        }
 272
 273        req->private_bio = ERR_PTR(blk_status_to_errno(bio->bi_status));
 274        bio_put(bio);
 275
 276        /* not req_mod(), we need irqsave here! */
 277        spin_lock_irqsave(&device->resource->req_lock, flags);
 278        __req_mod(req, what, &m);
 279        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 280        put_ldev(device);
 281
 282        if (m.bio)
 283                complete_master_bio(device, &m);
 284}
 285
 286void drbd_csum_ee(struct crypto_shash *tfm, struct drbd_peer_request *peer_req, void *digest)
 287{
 288        SHASH_DESC_ON_STACK(desc, tfm);
 289        struct page *page = peer_req->pages;
 290        struct page *tmp;
 291        unsigned len;
 292        void *src;
 293
 294        desc->tfm = tfm;
 295
 296        crypto_shash_init(desc);
 297
 298        src = kmap_atomic(page);
 299        while ((tmp = page_chain_next(page))) {
 300                /* all but the last page will be fully used */
 301                crypto_shash_update(desc, src, PAGE_SIZE);
 302                kunmap_atomic(src);
 303                page = tmp;
 304                src = kmap_atomic(page);
 305        }
 306        /* and now the last, possibly only partially used page */
 307        len = peer_req->i.size & (PAGE_SIZE - 1);
 308        crypto_shash_update(desc, src, len ?: PAGE_SIZE);
 309        kunmap_atomic(src);
 310
 311        crypto_shash_final(desc, digest);
 312        shash_desc_zero(desc);
 313}
 314
 315void drbd_csum_bio(struct crypto_shash *tfm, struct bio *bio, void *digest)
 316{
 317        SHASH_DESC_ON_STACK(desc, tfm);
 318        struct bio_vec bvec;
 319        struct bvec_iter iter;
 320
 321        desc->tfm = tfm;
 322
 323        crypto_shash_init(desc);
 324
 325        bio_for_each_segment(bvec, bio, iter) {
 326                u8 *src;
 327
 328                src = kmap_atomic(bvec.bv_page);
 329                crypto_shash_update(desc, src + bvec.bv_offset, bvec.bv_len);
 330                kunmap_atomic(src);
 331
 332                /* REQ_OP_WRITE_SAME has only one segment,
 333                 * checksum the payload only once. */
 334                if (bio_op(bio) == REQ_OP_WRITE_SAME)
 335                        break;
 336        }
 337        crypto_shash_final(desc, digest);
 338        shash_desc_zero(desc);
 339}
 340
 341/* MAYBE merge common code with w_e_end_ov_req */
 342static int w_e_send_csum(struct drbd_work *w, int cancel)
 343{
 344        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 345        struct drbd_peer_device *peer_device = peer_req->peer_device;
 346        struct drbd_device *device = peer_device->device;
 347        int digest_size;
 348        void *digest;
 349        int err = 0;
 350
 351        if (unlikely(cancel))
 352                goto out;
 353
 354        if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
 355                goto out;
 356
 357        digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
 358        digest = kmalloc(digest_size, GFP_NOIO);
 359        if (digest) {
 360                sector_t sector = peer_req->i.sector;
 361                unsigned int size = peer_req->i.size;
 362                drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
 363                /* Free peer_req and pages before send.
 364                 * In case we block on congestion, we could otherwise run into
 365                 * some distributed deadlock, if the other side blocks on
 366                 * congestion as well, because our receiver blocks in
 367                 * drbd_alloc_pages due to pp_in_use > max_buffers. */
 368                drbd_free_peer_req(device, peer_req);
 369                peer_req = NULL;
 370                inc_rs_pending(device);
 371                err = drbd_send_drequest_csum(peer_device, sector, size,
 372                                              digest, digest_size,
 373                                              P_CSUM_RS_REQUEST);
 374                kfree(digest);
 375        } else {
 376                drbd_err(device, "kmalloc() of digest failed.\n");
 377                err = -ENOMEM;
 378        }
 379
 380out:
 381        if (peer_req)
 382                drbd_free_peer_req(device, peer_req);
 383
 384        if (unlikely(err))
 385                drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
 386        return err;
 387}
 388
 389#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
 390
 391static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
 392{
 393        struct drbd_device *device = peer_device->device;
 394        struct drbd_peer_request *peer_req;
 395
 396        if (!get_ldev(device))
 397                return -EIO;
 398
 399        /* GFP_TRY, because if there is no memory available right now, this may
 400         * be rescheduled for later. It is "only" background resync, after all. */
 401        peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
 402                                       size, size, GFP_TRY);
 403        if (!peer_req)
 404                goto defer;
 405
 406        peer_req->w.cb = w_e_send_csum;
 407        spin_lock_irq(&device->resource->req_lock);
 408        list_add_tail(&peer_req->w.list, &device->read_ee);
 409        spin_unlock_irq(&device->resource->req_lock);
 410
 411        atomic_add(size >> 9, &device->rs_sect_ev);
 412        if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
 413                                     DRBD_FAULT_RS_RD) == 0)
 414                return 0;
 415
 416        /* If it failed because of ENOMEM, retry should help.  If it failed
 417         * because bio_add_page failed (probably broken lower level driver),
 418         * retry may or may not help.
 419         * If it does not, you may need to force disconnect. */
 420        spin_lock_irq(&device->resource->req_lock);
 421        list_del(&peer_req->w.list);
 422        spin_unlock_irq(&device->resource->req_lock);
 423
 424        drbd_free_peer_req(device, peer_req);
 425defer:
 426        put_ldev(device);
 427        return -EAGAIN;
 428}
 429
 430int w_resync_timer(struct drbd_work *w, int cancel)
 431{
 432        struct drbd_device *device =
 433                container_of(w, struct drbd_device, resync_work);
 434
 435        switch (device->state.conn) {
 436        case C_VERIFY_S:
 437                make_ov_request(device, cancel);
 438                break;
 439        case C_SYNC_TARGET:
 440                make_resync_request(device, cancel);
 441                break;
 442        }
 443
 444        return 0;
 445}
 446
 447void resync_timer_fn(struct timer_list *t)
 448{
 449        struct drbd_device *device = from_timer(device, t, resync_timer);
 450
 451        drbd_queue_work_if_unqueued(
 452                &first_peer_device(device)->connection->sender_work,
 453                &device->resync_work);
 454}
 455
 456static void fifo_set(struct fifo_buffer *fb, int value)
 457{
 458        int i;
 459
 460        for (i = 0; i < fb->size; i++)
 461                fb->values[i] = value;
 462}
 463
 464static int fifo_push(struct fifo_buffer *fb, int value)
 465{
 466        int ov;
 467
 468        ov = fb->values[fb->head_index];
 469        fb->values[fb->head_index++] = value;
 470
 471        if (fb->head_index >= fb->size)
 472                fb->head_index = 0;
 473
 474        return ov;
 475}
 476
 477static void fifo_add_val(struct fifo_buffer *fb, int value)
 478{
 479        int i;
 480
 481        for (i = 0; i < fb->size; i++)
 482                fb->values[i] += value;
 483}
 484
 485struct fifo_buffer *fifo_alloc(unsigned int fifo_size)
 486{
 487        struct fifo_buffer *fb;
 488
 489        fb = kzalloc(struct_size(fb, values, fifo_size), GFP_NOIO);
 490        if (!fb)
 491                return NULL;
 492
 493        fb->head_index = 0;
 494        fb->size = fifo_size;
 495        fb->total = 0;
 496
 497        return fb;
 498}
 499
 500static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
 501{
 502        struct disk_conf *dc;
 503        unsigned int want;     /* The number of sectors we want in-flight */
 504        int req_sect; /* Number of sectors to request in this turn */
 505        int correction; /* Number of sectors more we need in-flight */
 506        int cps; /* correction per invocation of drbd_rs_controller() */
 507        int steps; /* Number of time steps to plan ahead */
 508        int curr_corr;
 509        int max_sect;
 510        struct fifo_buffer *plan;
 511
 512        dc = rcu_dereference(device->ldev->disk_conf);
 513        plan = rcu_dereference(device->rs_plan_s);
 514
 515        steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 516
 517        if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
 518                want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
 519        } else { /* normal path */
 520                want = dc->c_fill_target ? dc->c_fill_target :
 521                        sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
 522        }
 523
 524        correction = want - device->rs_in_flight - plan->total;
 525
 526        /* Plan ahead */
 527        cps = correction / steps;
 528        fifo_add_val(plan, cps);
 529        plan->total += cps * steps;
 530
 531        /* What we do in this step */
 532        curr_corr = fifo_push(plan, 0);
 533        plan->total -= curr_corr;
 534
 535        req_sect = sect_in + curr_corr;
 536        if (req_sect < 0)
 537                req_sect = 0;
 538
 539        max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
 540        if (req_sect > max_sect)
 541                req_sect = max_sect;
 542
 543        /*
 544        drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 545                 sect_in, device->rs_in_flight, want, correction,
 546                 steps, cps, device->rs_planed, curr_corr, req_sect);
 547        */
 548
 549        return req_sect;
 550}
 551
 552static int drbd_rs_number_requests(struct drbd_device *device)
 553{
 554        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 555        int number, mxb;
 556
 557        sect_in = atomic_xchg(&device->rs_sect_in, 0);
 558        device->rs_in_flight -= sect_in;
 559
 560        rcu_read_lock();
 561        mxb = drbd_get_max_buffers(device) / 2;
 562        if (rcu_dereference(device->rs_plan_s)->size) {
 563                number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
 564                device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 565        } else {
 566                device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
 567                number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 568        }
 569        rcu_read_unlock();
 570
 571        /* Don't have more than "max-buffers"/2 in-flight.
 572         * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
 573         * potentially causing a distributed deadlock on congestion during
 574         * online-verify or (checksum-based) resync, if max-buffers,
 575         * socket buffer sizes and resync rate settings are mis-configured. */
 576
 577        /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
 578         * mxb (as used here, and in drbd_alloc_pages on the peer) is
 579         * "number of pages" (typically also 4k),
 580         * but "rs_in_flight" is in "sectors" (512 Byte). */
 581        if (mxb - device->rs_in_flight/8 < number)
 582                number = mxb - device->rs_in_flight/8;
 583
 584        return number;
 585}
 586
 587static int make_resync_request(struct drbd_device *const device, int cancel)
 588{
 589        struct drbd_peer_device *const peer_device = first_peer_device(device);
 590        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
 591        unsigned long bit;
 592        sector_t sector;
 593        const sector_t capacity = drbd_get_capacity(device->this_bdev);
 594        int max_bio_size;
 595        int number, rollback_i, size;
 596        int align, requeue = 0;
 597        int i = 0;
 598        int discard_granularity = 0;
 599
 600        if (unlikely(cancel))
 601                return 0;
 602
 603        if (device->rs_total == 0) {
 604                /* empty resync? */
 605                drbd_resync_finished(device);
 606                return 0;
 607        }
 608
 609        if (!get_ldev(device)) {
 610                /* Since we only need to access device->rsync a
 611                   get_ldev_if_state(device,D_FAILED) would be sufficient, but
 612                   to continue resync with a broken disk makes no sense at
 613                   all */
 614                drbd_err(device, "Disk broke down during resync!\n");
 615                return 0;
 616        }
 617
 618        if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
 619                rcu_read_lock();
 620                discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
 621                rcu_read_unlock();
 622        }
 623
 624        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
 625        number = drbd_rs_number_requests(device);
 626        if (number <= 0)
 627                goto requeue;
 628
 629        for (i = 0; i < number; i++) {
 630                /* Stop generating RS requests when half of the send buffer is filled,
 631                 * but notify TCP that we'd like to have more space. */
 632                mutex_lock(&connection->data.mutex);
 633                if (connection->data.socket) {
 634                        struct sock *sk = connection->data.socket->sk;
 635                        int queued = sk->sk_wmem_queued;
 636                        int sndbuf = sk->sk_sndbuf;
 637                        if (queued > sndbuf / 2) {
 638                                requeue = 1;
 639                                if (sk->sk_socket)
 640                                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 641                        }
 642                } else
 643                        requeue = 1;
 644                mutex_unlock(&connection->data.mutex);
 645                if (requeue)
 646                        goto requeue;
 647
 648next_sector:
 649                size = BM_BLOCK_SIZE;
 650                bit  = drbd_bm_find_next(device, device->bm_resync_fo);
 651
 652                if (bit == DRBD_END_OF_BITMAP) {
 653                        device->bm_resync_fo = drbd_bm_bits(device);
 654                        put_ldev(device);
 655                        return 0;
 656                }
 657
 658                sector = BM_BIT_TO_SECT(bit);
 659
 660                if (drbd_try_rs_begin_io(device, sector)) {
 661                        device->bm_resync_fo = bit;
 662                        goto requeue;
 663                }
 664                device->bm_resync_fo = bit + 1;
 665
 666                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
 667                        drbd_rs_complete_io(device, sector);
 668                        goto next_sector;
 669                }
 670
 671#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 672                /* try to find some adjacent bits.
 673                 * we stop if we have already the maximum req size.
 674                 *
 675                 * Additionally always align bigger requests, in order to
 676                 * be prepared for all stripe sizes of software RAIDs.
 677                 */
 678                align = 1;
 679                rollback_i = i;
 680                while (i < number) {
 681                        if (size + BM_BLOCK_SIZE > max_bio_size)
 682                                break;
 683
 684                        /* Be always aligned */
 685                        if (sector & ((1<<(align+3))-1))
 686                                break;
 687
 688                        if (discard_granularity && size == discard_granularity)
 689                                break;
 690
 691                        /* do not cross extent boundaries */
 692                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 693                                break;
 694                        /* now, is it actually dirty, after all?
 695                         * caution, drbd_bm_test_bit is tri-state for some
 696                         * obscure reason; ( b == 0 ) would get the out-of-band
 697                         * only accidentally right because of the "oddly sized"
 698                         * adjustment below */
 699                        if (drbd_bm_test_bit(device, bit+1) != 1)
 700                                break;
 701                        bit++;
 702                        size += BM_BLOCK_SIZE;
 703                        if ((BM_BLOCK_SIZE << align) <= size)
 704                                align++;
 705                        i++;
 706                }
 707                /* if we merged some,
 708                 * reset the offset to start the next drbd_bm_find_next from */
 709                if (size > BM_BLOCK_SIZE)
 710                        device->bm_resync_fo = bit + 1;
 711#endif
 712
 713                /* adjust very last sectors, in case we are oddly sized */
 714                if (sector + (size>>9) > capacity)
 715                        size = (capacity-sector)<<9;
 716
 717                if (device->use_csums) {
 718                        switch (read_for_csum(peer_device, sector, size)) {
 719                        case -EIO: /* Disk failure */
 720                                put_ldev(device);
 721                                return -EIO;
 722                        case -EAGAIN: /* allocation failed, or ldev busy */
 723                                drbd_rs_complete_io(device, sector);
 724                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
 725                                i = rollback_i;
 726                                goto requeue;
 727                        case 0:
 728                                /* everything ok */
 729                                break;
 730                        default:
 731                                BUG();
 732                        }
 733                } else {
 734                        int err;
 735
 736                        inc_rs_pending(device);
 737                        err = drbd_send_drequest(peer_device,
 738                                                 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
 739                                                 sector, size, ID_SYNCER);
 740                        if (err) {
 741                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
 742                                dec_rs_pending(device);
 743                                put_ldev(device);
 744                                return err;
 745                        }
 746                }
 747        }
 748
 749        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
 750                /* last syncer _request_ was sent,
 751                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
 752                 * next sync group will resume), as soon as we receive the last
 753                 * resync data block, and the last bit is cleared.
 754                 * until then resync "work" is "inactive" ...
 755                 */
 756                put_ldev(device);
 757                return 0;
 758        }
 759
 760 requeue:
 761        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 762        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 763        put_ldev(device);
 764        return 0;
 765}
 766
 767static int make_ov_request(struct drbd_device *device, int cancel)
 768{
 769        int number, i, size;
 770        sector_t sector;
 771        const sector_t capacity = drbd_get_capacity(device->this_bdev);
 772        bool stop_sector_reached = false;
 773
 774        if (unlikely(cancel))
 775                return 1;
 776
 777        number = drbd_rs_number_requests(device);
 778
 779        sector = device->ov_position;
 780        for (i = 0; i < number; i++) {
 781                if (sector >= capacity)
 782                        return 1;
 783
 784                /* We check for "finished" only in the reply path:
 785                 * w_e_end_ov_reply().
 786                 * We need to send at least one request out. */
 787                stop_sector_reached = i > 0
 788                        && verify_can_do_stop_sector(device)
 789                        && sector >= device->ov_stop_sector;
 790                if (stop_sector_reached)
 791                        break;
 792
 793                size = BM_BLOCK_SIZE;
 794
 795                if (drbd_try_rs_begin_io(device, sector)) {
 796                        device->ov_position = sector;
 797                        goto requeue;
 798                }
 799
 800                if (sector + (size>>9) > capacity)
 801                        size = (capacity-sector)<<9;
 802
 803                inc_rs_pending(device);
 804                if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
 805                        dec_rs_pending(device);
 806                        return 0;
 807                }
 808                sector += BM_SECT_PER_BIT;
 809        }
 810        device->ov_position = sector;
 811
 812 requeue:
 813        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 814        if (i == 0 || !stop_sector_reached)
 815                mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 816        return 1;
 817}
 818
 819int w_ov_finished(struct drbd_work *w, int cancel)
 820{
 821        struct drbd_device_work *dw =
 822                container_of(w, struct drbd_device_work, w);
 823        struct drbd_device *device = dw->device;
 824        kfree(dw);
 825        ov_out_of_sync_print(device);
 826        drbd_resync_finished(device);
 827
 828        return 0;
 829}
 830
 831static int w_resync_finished(struct drbd_work *w, int cancel)
 832{
 833        struct drbd_device_work *dw =
 834                container_of(w, struct drbd_device_work, w);
 835        struct drbd_device *device = dw->device;
 836        kfree(dw);
 837
 838        drbd_resync_finished(device);
 839
 840        return 0;
 841}
 842
 843static void ping_peer(struct drbd_device *device)
 844{
 845        struct drbd_connection *connection = first_peer_device(device)->connection;
 846
 847        clear_bit(GOT_PING_ACK, &connection->flags);
 848        request_ping(connection);
 849        wait_event(connection->ping_wait,
 850                   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
 851}
 852
 853int drbd_resync_finished(struct drbd_device *device)
 854{
 855        struct drbd_connection *connection = first_peer_device(device)->connection;
 856        unsigned long db, dt, dbdt;
 857        unsigned long n_oos;
 858        union drbd_state os, ns;
 859        struct drbd_device_work *dw;
 860        char *khelper_cmd = NULL;
 861        int verify_done = 0;
 862
 863        /* Remove all elements from the resync LRU. Since future actions
 864         * might set bits in the (main) bitmap, then the entries in the
 865         * resync LRU would be wrong. */
 866        if (drbd_rs_del_all(device)) {
 867                /* In case this is not possible now, most probably because
 868                 * there are P_RS_DATA_REPLY Packets lingering on the worker's
 869                 * queue (or even the read operations for those packets
 870                 * is not finished by now).   Retry in 100ms. */
 871
 872                schedule_timeout_interruptible(HZ / 10);
 873                dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
 874                if (dw) {
 875                        dw->w.cb = w_resync_finished;
 876                        dw->device = device;
 877                        drbd_queue_work(&connection->sender_work, &dw->w);
 878                        return 1;
 879                }
 880                drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n");
 881        }
 882
 883        dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
 884        if (dt <= 0)
 885                dt = 1;
 886
 887        db = device->rs_total;
 888        /* adjust for verify start and stop sectors, respective reached position */
 889        if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
 890                db -= device->ov_left;
 891
 892        dbdt = Bit2KB(db/dt);
 893        device->rs_paused /= HZ;
 894
 895        if (!get_ldev(device))
 896                goto out;
 897
 898        ping_peer(device);
 899
 900        spin_lock_irq(&device->resource->req_lock);
 901        os = drbd_read_state(device);
 902
 903        verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 904
 905        /* This protects us against multiple calls (that can happen in the presence
 906           of application IO), and against connectivity loss just before we arrive here. */
 907        if (os.conn <= C_CONNECTED)
 908                goto out_unlock;
 909
 910        ns = os;
 911        ns.conn = C_CONNECTED;
 912
 913        drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 914             verify_done ? "Online verify" : "Resync",
 915             dt + device->rs_paused, device->rs_paused, dbdt);
 916
 917        n_oos = drbd_bm_total_weight(device);
 918
 919        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 920                if (n_oos) {
 921                        drbd_alert(device, "Online verify found %lu %dk block out of sync!\n",
 922                              n_oos, Bit2KB(1));
 923                        khelper_cmd = "out-of-sync";
 924                }
 925        } else {
 926                D_ASSERT(device, (n_oos - device->rs_failed) == 0);
 927
 928                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 929                        khelper_cmd = "after-resync-target";
 930
 931                if (device->use_csums && device->rs_total) {
 932                        const unsigned long s = device->rs_same_csum;
 933                        const unsigned long t = device->rs_total;
 934                        const int ratio =
 935                                (t == 0)     ? 0 :
 936                        (t < 100000) ? ((s*100)/t) : (s/(t/100));
 937                        drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
 938                             "transferred %luK total %luK\n",
 939                             ratio,
 940                             Bit2KB(device->rs_same_csum),
 941                             Bit2KB(device->rs_total - device->rs_same_csum),
 942                             Bit2KB(device->rs_total));
 943                }
 944        }
 945
 946        if (device->rs_failed) {
 947                drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
 948
 949                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 950                        ns.disk = D_INCONSISTENT;
 951                        ns.pdsk = D_UP_TO_DATE;
 952                } else {
 953                        ns.disk = D_UP_TO_DATE;
 954                        ns.pdsk = D_INCONSISTENT;
 955                }
 956        } else {
 957                ns.disk = D_UP_TO_DATE;
 958                ns.pdsk = D_UP_TO_DATE;
 959
 960                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 961                        if (device->p_uuid) {
 962                                int i;
 963                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 964                                        _drbd_uuid_set(device, i, device->p_uuid[i]);
 965                                drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
 966                                _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
 967                        } else {
 968                                drbd_err(device, "device->p_uuid is NULL! BUG\n");
 969                        }
 970                }
 971
 972                if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 973                        /* for verify runs, we don't update uuids here,
 974                         * so there would be nothing to report. */
 975                        drbd_uuid_set_bm(device, 0UL);
 976                        drbd_print_uuids(device, "updated UUIDs");
 977                        if (device->p_uuid) {
 978                                /* Now the two UUID sets are equal, update what we
 979                                 * know of the peer. */
 980                                int i;
 981                                for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 982                                        device->p_uuid[i] = device->ldev->md.uuid[i];
 983                        }
 984                }
 985        }
 986
 987        _drbd_set_state(device, ns, CS_VERBOSE, NULL);
 988out_unlock:
 989        spin_unlock_irq(&device->resource->req_lock);
 990
 991        /* If we have been sync source, and have an effective fencing-policy,
 992         * once *all* volumes are back in sync, call "unfence". */
 993        if (os.conn == C_SYNC_SOURCE) {
 994                enum drbd_disk_state disk_state = D_MASK;
 995                enum drbd_disk_state pdsk_state = D_MASK;
 996                enum drbd_fencing_p fp = FP_DONT_CARE;
 997
 998                rcu_read_lock();
 999                fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1000                if (fp != FP_DONT_CARE) {
1001                        struct drbd_peer_device *peer_device;
1002                        int vnr;
1003                        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1004                                struct drbd_device *device = peer_device->device;
1005                                disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1006                                pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1007                        }
1008                }
1009                rcu_read_unlock();
1010                if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1011                        conn_khelper(connection, "unfence-peer");
1012        }
1013
1014        put_ldev(device);
1015out:
1016        device->rs_total  = 0;
1017        device->rs_failed = 0;
1018        device->rs_paused = 0;
1019
1020        /* reset start sector, if we reached end of device */
1021        if (verify_done && device->ov_left == 0)
1022                device->ov_start_sector = 0;
1023
1024        drbd_md_sync(device);
1025
1026        if (khelper_cmd)
1027                drbd_khelper(device, khelper_cmd);
1028
1029        return 1;
1030}
1031
1032/* helper */
1033static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1034{
1035        if (drbd_peer_req_has_active_page(peer_req)) {
1036                /* This might happen if sendpage() has not finished */
1037                int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1038                atomic_add(i, &device->pp_in_use_by_net);
1039                atomic_sub(i, &device->pp_in_use);
1040                spin_lock_irq(&device->resource->req_lock);
1041                list_add_tail(&peer_req->w.list, &device->net_ee);
1042                spin_unlock_irq(&device->resource->req_lock);
1043                wake_up(&drbd_pp_wait);
1044        } else
1045                drbd_free_peer_req(device, peer_req);
1046}
1047
1048/**
1049 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1050 * @w:          work object.
1051 * @cancel:     The connection will be closed anyways
1052 */
1053int w_e_end_data_req(struct drbd_work *w, int cancel)
1054{
1055        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1056        struct drbd_peer_device *peer_device = peer_req->peer_device;
1057        struct drbd_device *device = peer_device->device;
1058        int err;
1059
1060        if (unlikely(cancel)) {
1061                drbd_free_peer_req(device, peer_req);
1062                dec_unacked(device);
1063                return 0;
1064        }
1065
1066        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1067                err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1068        } else {
1069                if (__ratelimit(&drbd_ratelimit_state))
1070                        drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1071                            (unsigned long long)peer_req->i.sector);
1072
1073                err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1074        }
1075
1076        dec_unacked(device);
1077
1078        move_to_net_ee_or_free(device, peer_req);
1079
1080        if (unlikely(err))
1081                drbd_err(device, "drbd_send_block() failed\n");
1082        return err;
1083}
1084
1085static bool all_zero(struct drbd_peer_request *peer_req)
1086{
1087        struct page *page = peer_req->pages;
1088        unsigned int len = peer_req->i.size;
1089
1090        page_chain_for_each(page) {
1091                unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1092                unsigned int i, words = l / sizeof(long);
1093                unsigned long *d;
1094
1095                d = kmap_atomic(page);
1096                for (i = 0; i < words; i++) {
1097                        if (d[i]) {
1098                                kunmap_atomic(d);
1099                                return false;
1100                        }
1101                }
1102                kunmap_atomic(d);
1103                len -= l;
1104        }
1105
1106        return true;
1107}
1108
1109/**
1110 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1111 * @w:          work object.
1112 * @cancel:     The connection will be closed anyways
1113 */
1114int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1115{
1116        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1117        struct drbd_peer_device *peer_device = peer_req->peer_device;
1118        struct drbd_device *device = peer_device->device;
1119        int err;
1120
1121        if (unlikely(cancel)) {
1122                drbd_free_peer_req(device, peer_req);
1123                dec_unacked(device);
1124                return 0;
1125        }
1126
1127        if (get_ldev_if_state(device, D_FAILED)) {
1128                drbd_rs_complete_io(device, peer_req->i.sector);
1129                put_ldev(device);
1130        }
1131
1132        if (device->state.conn == C_AHEAD) {
1133                err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1134        } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1135                if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1136                        inc_rs_pending(device);
1137                        if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1138                                err = drbd_send_rs_deallocated(peer_device, peer_req);
1139                        else
1140                                err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1141                } else {
1142                        if (__ratelimit(&drbd_ratelimit_state))
1143                                drbd_err(device, "Not sending RSDataReply, "
1144                                    "partner DISKLESS!\n");
1145                        err = 0;
1146                }
1147        } else {
1148                if (__ratelimit(&drbd_ratelimit_state))
1149                        drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1150                            (unsigned long long)peer_req->i.sector);
1151
1152                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1153
1154                /* update resync data with failure */
1155                drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1156        }
1157
1158        dec_unacked(device);
1159
1160        move_to_net_ee_or_free(device, peer_req);
1161
1162        if (unlikely(err))
1163                drbd_err(device, "drbd_send_block() failed\n");
1164        return err;
1165}
1166
1167int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1168{
1169        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1170        struct drbd_peer_device *peer_device = peer_req->peer_device;
1171        struct drbd_device *device = peer_device->device;
1172        struct digest_info *di;
1173        int digest_size;
1174        void *digest = NULL;
1175        int err, eq = 0;
1176
1177        if (unlikely(cancel)) {
1178                drbd_free_peer_req(device, peer_req);
1179                dec_unacked(device);
1180                return 0;
1181        }
1182
1183        if (get_ldev(device)) {
1184                drbd_rs_complete_io(device, peer_req->i.sector);
1185                put_ldev(device);
1186        }
1187
1188        di = peer_req->digest;
1189
1190        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1191                /* quick hack to try to avoid a race against reconfiguration.
1192                 * a real fix would be much more involved,
1193                 * introducing more locking mechanisms */
1194                if (peer_device->connection->csums_tfm) {
1195                        digest_size = crypto_shash_digestsize(peer_device->connection->csums_tfm);
1196                        D_ASSERT(device, digest_size == di->digest_size);
1197                        digest = kmalloc(digest_size, GFP_NOIO);
1198                }
1199                if (digest) {
1200                        drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1201                        eq = !memcmp(digest, di->digest, digest_size);
1202                        kfree(digest);
1203                }
1204
1205                if (eq) {
1206                        drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1207                        /* rs_same_csums unit is BM_BLOCK_SIZE */
1208                        device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1209                        err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1210                } else {
1211                        inc_rs_pending(device);
1212                        peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1213                        peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1214                        kfree(di);
1215                        err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1216                }
1217        } else {
1218                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1219                if (__ratelimit(&drbd_ratelimit_state))
1220                        drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1221        }
1222
1223        dec_unacked(device);
1224        move_to_net_ee_or_free(device, peer_req);
1225
1226        if (unlikely(err))
1227                drbd_err(device, "drbd_send_block/ack() failed\n");
1228        return err;
1229}
1230
1231int w_e_end_ov_req(struct drbd_work *w, int cancel)
1232{
1233        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1234        struct drbd_peer_device *peer_device = peer_req->peer_device;
1235        struct drbd_device *device = peer_device->device;
1236        sector_t sector = peer_req->i.sector;
1237        unsigned int size = peer_req->i.size;
1238        int digest_size;
1239        void *digest;
1240        int err = 0;
1241
1242        if (unlikely(cancel))
1243                goto out;
1244
1245        digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1246        digest = kmalloc(digest_size, GFP_NOIO);
1247        if (!digest) {
1248                err = 1;        /* terminate the connection in case the allocation failed */
1249                goto out;
1250        }
1251
1252        if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1253                drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1254        else
1255                memset(digest, 0, digest_size);
1256
1257        /* Free e and pages before send.
1258         * In case we block on congestion, we could otherwise run into
1259         * some distributed deadlock, if the other side blocks on
1260         * congestion as well, because our receiver blocks in
1261         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1262        drbd_free_peer_req(device, peer_req);
1263        peer_req = NULL;
1264        inc_rs_pending(device);
1265        err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1266        if (err)
1267                dec_rs_pending(device);
1268        kfree(digest);
1269
1270out:
1271        if (peer_req)
1272                drbd_free_peer_req(device, peer_req);
1273        dec_unacked(device);
1274        return err;
1275}
1276
1277void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1278{
1279        if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1280                device->ov_last_oos_size += size>>9;
1281        } else {
1282                device->ov_last_oos_start = sector;
1283                device->ov_last_oos_size = size>>9;
1284        }
1285        drbd_set_out_of_sync(device, sector, size);
1286}
1287
1288int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1289{
1290        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1291        struct drbd_peer_device *peer_device = peer_req->peer_device;
1292        struct drbd_device *device = peer_device->device;
1293        struct digest_info *di;
1294        void *digest;
1295        sector_t sector = peer_req->i.sector;
1296        unsigned int size = peer_req->i.size;
1297        int digest_size;
1298        int err, eq = 0;
1299        bool stop_sector_reached = false;
1300
1301        if (unlikely(cancel)) {
1302                drbd_free_peer_req(device, peer_req);
1303                dec_unacked(device);
1304                return 0;
1305        }
1306
1307        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1308         * the resync lru has been cleaned up already */
1309        if (get_ldev(device)) {
1310                drbd_rs_complete_io(device, peer_req->i.sector);
1311                put_ldev(device);
1312        }
1313
1314        di = peer_req->digest;
1315
1316        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1317                digest_size = crypto_shash_digestsize(peer_device->connection->verify_tfm);
1318                digest = kmalloc(digest_size, GFP_NOIO);
1319                if (digest) {
1320                        drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1321
1322                        D_ASSERT(device, digest_size == di->digest_size);
1323                        eq = !memcmp(digest, di->digest, digest_size);
1324                        kfree(digest);
1325                }
1326        }
1327
1328        /* Free peer_req and pages before send.
1329         * In case we block on congestion, we could otherwise run into
1330         * some distributed deadlock, if the other side blocks on
1331         * congestion as well, because our receiver blocks in
1332         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1333        drbd_free_peer_req(device, peer_req);
1334        if (!eq)
1335                drbd_ov_out_of_sync_found(device, sector, size);
1336        else
1337                ov_out_of_sync_print(device);
1338
1339        err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1340                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1341
1342        dec_unacked(device);
1343
1344        --device->ov_left;
1345
1346        /* let's advance progress step marks only for every other megabyte */
1347        if ((device->ov_left & 0x200) == 0x200)
1348                drbd_advance_rs_marks(device, device->ov_left);
1349
1350        stop_sector_reached = verify_can_do_stop_sector(device) &&
1351                (sector + (size>>9)) >= device->ov_stop_sector;
1352
1353        if (device->ov_left == 0 || stop_sector_reached) {
1354                ov_out_of_sync_print(device);
1355                drbd_resync_finished(device);
1356        }
1357
1358        return err;
1359}
1360
1361/* FIXME
1362 * We need to track the number of pending barrier acks,
1363 * and to be able to wait for them.
1364 * See also comment in drbd_adm_attach before drbd_suspend_io.
1365 */
1366static int drbd_send_barrier(struct drbd_connection *connection)
1367{
1368        struct p_barrier *p;
1369        struct drbd_socket *sock;
1370
1371        sock = &connection->data;
1372        p = conn_prepare_command(connection, sock);
1373        if (!p)
1374                return -EIO;
1375        p->barrier = connection->send.current_epoch_nr;
1376        p->pad = 0;
1377        connection->send.current_epoch_writes = 0;
1378        connection->send.last_sent_barrier_jif = jiffies;
1379
1380        return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1381}
1382
1383static int pd_send_unplug_remote(struct drbd_peer_device *pd)
1384{
1385        struct drbd_socket *sock = &pd->connection->data;
1386        if (!drbd_prepare_command(pd, sock))
1387                return -EIO;
1388        return drbd_send_command(pd, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1389}
1390
1391int w_send_write_hint(struct drbd_work *w, int cancel)
1392{
1393        struct drbd_device *device =
1394                container_of(w, struct drbd_device, unplug_work);
1395
1396        if (cancel)
1397                return 0;
1398        return pd_send_unplug_remote(first_peer_device(device));
1399}
1400
1401static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1402{
1403        if (!connection->send.seen_any_write_yet) {
1404                connection->send.seen_any_write_yet = true;
1405                connection->send.current_epoch_nr = epoch;
1406                connection->send.current_epoch_writes = 0;
1407                connection->send.last_sent_barrier_jif = jiffies;
1408        }
1409}
1410
1411static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1412{
1413        /* re-init if first write on this connection */
1414        if (!connection->send.seen_any_write_yet)
1415                return;
1416        if (connection->send.current_epoch_nr != epoch) {
1417                if (connection->send.current_epoch_writes)
1418                        drbd_send_barrier(connection);
1419                connection->send.current_epoch_nr = epoch;
1420        }
1421}
1422
1423int w_send_out_of_sync(struct drbd_work *w, int cancel)
1424{
1425        struct drbd_request *req = container_of(w, struct drbd_request, w);
1426        struct drbd_device *device = req->device;
1427        struct drbd_peer_device *const peer_device = first_peer_device(device);
1428        struct drbd_connection *const connection = peer_device->connection;
1429        int err;
1430
1431        if (unlikely(cancel)) {
1432                req_mod(req, SEND_CANCELED);
1433                return 0;
1434        }
1435        req->pre_send_jif = jiffies;
1436
1437        /* this time, no connection->send.current_epoch_writes++;
1438         * If it was sent, it was the closing barrier for the last
1439         * replicated epoch, before we went into AHEAD mode.
1440         * No more barriers will be sent, until we leave AHEAD mode again. */
1441        maybe_send_barrier(connection, req->epoch);
1442
1443        err = drbd_send_out_of_sync(peer_device, req);
1444        req_mod(req, OOS_HANDED_TO_NETWORK);
1445
1446        return err;
1447}
1448
1449/**
1450 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1451 * @w:          work object.
1452 * @cancel:     The connection will be closed anyways
1453 */
1454int w_send_dblock(struct drbd_work *w, int cancel)
1455{
1456        struct drbd_request *req = container_of(w, struct drbd_request, w);
1457        struct drbd_device *device = req->device;
1458        struct drbd_peer_device *const peer_device = first_peer_device(device);
1459        struct drbd_connection *connection = peer_device->connection;
1460        bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1461        int err;
1462
1463        if (unlikely(cancel)) {
1464                req_mod(req, SEND_CANCELED);
1465                return 0;
1466        }
1467        req->pre_send_jif = jiffies;
1468
1469        re_init_if_first_write(connection, req->epoch);
1470        maybe_send_barrier(connection, req->epoch);
1471        connection->send.current_epoch_writes++;
1472
1473        err = drbd_send_dblock(peer_device, req);
1474        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1475
1476        if (do_send_unplug && !err)
1477                pd_send_unplug_remote(peer_device);
1478
1479        return err;
1480}
1481
1482/**
1483 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1484 * @w:          work object.
1485 * @cancel:     The connection will be closed anyways
1486 */
1487int w_send_read_req(struct drbd_work *w, int cancel)
1488{
1489        struct drbd_request *req = container_of(w, struct drbd_request, w);
1490        struct drbd_device *device = req->device;
1491        struct drbd_peer_device *const peer_device = first_peer_device(device);
1492        struct drbd_connection *connection = peer_device->connection;
1493        bool do_send_unplug = req->rq_state & RQ_UNPLUG;
1494        int err;
1495
1496        if (unlikely(cancel)) {
1497                req_mod(req, SEND_CANCELED);
1498                return 0;
1499        }
1500        req->pre_send_jif = jiffies;
1501
1502        /* Even read requests may close a write epoch,
1503         * if there was any yet. */
1504        maybe_send_barrier(connection, req->epoch);
1505
1506        err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1507                                 (unsigned long)req);
1508
1509        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1510
1511        if (do_send_unplug && !err)
1512                pd_send_unplug_remote(peer_device);
1513
1514        return err;
1515}
1516
1517int w_restart_disk_io(struct drbd_work *w, int cancel)
1518{
1519        struct drbd_request *req = container_of(w, struct drbd_request, w);
1520        struct drbd_device *device = req->device;
1521
1522        if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1523                drbd_al_begin_io(device, &req->i);
1524
1525        drbd_req_make_private_bio(req, req->master_bio);
1526        bio_set_dev(req->private_bio, device->ldev->backing_bdev);
1527        generic_make_request(req->private_bio);
1528
1529        return 0;
1530}
1531
1532static int _drbd_may_sync_now(struct drbd_device *device)
1533{
1534        struct drbd_device *odev = device;
1535        int resync_after;
1536
1537        while (1) {
1538                if (!odev->ldev || odev->state.disk == D_DISKLESS)
1539                        return 1;
1540                rcu_read_lock();
1541                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1542                rcu_read_unlock();
1543                if (resync_after == -1)
1544                        return 1;
1545                odev = minor_to_device(resync_after);
1546                if (!odev)
1547                        return 1;
1548                if ((odev->state.conn >= C_SYNC_SOURCE &&
1549                     odev->state.conn <= C_PAUSED_SYNC_T) ||
1550                    odev->state.aftr_isp || odev->state.peer_isp ||
1551                    odev->state.user_isp)
1552                        return 0;
1553        }
1554}
1555
1556/**
1557 * drbd_pause_after() - Pause resync on all devices that may not resync now
1558 * @device:     DRBD device.
1559 *
1560 * Called from process context only (admin command and after_state_ch).
1561 */
1562static bool drbd_pause_after(struct drbd_device *device)
1563{
1564        bool changed = false;
1565        struct drbd_device *odev;
1566        int i;
1567
1568        rcu_read_lock();
1569        idr_for_each_entry(&drbd_devices, odev, i) {
1570                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1571                        continue;
1572                if (!_drbd_may_sync_now(odev) &&
1573                    _drbd_set_state(_NS(odev, aftr_isp, 1),
1574                                    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1575                        changed = true;
1576        }
1577        rcu_read_unlock();
1578
1579        return changed;
1580}
1581
1582/**
1583 * drbd_resume_next() - Resume resync on all devices that may resync now
1584 * @device:     DRBD device.
1585 *
1586 * Called from process context only (admin command and worker).
1587 */
1588static bool drbd_resume_next(struct drbd_device *device)
1589{
1590        bool changed = false;
1591        struct drbd_device *odev;
1592        int i;
1593
1594        rcu_read_lock();
1595        idr_for_each_entry(&drbd_devices, odev, i) {
1596                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1597                        continue;
1598                if (odev->state.aftr_isp) {
1599                        if (_drbd_may_sync_now(odev) &&
1600                            _drbd_set_state(_NS(odev, aftr_isp, 0),
1601                                            CS_HARD, NULL) != SS_NOTHING_TO_DO)
1602                                changed = true;
1603                }
1604        }
1605        rcu_read_unlock();
1606        return changed;
1607}
1608
/* Run drbd_resume_next() with all resources locked; its result is not used. */
void resume_next_sg(struct drbd_device *device)
{
        lock_all_resources();
        (void)drbd_resume_next(device);
        unlock_all_resources();
}
1615
/* Run drbd_pause_after() with all resources locked; its result is not used. */
void suspend_other_sg(struct drbd_device *device)
{
        lock_all_resources();
        (void)drbd_pause_after(device);
        unlock_all_resources();
}
1622
1623/* caller must lock_all_resources() */
1624enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1625{
1626        struct drbd_device *odev;
1627        int resync_after;
1628
1629        if (o_minor == -1)
1630                return NO_ERROR;
1631        if (o_minor < -1 || o_minor > MINORMASK)
1632                return ERR_RESYNC_AFTER;
1633
1634        /* check for loops */
1635        odev = minor_to_device(o_minor);
1636        while (1) {
1637                if (odev == device)
1638                        return ERR_RESYNC_AFTER_CYCLE;
1639
1640                /* You are free to depend on diskless, non-existing,
1641                 * or not yet/no longer existing minors.
1642                 * We only reject dependency loops.
1643                 * We cannot follow the dependency chain beyond a detached or
1644                 * missing minor.
1645                 */
1646                if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1647                        return NO_ERROR;
1648
1649                rcu_read_lock();
1650                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1651                rcu_read_unlock();
1652                /* dependency chain ends here, no cycles. */
1653                if (resync_after == -1)
1654                        return NO_ERROR;
1655
1656                /* follow the dependency chain */
1657                odev = minor_to_device(resync_after);
1658        }
1659}
1660
1661/* caller must lock_all_resources() */
1662void drbd_resync_after_changed(struct drbd_device *device)
1663{
1664        int changed;
1665
1666        do {
1667                changed  = drbd_pause_after(device);
1668                changed |= drbd_resume_next(device);
1669        } while (changed);
1670}
1671
1672void drbd_rs_controller_reset(struct drbd_device *device)
1673{
1674        struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1675        struct fifo_buffer *plan;
1676
1677        atomic_set(&device->rs_sect_in, 0);
1678        atomic_set(&device->rs_sect_ev, 0);
1679        device->rs_in_flight = 0;
1680        device->rs_last_events = (int)part_stat_read_accum(&disk->part0, sectors);
1681
1682        /* Updating the RCU protected object in place is necessary since
1683           this function gets called from atomic context.
1684           It is valid since all other updates also lead to an completely
1685           empty fifo */
1686        rcu_read_lock();
1687        plan = rcu_dereference(device->rs_plan_s);
1688        plan->total = 0;
1689        fifo_set(plan, 0);
1690        rcu_read_unlock();
1691}
1692
1693void start_resync_timer_fn(struct timer_list *t)
1694{
1695        struct drbd_device *device = from_timer(device, t, start_resync_timer);
1696        drbd_device_post_work(device, RS_START);
1697}
1698
1699static void do_start_resync(struct drbd_device *device)
1700{
1701        if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1702                drbd_warn(device, "postponing start_resync ...\n");
1703                device->start_resync_timer.expires = jiffies + HZ/10;
1704                add_timer(&device->start_resync_timer);
1705                return;
1706        }
1707
1708        drbd_start_resync(device, C_SYNC_SOURCE);
1709        clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1710}
1711
1712static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1713{
1714        bool csums_after_crash_only;
1715        rcu_read_lock();
1716        csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1717        rcu_read_unlock();
1718        return connection->agreed_pro_version >= 89 &&          /* supported? */
1719                connection->csums_tfm &&                        /* configured? */
1720                (csums_after_crash_only == false                /* use for each resync? */
1721                 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1722}
1723
1724/**
1725 * drbd_start_resync() - Start the resync process
1726 * @device:     DRBD device.
1727 * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1728 *
1729 * This function might bring you directly into one of the
1730 * C_PAUSED_SYNC_* states.
1731 */
1732void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1733{
1734        struct drbd_peer_device *peer_device = first_peer_device(device);
1735        struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1736        union drbd_state ns;
1737        int r;
1738
1739        if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1740                drbd_err(device, "Resync already running!\n");
1741                return;
1742        }
1743
1744        if (!connection) {
1745                drbd_err(device, "No connection to peer, aborting!\n");
1746                return;
1747        }
1748
1749        if (!test_bit(B_RS_H_DONE, &device->flags)) {
1750                if (side == C_SYNC_TARGET) {
1751                        /* Since application IO was locked out during C_WF_BITMAP_T and
1752                           C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
1753                           we check that we might make the data inconsistent. */
1754                        r = drbd_khelper(device, "before-resync-target");
1755                        r = (r >> 8) & 0xff;
1756                        if (r > 0) {
1757                                drbd_info(device, "before-resync-target handler returned %d, "
1758                                         "dropping connection.\n", r);
1759                                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1760                                return;
1761                        }
1762                } else /* C_SYNC_SOURCE */ {
1763                        r = drbd_khelper(device, "before-resync-source");
1764                        r = (r >> 8) & 0xff;
1765                        if (r > 0) {
1766                                if (r == 3) {
1767                                        drbd_info(device, "before-resync-source handler returned %d, "
1768                                                 "ignoring. Old userland tools?", r);
1769                                } else {
1770                                        drbd_info(device, "before-resync-source handler returned %d, "
1771                                                 "dropping connection.\n", r);
1772                                        conn_request_state(connection,
1773                                                           NS(conn, C_DISCONNECTING), CS_HARD);
1774                                        return;
1775                                }
1776                        }
1777                }
1778        }
1779
1780        if (current == connection->worker.task) {
1781                /* The worker should not sleep waiting for state_mutex,
1782                   that can take long */
1783                if (!mutex_trylock(device->state_mutex)) {
1784                        set_bit(B_RS_H_DONE, &device->flags);
1785                        device->start_resync_timer.expires = jiffies + HZ/5;
1786                        add_timer(&device->start_resync_timer);
1787                        return;
1788                }
1789        } else {
1790                mutex_lock(device->state_mutex);
1791        }
1792
1793        lock_all_resources();
1794        clear_bit(B_RS_H_DONE, &device->flags);
1795        /* Did some connection breakage or IO error race with us? */
1796        if (device->state.conn < C_CONNECTED
1797        || !get_ldev_if_state(device, D_NEGOTIATING)) {
1798                unlock_all_resources();
1799                goto out;
1800        }
1801
1802        ns = drbd_read_state(device);
1803
1804        ns.aftr_isp = !_drbd_may_sync_now(device);
1805
1806        ns.conn = side;
1807
1808        if (side == C_SYNC_TARGET)
1809                ns.disk = D_INCONSISTENT;
1810        else /* side == C_SYNC_SOURCE */
1811                ns.pdsk = D_INCONSISTENT;
1812
1813        r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1814        ns = drbd_read_state(device);
1815
1816        if (ns.conn < C_CONNECTED)
1817                r = SS_UNKNOWN_ERROR;
1818
1819        if (r == SS_SUCCESS) {
1820                unsigned long tw = drbd_bm_total_weight(device);
1821                unsigned long now = jiffies;
1822                int i;
1823
1824                device->rs_failed    = 0;
1825                device->rs_paused    = 0;
1826                device->rs_same_csum = 0;
1827                device->rs_last_sect_ev = 0;
1828                device->rs_total     = tw;
1829                device->rs_start     = now;
1830                for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1831                        device->rs_mark_left[i] = tw;
1832                        device->rs_mark_time[i] = now;
1833                }
1834                drbd_pause_after(device);
1835                /* Forget potentially stale cached per resync extent bit-counts.
1836                 * Open coded drbd_rs_cancel_all(device), we already have IRQs
1837                 * disabled, and know the disk state is ok. */
1838                spin_lock(&device->al_lock);
1839                lc_reset(device->resync);
1840                device->resync_locked = 0;
1841                device->resync_wenr = LC_FREE;
1842                spin_unlock(&device->al_lock);
1843        }
1844        unlock_all_resources();
1845
1846        if (r == SS_SUCCESS) {
1847                wake_up(&device->al_wait); /* for lc_reset() above */
1848                /* reset rs_last_bcast when a resync or verify is started,
1849                 * to deal with potential jiffies wrap. */
1850                device->rs_last_bcast = jiffies - HZ;
1851
1852                drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1853                     drbd_conn_str(ns.conn),
1854                     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1855                     (unsigned long) device->rs_total);
1856                if (side == C_SYNC_TARGET) {
1857                        device->bm_resync_fo = 0;
1858                        device->use_csums = use_checksum_based_resync(connection, device);
1859                } else {
1860                        device->use_csums = false;
1861                }
1862
1863                /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1864                 * with w_send_oos, or the sync target will get confused as to
1865                 * how much bits to resync.  We cannot do that always, because for an
1866                 * empty resync and protocol < 95, we need to do it here, as we call
1867                 * drbd_resync_finished from here in that case.
1868                 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1869                 * and from after_state_ch otherwise. */
1870                if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1871                        drbd_gen_and_send_sync_uuid(peer_device);
1872
1873                if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1874                        /* This still has a race (about when exactly the peers
1875                         * detect connection loss) that can lead to a full sync
1876                         * on next handshake. In 8.3.9 we fixed this with explicit
1877                         * resync-finished notifications, but the fix
1878                         * introduces a protocol change.  Sleeping for some
1879                         * time longer than the ping interval + timeout on the
1880                         * SyncSource, to give the SyncTarget the chance to
1881                         * detect connection loss, then waiting for a ping
1882                         * response (implicit in drbd_resync_finished) reduces
1883                         * the race considerably, but does not solve it. */
1884                        if (side == C_SYNC_SOURCE) {
1885                                struct net_conf *nc;
1886                                int timeo;
1887
1888                                rcu_read_lock();
1889                                nc = rcu_dereference(connection->net_conf);
1890                                timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1891                                rcu_read_unlock();
1892                                schedule_timeout_interruptible(timeo);
1893                        }
1894                        drbd_resync_finished(device);
1895                }
1896
1897                drbd_rs_controller_reset(device);
1898                /* ns.conn may already be != device->state.conn,
1899                 * we may have been paused in between, or become paused until
1900                 * the timer triggers.
1901                 * No matter, that is handled in resync_timer_fn() */
1902                if (ns.conn == C_SYNC_TARGET)
1903                        mod_timer(&device->resync_timer, jiffies);
1904
1905                drbd_md_sync(device);
1906        }
1907        put_ldev(device);
1908out:
1909        mutex_unlock(device->state_mutex);
1910}
1911
1912static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1913{
1914        struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1915        device->rs_last_bcast = jiffies;
1916
1917        if (!get_ldev(device))
1918                return;
1919
1920        drbd_bm_write_lazy(device, 0);
1921        if (resync_done && is_sync_state(device->state.conn))
1922                drbd_resync_finished(device);
1923
1924        drbd_bcast_event(device, &sib);
1925        /* update timestamp, in case it took a while to write out stuff */
1926        device->rs_last_bcast = jiffies;
1927        put_ldev(device);
1928}
1929
1930static void drbd_ldev_destroy(struct drbd_device *device)
1931{
1932        lc_destroy(device->resync);
1933        device->resync = NULL;
1934        lc_destroy(device->act_log);
1935        device->act_log = NULL;
1936
1937        __acquire(local);
1938        drbd_backing_dev_free(device, device->ldev);
1939        device->ldev = NULL;
1940        __release(local);
1941
1942        clear_bit(GOING_DISKLESS, &device->flags);
1943        wake_up(&device->misc_wait);
1944}
1945
1946static void go_diskless(struct drbd_device *device)
1947{
1948        D_ASSERT(device, device->state.disk == D_FAILED);
1949        /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1950         * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1951         * the protected members anymore, though, so once put_ldev reaches zero
1952         * again, it will be safe to free them. */
1953
1954        /* Try to write changed bitmap pages, read errors may have just
1955         * set some bits outside the area covered by the activity log.
1956         *
1957         * If we have an IO error during the bitmap writeout,
1958         * we will want a full sync next time, just in case.
1959         * (Do we want a specific meta data flag for this?)
1960         *
1961         * If that does not make it to stable storage either,
1962         * we cannot do anything about that anymore.
1963         *
1964         * We still need to check if both bitmap and ldev are present, we may
1965         * end up here after a failed attach, before ldev was even assigned.
1966         */
1967        if (device->bitmap && device->ldev) {
1968                /* An interrupted resync or similar is allowed to recounts bits
1969                 * while we detach.
1970                 * Any modifications would not be expected anymore, though.
1971                 */
1972                if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1973                                        "detach", BM_LOCKED_TEST_ALLOWED)) {
1974                        if (test_bit(WAS_READ_ERROR, &device->flags)) {
1975                                drbd_md_set_flag(device, MDF_FULL_SYNC);
1976                                drbd_md_sync(device);
1977                        }
1978                }
1979        }
1980
1981        drbd_force_state(device, NS(disk, D_DISKLESS));
1982}
1983
/* Device work for MD_SYNC: log a warning, then sync the meta data.  Always returns 0. */
static int do_md_sync(struct drbd_device *device)
{
        drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");

        drbd_md_sync(device);

        return 0;
}
1990
1991/* only called from drbd_worker thread, no locking */
1992void __update_timing_details(
1993                struct drbd_thread_timing_details *tdp,
1994                unsigned int *cb_nr,
1995                void *cb,
1996                const char *fn, const unsigned int line)
1997{
1998        unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1999        struct drbd_thread_timing_details *td = tdp + i;
2000
2001        td->start_jif = jiffies;
2002        td->cb_addr = cb;
2003        td->caller_fn = fn;
2004        td->line = line;
2005        td->cb_nr = *cb_nr;
2006
2007        i = (i+1) % DRBD_THREAD_DETAILS_HIST;
2008        td = tdp + i;
2009        memset(td, 0, sizeof(*td));
2010
2011        ++(*cb_nr);
2012}
2013
2014static void do_device_work(struct drbd_device *device, const unsigned long todo)
2015{
2016        if (test_bit(MD_SYNC, &todo))
2017                do_md_sync(device);
2018        if (test_bit(RS_DONE, &todo) ||
2019            test_bit(RS_PROGRESS, &todo))
2020                update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2021        if (test_bit(GO_DISKLESS, &todo))
2022                go_diskless(device);
2023        if (test_bit(DESTROY_DISK, &todo))
2024                drbd_ldev_destroy(device);
2025        if (test_bit(RS_START, &todo))
2026                do_start_resync(device);
2027}
2028
/* All device flag bits that get_work_bits() clears and do_device_work() evaluates. */
#define DRBD_DEVICE_WORK_MASK           \
        ((1UL << GO_DISKLESS)   |       \
         (1UL << DESTROY_DISK)  |       \
         (1UL << MD_SYNC)       |       \
         (1UL << RS_START)      |       \
         (1UL << RS_PROGRESS)   |       \
         (1UL << RS_DONE))
2037
2038static unsigned long get_work_bits(unsigned long *flags)
2039{
2040        unsigned long old, new;
2041        do {
2042                old = *flags;
2043                new = old & ~DRBD_DEVICE_WORK_MASK;
2044        } while (cmpxchg(flags, old, new) != old);
2045        return old & DRBD_DEVICE_WORK_MASK;
2046}
2047
2048static void do_unqueued_work(struct drbd_connection *connection)
2049{
2050        struct drbd_peer_device *peer_device;
2051        int vnr;
2052
2053        rcu_read_lock();
2054        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2055                struct drbd_device *device = peer_device->device;
2056                unsigned long todo = get_work_bits(&device->flags);
2057                if (!todo)
2058                        continue;
2059
2060                kref_get(&device->kref);
2061                rcu_read_unlock();
2062                do_device_work(device, todo);
2063                kref_put(&device->kref, drbd_destroy_device);
2064                rcu_read_lock();
2065        }
2066        rcu_read_unlock();
2067}
2068
2069static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2070{
2071        spin_lock_irq(&queue->q_lock);
2072        list_splice_tail_init(&queue->q, work_list);
2073        spin_unlock_irq(&queue->q_lock);
2074        return !list_empty(work_list);
2075}
2076
2077static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2078{
2079        DEFINE_WAIT(wait);
2080        struct net_conf *nc;
2081        int uncork, cork;
2082
2083        dequeue_work_batch(&connection->sender_work, work_list);
2084        if (!list_empty(work_list))
2085                return;
2086
2087        /* Still nothing to do?
2088         * Maybe we still need to close the current epoch,
2089         * even if no new requests are queued yet.
2090         *
2091         * Also, poke TCP, just in case.
2092         * Then wait for new work (or signal). */
2093        rcu_read_lock();
2094        nc = rcu_dereference(connection->net_conf);
2095        uncork = nc ? nc->tcp_cork : 0;
2096        rcu_read_unlock();
2097        if (uncork) {
2098                mutex_lock(&connection->data.mutex);
2099                if (connection->data.socket)
2100                        drbd_tcp_uncork(connection->data.socket);
2101                mutex_unlock(&connection->data.mutex);
2102        }
2103
2104        for (;;) {
2105                int send_barrier;
2106                prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2107                spin_lock_irq(&connection->resource->req_lock);
2108                spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2109                if (!list_empty(&connection->sender_work.q))
2110                        list_splice_tail_init(&connection->sender_work.q, work_list);
2111                spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2112                if (!list_empty(work_list) || signal_pending(current)) {
2113                        spin_unlock_irq(&connection->resource->req_lock);
2114                        break;
2115                }
2116
2117                /* We found nothing new to do, no to-be-communicated request,
2118                 * no other work item.  We may still need to close the last
2119                 * epoch.  Next incoming request epoch will be connection ->
2120                 * current transfer log epoch number.  If that is different
2121                 * from the epoch of the last request we communicated, it is
2122                 * safe to send the epoch separating barrier now.
2123                 */
2124                send_barrier =
2125                        atomic_read(&connection->current_tle_nr) !=
2126                        connection->send.current_epoch_nr;
2127                spin_unlock_irq(&connection->resource->req_lock);
2128
2129                if (send_barrier)
2130                        maybe_send_barrier(connection,
2131                                        connection->send.current_epoch_nr + 1);
2132
2133                if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2134                        break;
2135
2136                /* drbd_send() may have called flush_signals() */
2137                if (get_t_state(&connection->worker) != RUNNING)
2138                        break;
2139
2140                schedule();
2141                /* may be woken up for other things but new work, too,
2142                 * e.g. if the current epoch got closed.
2143                 * In which case we send the barrier above. */
2144        }
2145        finish_wait(&connection->sender_work.q_wait, &wait);
2146
2147        /* someone may have changed the config while we have been waiting above. */
2148        rcu_read_lock();
2149        nc = rcu_dereference(connection->net_conf);
2150        cork = nc ? nc->tcp_cork : 0;
2151        rcu_read_unlock();
2152        mutex_lock(&connection->data.mutex);
2153        if (connection->data.socket) {
2154                if (cork)
2155                        drbd_tcp_cork(connection->data.socket);
2156                else if (!uncork)
2157                        drbd_tcp_uncork(connection->data.socket);
2158        }
2159        mutex_unlock(&connection->data.mutex);
2160}
2161
2162int drbd_worker(struct drbd_thread *thi)
2163{
2164        struct drbd_connection *connection = thi->connection;
2165        struct drbd_work *w = NULL;
2166        struct drbd_peer_device *peer_device;
2167        LIST_HEAD(work_list);
2168        int vnr;
2169
2170        while (get_t_state(thi) == RUNNING) {
2171                drbd_thread_current_set_cpu(thi);
2172
2173                if (list_empty(&work_list)) {
2174                        update_worker_timing_details(connection, wait_for_work);
2175                        wait_for_work(connection, &work_list);
2176                }
2177
2178                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2179                        update_worker_timing_details(connection, do_unqueued_work);
2180                        do_unqueued_work(connection);
2181                }
2182
2183                if (signal_pending(current)) {
2184                        flush_signals(current);
2185                        if (get_t_state(thi) == RUNNING) {
2186                                drbd_warn(connection, "Worker got an unexpected signal\n");
2187                                continue;
2188                        }
2189                        break;
2190                }
2191
2192                if (get_t_state(thi) != RUNNING)
2193                        break;
2194
2195                if (!list_empty(&work_list)) {
2196                        w = list_first_entry(&work_list, struct drbd_work, list);
2197                        list_del_init(&w->list);
2198                        update_worker_timing_details(connection, w->cb);
2199                        if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2200                                continue;
2201                        if (connection->cstate >= C_WF_REPORT_PARAMS)
2202                                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2203                }
2204        }
2205
2206        do {
2207                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2208                        update_worker_timing_details(connection, do_unqueued_work);
2209                        do_unqueued_work(connection);
2210                }
2211                if (!list_empty(&work_list)) {
2212                        w = list_first_entry(&work_list, struct drbd_work, list);
2213                        list_del_init(&w->list);
2214                        update_worker_timing_details(connection, w->cb);
2215                        w->cb(w, 1);
2216                } else
2217                        dequeue_work_batch(&connection->sender_work, &work_list);
2218        } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2219
2220        rcu_read_lock();
2221        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2222                struct drbd_device *device = peer_device->device;
2223                D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2224                kref_get(&device->kref);
2225                rcu_read_unlock();
2226                drbd_device_cleanup(device);
2227                kref_put(&device->kref, drbd_destroy_device);
2228                rcu_read_lock();
2229        }
2230        rcu_read_unlock();
2231
2232        return 0;
2233}
2234