linux/drivers/block/drbd/drbd_worker.c
   1/*
   2   drbd_worker.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24*/
  25
  26#include <linux/module.h>
  27#include <linux/drbd.h>
  28#include <linux/sched.h>
  29#include <linux/wait.h>
  30#include <linux/mm.h>
  31#include <linux/memcontrol.h>
  32#include <linux/mm_inline.h>
  33#include <linux/slab.h>
  34#include <linux/random.h>
  35#include <linux/string.h>
  36#include <linux/scatterlist.h>
  37
  38#include "drbd_int.h"
  39#include "drbd_protocol.h"
  40#include "drbd_req.h"
  41
  42static int make_ov_request(struct drbd_device *, int);
  43static int make_resync_request(struct drbd_device *, int);
  44
  45/* endio handlers:
  46 *   drbd_md_endio (defined here)
  47 *   drbd_request_endio (defined here)
  48 *   drbd_peer_request_endio (defined here)
  49 *   drbd_bm_endio (defined in drbd_bitmap.c)
  50 *
  51 * For all these callbacks, note the following:
  52 * The callbacks will be called in irq context by the IDE drivers,
  53 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
  54 * Try to get the locking right :)
  55 *
  56 */
  57
  58/* used for synchronous meta data and bitmap IO
  59 * submitted by drbd_md_sync_page_io()
  60 */
  61void drbd_md_endio(struct bio *bio)
  62{
  63        struct drbd_device *device;
  64
  65        device = bio->bi_private;
  66        device->md_io.error = bio->bi_error;
  67
  68        /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
  69         * to timeout on the lower level device, and eventually detach from it.
  70         * If this io completion runs after that timeout expired, this
  71         * drbd_md_put_buffer() may allow us to finally try and re-attach.
  72         * During normal operation, this only puts that extra reference
  73         * down to 1 again.
  74         * Make sure we first drop the reference, and only then signal
  75         * completion, or we may (in drbd_al_read_log()) cycle so fast into the
  76         * next drbd_md_sync_page_io(), that we trigger the
  77         * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
  78         */
  79        drbd_md_put_buffer(device);
  80        device->md_io.done = 1;
  81        wake_up(&device->misc_wait);
  82        bio_put(bio);
  83        if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
  84                put_ldev(device);
  85}
  86
  87/* reads on behalf of the partner,
  88 * "submitted" by the receiver
  89 */
  90static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
  91{
  92        unsigned long flags = 0;
  93        struct drbd_peer_device *peer_device = peer_req->peer_device;
  94        struct drbd_device *device = peer_device->device;
  95
  96        spin_lock_irqsave(&device->resource->req_lock, flags);
  97        device->read_cnt += peer_req->i.size >> 9;
  98        list_del(&peer_req->w.list);
  99        if (list_empty(&device->read_ee))
 100                wake_up(&device->ee_wait);
 101        if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
 102                __drbd_chk_io_error(device, DRBD_READ_ERROR);
 103        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 104
 105        drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
 106        put_ldev(device);
 107}
 108
 109/* writes on behalf of the partner, or resync writes,
 110 * "submitted" by the receiver, final stage.  */
 111void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
 112{
 113        unsigned long flags = 0;
 114        struct drbd_peer_device *peer_device = peer_req->peer_device;
 115        struct drbd_device *device = peer_device->device;
 116        struct drbd_connection *connection = peer_device->connection;
 117        struct drbd_interval i;
 118        int do_wake;
 119        u64 block_id;
 120        int do_al_complete_io;
 121
 122        /* after we moved peer_req to done_ee,
 123         * we may no longer access it,
 124         * it may be freed/reused already!
 125         * (as soon as we release the req_lock) */
 126        i = peer_req->i;
 127        do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
 128        block_id = peer_req->block_id;
 129        peer_req->flags &= ~EE_CALL_AL_COMPLETE_IO;
 130
 131        spin_lock_irqsave(&device->resource->req_lock, flags);
 132        device->writ_cnt += peer_req->i.size >> 9;
 133        list_move_tail(&peer_req->w.list, &device->done_ee);
 134
 135        /*
 136         * Do not remove from the write_requests tree here: we did not send the
 137         * Ack yet and did not wake possibly waiting conflicting requests.
 138         * The request is removed from the tree in "drbd_process_done_ee", within
 139         * the appropriate dw.cb (e_end_block/e_end_resync_block), or in
 140         * _drbd_clear_done_ee.
 141         */
 142
 143        do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);
 144
 145        /* FIXME do we want to detach for failed REQ_DISCARD?
 146         * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
 147        if (peer_req->flags & EE_WAS_ERROR)
 148                __drbd_chk_io_error(device, DRBD_WRITE_ERROR);
 149
 150        if (connection->cstate >= C_WF_REPORT_PARAMS) {
 151                kref_get(&device->kref); /* put is in drbd_send_acks_wf() */
 152                if (!queue_work(connection->ack_sender, &peer_device->send_acks_work))
 153                        kref_put(&device->kref, drbd_destroy_device);
 154        }
 155        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 156
 157        if (block_id == ID_SYNCER)
 158                drbd_rs_complete_io(device, i.sector);
 159
 160        if (do_wake)
 161                wake_up(&device->ee_wait);
 162
 163        if (do_al_complete_io)
 164                drbd_al_complete_io(device, &i);
 165
 166        put_ldev(device);
 167}
 168
 169/* writes on behalf of the partner, or resync writes,
 170 * "submitted" by the receiver.
 171 */
 172void drbd_peer_request_endio(struct bio *bio)
 173{
 174        struct drbd_peer_request *peer_req = bio->bi_private;
 175        struct drbd_device *device = peer_req->peer_device->device;
 176        bool is_write = bio_data_dir(bio) == WRITE;
 177        bool is_discard = !!(bio_op(bio) == REQ_OP_DISCARD);
 178
 179        if (bio->bi_error && __ratelimit(&drbd_ratelimit_state))
 180                drbd_warn(device, "%s: error=%d s=%llus\n",
 181                                is_write ? (is_discard ? "discard" : "write")
 182                                        : "read", bio->bi_error,
 183                                (unsigned long long)peer_req->i.sector);
 184
 185        if (bio->bi_error)
 186                set_bit(__EE_WAS_ERROR, &peer_req->flags);
 187
 188        bio_put(bio); /* no need for the bio anymore */
 189        if (atomic_dec_and_test(&peer_req->pending_bios)) {
 190                if (is_write)
 191                        drbd_endio_write_sec_final(peer_req);
 192                else
 193                        drbd_endio_read_sec_final(peer_req);
 194        }
 195}
 196
 197void drbd_panic_after_delayed_completion_of_aborted_request(struct drbd_device *device)
 198{
 199        panic("drbd%u %s/%u potential random memory corruption caused by delayed completion of aborted local request\n",
 200                device->minor, device->resource->name, device->vnr);
 201}
 202
 203/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 204 */
 205void drbd_request_endio(struct bio *bio)
 206{
 207        unsigned long flags;
 208        struct drbd_request *req = bio->bi_private;
 209        struct drbd_device *device = req->device;
 210        struct bio_and_error m;
 211        enum drbd_req_event what;
 212
 213        /* If this request was aborted locally before,
 214         * but now was completed "successfully",
 215         * chances are that this caused arbitrary data corruption.
 216         *
 217         * "aborting" requests, or force-detaching the disk, is intended for
 218         * completely blocked/hung local backing devices which do no longer
 219         * complete requests at all, not even do error completions.  In this
 220         * situation, usually a hard-reset and failover is the only way out.
 221         *
 222         * By "aborting", basically faking a local error-completion,
 223         * we allow for a more graceful switchover by cleanly migrating services.
 224         * Still the affected node has to be rebooted "soon".
 225         *
 226         * By completing these requests, we allow the upper layers to re-use
 227         * the associated data pages.
 228         *
 229         * If later the local backing device "recovers", and now DMAs some data
 230         * from disk into the original request pages, in the best case it will
 231         * just put random data into unused pages; but typically it will corrupt
 232         * meanwhile completely unrelated data, causing all sorts of damage.
 233         *
 234         * Which means delayed successful completion,
 235         * especially for READ requests,
 236         * is a reason to panic().
 237         *
 238         * We assume that a delayed *error* completion is OK,
 239         * though we still will complain noisily about it.
 240         */
 241        if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
 242                if (__ratelimit(&drbd_ratelimit_state))
 243                        drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");
 244
 245                if (!bio->bi_error)
 246                        drbd_panic_after_delayed_completion_of_aborted_request(device);
 247        }
 248
 249        /* to avoid recursion in __req_mod */
 250        if (unlikely(bio->bi_error)) {
 251                switch (bio_op(bio)) {
 252                case REQ_OP_DISCARD:
 253                        if (bio->bi_error == -EOPNOTSUPP)
 254                                what = DISCARD_COMPLETED_NOTSUPP;
 255                        else
 256                                what = DISCARD_COMPLETED_WITH_ERROR;
 257                        break;
 258                case REQ_OP_READ:
 259                        if (bio->bi_opf & REQ_RAHEAD)
 260                                what = READ_AHEAD_COMPLETED_WITH_ERROR;
 261                        else
 262                                what = READ_COMPLETED_WITH_ERROR;
 263                        break;
 264                default:
 265                        what = WRITE_COMPLETED_WITH_ERROR;
 266                        break;
 267                }
 268        } else {
 269                what = COMPLETED_OK;
 270        }
 271
 272        bio_put(req->private_bio);
 273        req->private_bio = ERR_PTR(bio->bi_error);
 274
 275        /* not req_mod(), we need irqsave here! */
 276        spin_lock_irqsave(&device->resource->req_lock, flags);
 277        __req_mod(req, what, &m);
 278        spin_unlock_irqrestore(&device->resource->req_lock, flags);
 279        put_ldev(device);
 280
 281        if (m.bio)
 282                complete_master_bio(device, &m);
 283}
 284
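/* Compute a digest over the page chain of a peer request.  All pages but
 * the last are fully used; the last one may be only partially used,
 * depending on peer_req->i.size.  The result is written to the buffer
 * supplied by the caller. */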
 285void drbd_csum_ee(struct crypto_ahash *tfm, struct drbd_peer_request *peer_req, void *digest)
 286{
 287        AHASH_REQUEST_ON_STACK(req, tfm);
 288        struct scatterlist sg;
 289        struct page *page = peer_req->pages;
 290        struct page *tmp;
 291        unsigned len;
 292
 293        ahash_request_set_tfm(req, tfm);
 294        ahash_request_set_callback(req, 0, NULL, NULL);
 295
 296        sg_init_table(&sg, 1);
 297        crypto_ahash_init(req);
 298
 299        while ((tmp = page_chain_next(page))) {
 300                /* all but the last page will be fully used */
 301                sg_set_page(&sg, page, PAGE_SIZE, 0);
 302                ahash_request_set_crypt(req, &sg, NULL, sg.length);
 303                crypto_ahash_update(req);
 304                page = tmp;
 305        }
 306        /* and now the last, possibly only partially used page */
 307        len = peer_req->i.size & (PAGE_SIZE - 1);
 308        sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
 309        ahash_request_set_crypt(req, &sg, digest, sg.length);
 310        crypto_ahash_finup(req);
 311        ahash_request_zero(req);
 312}
 313
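/* Like drbd_csum_ee(), but for a bio: hash each segment in turn.  A
 * REQ_OP_WRITE_SAME bio carries its payload only once, so only the first
 * segment is checksummed. */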
 314void drbd_csum_bio(struct crypto_ahash *tfm, struct bio *bio, void *digest)
 315{
 316        AHASH_REQUEST_ON_STACK(req, tfm);
 317        struct scatterlist sg;
 318        struct bio_vec bvec;
 319        struct bvec_iter iter;
 320
 321        ahash_request_set_tfm(req, tfm);
 322        ahash_request_set_callback(req, 0, NULL, NULL);
 323
 324        sg_init_table(&sg, 1);
 325        crypto_ahash_init(req);
 326
 327        bio_for_each_segment(bvec, bio, iter) {
 328                sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
 329                ahash_request_set_crypt(req, &sg, NULL, sg.length);
 330                crypto_ahash_update(req);
 331                /* REQ_OP_WRITE_SAME has only one segment,
 332                 * checksum the payload only once. */
 333                if (bio_op(bio) == REQ_OP_WRITE_SAME)
 334                        break;
 335        }
 336        ahash_request_set_crypt(req, NULL, digest, 0);
 337        crypto_ahash_final(req);
 338        ahash_request_zero(req);
 339}
 340
 341/* MAYBE merge common code with w_e_end_ov_req */
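/* Worker callback for checksum-based resync: compute the digest of the
 * block we just read locally and send it to the peer as a
 * P_CSUM_RS_REQUEST.  The peer request and its pages are freed before
 * sending, to avoid a distributed deadlock on congestion (see the comment
 * in the body). */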
 342static int w_e_send_csum(struct drbd_work *w, int cancel)
 343{
 344        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
 345        struct drbd_peer_device *peer_device = peer_req->peer_device;
 346        struct drbd_device *device = peer_device->device;
 347        int digest_size;
 348        void *digest;
 349        int err = 0;
 350
 351        if (unlikely(cancel))
 352                goto out;
 353
 354        if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
 355                goto out;
 356
 357        digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
 358        digest = kmalloc(digest_size, GFP_NOIO);
 359        if (digest) {
 360                sector_t sector = peer_req->i.sector;
 361                unsigned int size = peer_req->i.size;
 362                drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
 363                /* Free peer_req and pages before send.
 364                 * In case we block on congestion, we could otherwise run into
 365                 * some distributed deadlock, if the other side blocks on
 366                 * congestion as well, because our receiver blocks in
 367                 * drbd_alloc_pages due to pp_in_use > max_buffers. */
 368                drbd_free_peer_req(device, peer_req);
 369                peer_req = NULL;
 370                inc_rs_pending(device);
 371                err = drbd_send_drequest_csum(peer_device, sector, size,
 372                                              digest, digest_size,
 373                                              P_CSUM_RS_REQUEST);
 374                kfree(digest);
 375        } else {
 376                drbd_err(device, "kmalloc() of digest failed.\n");
 377                err = -ENOMEM;
 378        }
 379
 380out:
 381        if (peer_req)
 382                drbd_free_peer_req(device, peer_req);
 383
 384        if (unlikely(err))
 385                drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
 386        return err;
 387}
 388
 389#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
 390
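/* Submit a local read of [sector, sector + size) for checksum-based
 * resync; w_e_send_csum() runs once the read completes.  Returns 0 on
 * success, -EIO if we have no local disk, and -EAGAIN if allocation or
 * submission failed and the request should be retried later. */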
 391static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
 392{
 393        struct drbd_device *device = peer_device->device;
 394        struct drbd_peer_request *peer_req;
 395
 396        if (!get_ldev(device))
 397                return -EIO;
 398
 399        /* GFP_TRY, because if there is no memory available right now, this may
 400         * be rescheduled for later. It is "only" background resync, after all. */
 401        peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
 402                                       size, size, GFP_TRY);
 403        if (!peer_req)
 404                goto defer;
 405
 406        peer_req->w.cb = w_e_send_csum;
 407        spin_lock_irq(&device->resource->req_lock);
 408        list_add_tail(&peer_req->w.list, &device->read_ee);
 409        spin_unlock_irq(&device->resource->req_lock);
 410
 411        atomic_add(size >> 9, &device->rs_sect_ev);
 412        if (drbd_submit_peer_request(device, peer_req, REQ_OP_READ, 0,
 413                                     DRBD_FAULT_RS_RD) == 0)
 414                return 0;
 415
 416        /* If it failed because of ENOMEM, retry should help.  If it failed
 417         * because bio_add_page failed (probably broken lower level driver),
 418         * retry may or may not help.
 419         * If it does not, you may need to force disconnect. */
 420        spin_lock_irq(&device->resource->req_lock);
 421        list_del(&peer_req->w.list);
 422        spin_unlock_irq(&device->resource->req_lock);
 423
 424        drbd_free_peer_req(device, peer_req);
 425defer:
 426        put_ldev(device);
 427        return -EAGAIN;
 428}
 429
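/* Worker callback run from the resync timer: generate either
 * online-verify or resync requests, depending on the connection state. */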
 430int w_resync_timer(struct drbd_work *w, int cancel)
 431{
 432        struct drbd_device *device =
 433                container_of(w, struct drbd_device, resync_work);
 434
 435        switch (device->state.conn) {
 436        case C_VERIFY_S:
 437                make_ov_request(device, cancel);
 438                break;
 439        case C_SYNC_TARGET:
 440                make_resync_request(device, cancel);
 441                break;
 442        }
 443
 444        return 0;
 445}
 446
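/* Timer callback: (re-)queue the resync work on the connection's
 * sender work queue, unless it is already queued. */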
 447void resync_timer_fn(unsigned long data)
 448{
 449        struct drbd_device *device = (struct drbd_device *) data;
 450
 451        drbd_queue_work_if_unqueued(
 452                &first_peer_device(device)->connection->sender_work,
 453                &device->resync_work);
 454}
 455
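/* The fifo_buffer below holds the "plan" of the dynamic resync-rate
 * controller: one correction value per upcoming controller step.
 * fifo_push() inserts a new value and returns the one it overwrites
 * (the oldest); fifo_add_val() adds a value to every planned step. */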
 456static void fifo_set(struct fifo_buffer *fb, int value)
 457{
 458        int i;
 459
 460        for (i = 0; i < fb->size; i++)
 461                fb->values[i] = value;
 462}
 463
 464static int fifo_push(struct fifo_buffer *fb, int value)
 465{
 466        int ov;
 467
 468        ov = fb->values[fb->head_index];
 469        fb->values[fb->head_index++] = value;
 470
 471        if (fb->head_index >= fb->size)
 472                fb->head_index = 0;
 473
 474        return ov;
 475}
 476
 477static void fifo_add_val(struct fifo_buffer *fb, int value)
 478{
 479        int i;
 480
 481        for (i = 0; i < fb->size; i++)
 482                fb->values[i] += value;
 483}
 484
 485struct fifo_buffer *fifo_alloc(int fifo_size)
 486{
 487        struct fifo_buffer *fb;
 488
 489        fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
 490        if (!fb)
 491                return NULL;
 492
 493        fb->head_index = 0;
 494        fb->size = fifo_size;
 495        fb->total = 0;
 496
 497        return fb;
 498}
 499
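/* Dynamic resync-rate controller, used when a resync plan (c-plan-ahead)
 * is configured: based on the number of sectors that came in since the
 * last step (sect_in) and the configured fill/delay targets, decide how
 * many sectors worth of resync requests to issue in this SLEEP_TIME step.
 * The result is capped at c-max-rate.  Runs under the RCU read lock held
 * by the caller, since it dereferences disk_conf and rs_plan_s. */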
 500static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
 501{
 502        struct disk_conf *dc;
 503        unsigned int want;     /* The number of sectors we want in-flight */
 504        int req_sect; /* Number of sectors to request in this turn */
 505        int correction; /* Number of sectors more we need in-flight */
 506        int cps; /* correction per invocation of drbd_rs_controller() */
 507        int steps; /* Number of time steps to plan ahead */
 508        int curr_corr;
 509        int max_sect;
 510        struct fifo_buffer *plan;
 511
 512        dc = rcu_dereference(device->ldev->disk_conf);
 513        plan = rcu_dereference(device->rs_plan_s);
 514
 515        steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */
 516
 517        if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
 518                want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
 519        } else { /* normal path */
 520                want = dc->c_fill_target ? dc->c_fill_target :
 521                        sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
 522        }
 523
 524        correction = want - device->rs_in_flight - plan->total;
 525
 526        /* Plan ahead */
 527        cps = correction / steps;
 528        fifo_add_val(plan, cps);
 529        plan->total += cps * steps;
 530
 531        /* What we do in this step */
 532        curr_corr = fifo_push(plan, 0);
 533        plan->total -= curr_corr;
 534
 535        req_sect = sect_in + curr_corr;
 536        if (req_sect < 0)
 537                req_sect = 0;
 538
 539        max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
 540        if (req_sect > max_sect)
 541                req_sect = max_sect;
 542
 543        /*
 544        drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
 545                 sect_in, device->rs_in_flight, want, correction,
 546                 steps, cps, device->rs_planed, curr_corr, req_sect);
 547        */
 548
 549        return req_sect;
 550}
 551
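/* Decide how many resync requests (in units of BM_BLOCK_SIZE) to issue
 * this turn, either from the dynamic controller above or from the static
 * resync-rate setting.  The result is further limited so that no more
 * than max-buffers/2 requests are in flight; it may be zero or negative
 * if enough are in flight already. */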
 552static int drbd_rs_number_requests(struct drbd_device *device)
 553{
 554        unsigned int sect_in;  /* Number of sectors that came in since the last turn */
 555        int number, mxb;
 556
 557        sect_in = atomic_xchg(&device->rs_sect_in, 0);
 558        device->rs_in_flight -= sect_in;
 559
 560        rcu_read_lock();
 561        mxb = drbd_get_max_buffers(device) / 2;
 562        if (rcu_dereference(device->rs_plan_s)->size) {
 563                number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
 564                device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
 565        } else {
 566                device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
 567                number = SLEEP_TIME * device->c_sync_rate  / ((BM_BLOCK_SIZE / 1024) * HZ);
 568        }
 569        rcu_read_unlock();
 570
 571        /* Don't have more than "max-buffers"/2 in-flight.
 572         * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
 573         * potentially causing a distributed deadlock on congestion during
 574         * online-verify or (checksum-based) resync, if max-buffers,
 575         * socket buffer sizes and resync rate settings are mis-configured. */
 576
 577        /* note that "number" is in units of "BM_BLOCK_SIZE" (which is 4k),
 578         * mxb (as used here, and in drbd_alloc_pages on the peer) is
 579         * "number of pages" (typically also 4k),
 580         * but "rs_in_flight" is in "sectors" (512 Byte). */
 581        if (mxb - device->rs_in_flight/8 < number)
 582                number = mxb - device->rs_in_flight/8;
 583
 584        return number;
 585}
 586
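/* Issue up to "number" resync requests to the peer, walking the
 * out-of-sync bits of the bitmap from bm_resync_fo on.  Adjacent dirty
 * bits may be merged up to max_bio_size (respecting alignment and extent
 * boundaries), and request generation is throttled while the TCP send
 * buffer is more than half full.  With checksum-based resync the block is
 * first read locally (read_for_csum()); otherwise a P_RS_DATA_REQUEST (or
 * P_RS_THIN_REQ, for ranges matching the discard granularity) is sent
 * directly.  The resync timer is re-armed unless the end of the bitmap
 * was reached. */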
 587static int make_resync_request(struct drbd_device *const device, int cancel)
 588{
 589        struct drbd_peer_device *const peer_device = first_peer_device(device);
 590        struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
 591        unsigned long bit;
 592        sector_t sector;
 593        const sector_t capacity = drbd_get_capacity(device->this_bdev);
 594        int max_bio_size;
 595        int number, rollback_i, size;
 596        int align, requeue = 0;
 597        int i = 0;
 598        int discard_granularity = 0;
 599
 600        if (unlikely(cancel))
 601                return 0;
 602
 603        if (device->rs_total == 0) {
 604                /* empty resync? */
 605                drbd_resync_finished(device);
 606                return 0;
 607        }
 608
 609        if (!get_ldev(device)) {
 610                /* Since we only need to access device->rsync, a
 611                   get_ldev_if_state(device, D_FAILED) would be sufficient; but
 612                   continuing resync with a broken disk makes no sense at
 613                   all. */
 614                drbd_err(device, "Disk broke down during resync!\n");
 615                return 0;
 616        }
 617
 618        if (connection->agreed_features & DRBD_FF_THIN_RESYNC) {
 619                rcu_read_lock();
 620                discard_granularity = rcu_dereference(device->ldev->disk_conf)->rs_discard_granularity;
 621                rcu_read_unlock();
 622        }
 623
 624        max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9;
 625        number = drbd_rs_number_requests(device);
 626        if (number <= 0)
 627                goto requeue;
 628
 629        for (i = 0; i < number; i++) {
 630                /* Stop generating RS requests when half of the send buffer is filled,
 631                 * but notify TCP that we'd like to have more space. */
 632                mutex_lock(&connection->data.mutex);
 633                if (connection->data.socket) {
 634                        struct sock *sk = connection->data.socket->sk;
 635                        int queued = sk->sk_wmem_queued;
 636                        int sndbuf = sk->sk_sndbuf;
 637                        if (queued > sndbuf / 2) {
 638                                requeue = 1;
 639                                if (sk->sk_socket)
 640                                        set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
 641                        }
 642                } else
 643                        requeue = 1;
 644                mutex_unlock(&connection->data.mutex);
 645                if (requeue)
 646                        goto requeue;
 647
 648next_sector:
 649                size = BM_BLOCK_SIZE;
 650                bit  = drbd_bm_find_next(device, device->bm_resync_fo);
 651
 652                if (bit == DRBD_END_OF_BITMAP) {
 653                        device->bm_resync_fo = drbd_bm_bits(device);
 654                        put_ldev(device);
 655                        return 0;
 656                }
 657
 658                sector = BM_BIT_TO_SECT(bit);
 659
 660                if (drbd_try_rs_begin_io(device, sector)) {
 661                        device->bm_resync_fo = bit;
 662                        goto requeue;
 663                }
 664                device->bm_resync_fo = bit + 1;
 665
 666                if (unlikely(drbd_bm_test_bit(device, bit) == 0)) {
 667                        drbd_rs_complete_io(device, sector);
 668                        goto next_sector;
 669                }
 670
 671#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
 672                /* try to find some adjacent bits.
 673                 * we stop if we have already the maximum req size.
 674                 *
 675                 * Additionally always align bigger requests, in order to
 676                 * be prepared for all stripe sizes of software RAIDs.
 677                 */
 678                align = 1;
 679                rollback_i = i;
 680                while (i < number) {
 681                        if (size + BM_BLOCK_SIZE > max_bio_size)
 682                                break;
 683
 684                        /* Be always aligned */
 685                        if (sector & ((1<<(align+3))-1))
 686                                break;
 687
 688                        if (discard_granularity && size == discard_granularity)
 689                                break;
 690
 691                        /* do not cross extent boundaries */
 692                        if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
 693                                break;
 694                        /* now, is it actually dirty, after all?
 695                         * caution, drbd_bm_test_bit is tri-state for some
 696                         * obscure reason; ( b == 0 ) would get the out-of-band
 697                         * only accidentally right because of the "oddly sized"
 698                         * adjustment below */
 699                        if (drbd_bm_test_bit(device, bit+1) != 1)
 700                                break;
 701                        bit++;
 702                        size += BM_BLOCK_SIZE;
 703                        if ((BM_BLOCK_SIZE << align) <= size)
 704                                align++;
 705                        i++;
 706                }
 707                /* if we merged some,
 708                 * reset the offset to start the next drbd_bm_find_next from */
 709                if (size > BM_BLOCK_SIZE)
 710                        device->bm_resync_fo = bit + 1;
 711#endif
 712
 713                /* adjust very last sectors, in case we are oddly sized */
 714                if (sector + (size>>9) > capacity)
 715                        size = (capacity-sector)<<9;
 716
 717                if (device->use_csums) {
 718                        switch (read_for_csum(peer_device, sector, size)) {
 719                        case -EIO: /* Disk failure */
 720                                put_ldev(device);
 721                                return -EIO;
 722                        case -EAGAIN: /* allocation failed, or ldev busy */
 723                                drbd_rs_complete_io(device, sector);
 724                                device->bm_resync_fo = BM_SECT_TO_BIT(sector);
 725                                i = rollback_i;
 726                                goto requeue;
 727                        case 0:
 728                                /* everything ok */
 729                                break;
 730                        default:
 731                                BUG();
 732                        }
 733                } else {
 734                        int err;
 735
 736                        inc_rs_pending(device);
 737                        err = drbd_send_drequest(peer_device,
 738                                                 size == discard_granularity ? P_RS_THIN_REQ : P_RS_DATA_REQUEST,
 739                                                 sector, size, ID_SYNCER);
 740                        if (err) {
 741                                drbd_err(device, "drbd_send_drequest() failed, aborting...\n");
 742                                dec_rs_pending(device);
 743                                put_ldev(device);
 744                                return err;
 745                        }
 746                }
 747        }
 748
 749        if (device->bm_resync_fo >= drbd_bm_bits(device)) {
 750                /* last syncer _request_ was sent,
 751                 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
 752                 * next sync group will resume), as soon as we receive the last
 753                 * resync data block, and the last bit is cleared.
 754                 * until then resync "work" is "inactive" ...
 755                 */
 756                put_ldev(device);
 757                return 0;
 758        }
 759
 760 requeue:
 761        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 762        mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 763        put_ldev(device);
 764        return 0;
 765}
 766
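/* Online-verify counterpart of make_resync_request(): send up to "number"
 * verify requests starting at device->ov_position, and re-arm the resync
 * timer unless the configured verify stop sector has been reached. */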
 767static int make_ov_request(struct drbd_device *device, int cancel)
 768{
 769        int number, i, size;
 770        sector_t sector;
 771        const sector_t capacity = drbd_get_capacity(device->this_bdev);
 772        bool stop_sector_reached = false;
 773
 774        if (unlikely(cancel))
 775                return 1;
 776
 777        number = drbd_rs_number_requests(device);
 778
 779        sector = device->ov_position;
 780        for (i = 0; i < number; i++) {
 781                if (sector >= capacity)
 782                        return 1;
 783
 784                /* We check for "finished" only in the reply path:
 785                 * w_e_end_ov_reply().
 786                 * We need to send at least one request out. */
 787                stop_sector_reached = i > 0
 788                        && verify_can_do_stop_sector(device)
 789                        && sector >= device->ov_stop_sector;
 790                if (stop_sector_reached)
 791                        break;
 792
 793                size = BM_BLOCK_SIZE;
 794
 795                if (drbd_try_rs_begin_io(device, sector)) {
 796                        device->ov_position = sector;
 797                        goto requeue;
 798                }
 799
 800                if (sector + (size>>9) > capacity)
 801                        size = (capacity-sector)<<9;
 802
 803                inc_rs_pending(device);
 804                if (drbd_send_ov_request(first_peer_device(device), sector, size)) {
 805                        dec_rs_pending(device);
 806                        return 0;
 807                }
 808                sector += BM_SECT_PER_BIT;
 809        }
 810        device->ov_position = sector;
 811
 812 requeue:
 813        device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9));
 814        if (i == 0 || !stop_sector_reached)
 815                mod_timer(&device->resync_timer, jiffies + SLEEP_TIME);
 816        return 1;
 817}
 818
 819int w_ov_finished(struct drbd_work *w, int cancel)
 820{
 821        struct drbd_device_work *dw =
 822                container_of(w, struct drbd_device_work, w);
 823        struct drbd_device *device = dw->device;
 824        kfree(dw);
 825        ov_out_of_sync_print(device);
 826        drbd_resync_finished(device);
 827
 828        return 0;
 829}
 830
 831static int w_resync_finished(struct drbd_work *w, int cancel)
 832{
 833        struct drbd_device_work *dw =
 834                container_of(w, struct drbd_device_work, w);
 835        struct drbd_device *device = dw->device;
 836        kfree(dw);
 837
 838        drbd_resync_finished(device);
 839
 840        return 0;
 841}
 842
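/* Send a ping to the peer and wait until the ping ack arrives, or until
 * the connection is lost. */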
 843static void ping_peer(struct drbd_device *device)
 844{
 845        struct drbd_connection *connection = first_peer_device(device)->connection;
 846
 847        clear_bit(GOT_PING_ACK, &connection->flags);
 848        request_ping(connection);
 849        wait_event(connection->ping_wait,
 850                   test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED);
 851}
 852
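/* Called when a resync or online-verify run is done: print statistics,
 * update disk/peer-disk state and the UUIDs, possibly trigger the
 * "out-of-sync", "after-resync-target" or "unfence-peer" handlers, and
 * reset the resync bookkeeping.  If the resync LRU cannot be cleaned up
 * yet, the work re-queues itself via w_resync_finished(). */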
 853int drbd_resync_finished(struct drbd_device *device)
 854{
 855        struct drbd_connection *connection = first_peer_device(device)->connection;
 856        unsigned long db, dt, dbdt;
 857        unsigned long n_oos;
 858        union drbd_state os, ns;
 859        struct drbd_device_work *dw;
 860        char *khelper_cmd = NULL;
 861        int verify_done = 0;
 862
 863        /* Remove all elements from the resync LRU. Since future actions
 864         * might set bits in the (main) bitmap, then the entries in the
 865         * resync LRU would be wrong. */
 866        if (drbd_rs_del_all(device)) {
 867                /* In case this is not possible now, most probably because
 868                 * there are P_RS_DATA_REPLY packets lingering on the worker's
 869                 * queue (or the read operations for those packets have not
 870                 * finished yet).  Retry in 100ms. */
 871
 872                schedule_timeout_interruptible(HZ / 10);
 873                dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC);
 874                if (dw) {
 875                        dw->w.cb = w_resync_finished;
 876                        dw->device = device;
 877                        drbd_queue_work(&connection->sender_work, &dw->w);
 878                        return 1;
 879                }
 880                drbd_err(device, "Failed to drbd_rs_del_all() and to kmalloc(dw).\n");
 881        }
 882
 883        dt = (jiffies - device->rs_start - device->rs_paused) / HZ;
 884        if (dt <= 0)
 885                dt = 1;
 886
 887        db = device->rs_total;
 888        /* adjust for verify start and stop sectors, and the position actually reached */
 889        if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T)
 890                db -= device->ov_left;
 891
 892        dbdt = Bit2KB(db/dt);
 893        device->rs_paused /= HZ;
 894
 895        if (!get_ldev(device))
 896                goto out;
 897
 898        ping_peer(device);
 899
 900        spin_lock_irq(&device->resource->req_lock);
 901        os = drbd_read_state(device);
 902
 903        verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T);
 904
 905        /* This protects us against multiple calls (that can happen in the presence
 906           of application IO), and against connectivity loss just before we arrive here. */
 907        if (os.conn <= C_CONNECTED)
 908                goto out_unlock;
 909
 910        ns = os;
 911        ns.conn = C_CONNECTED;
 912
 913        drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n",
 914             verify_done ? "Online verify" : "Resync",
 915             dt + device->rs_paused, device->rs_paused, dbdt);
 916
 917        n_oos = drbd_bm_total_weight(device);
 918
 919        if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) {
 920                if (n_oos) {
 921                        drbd_alert(device, "Online verify found %lu %dk blocks out of sync!\n",
 922                              n_oos, Bit2KB(1));
 923                        khelper_cmd = "out-of-sync";
 924                }
 925        } else {
 926                D_ASSERT(device, (n_oos - device->rs_failed) == 0);
 927
 928                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T)
 929                        khelper_cmd = "after-resync-target";
 930
 931                if (device->use_csums && device->rs_total) {
 932                        const unsigned long s = device->rs_same_csum;
 933                        const unsigned long t = device->rs_total;
 934                        const int ratio =
 935                                (t == 0)     ? 0 :
 936                                (t < 100000) ? ((s*100)/t) : (s/(t/100));
 937                        drbd_info(device, "%u %% had equal checksums, eliminated: %luK; "
 938                             "transferred %luK total %luK\n",
 939                             ratio,
 940                             Bit2KB(device->rs_same_csum),
 941                             Bit2KB(device->rs_total - device->rs_same_csum),
 942                             Bit2KB(device->rs_total));
 943                }
 944        }
 945
 946        if (device->rs_failed) {
 947                drbd_info(device, "            %lu failed blocks\n", device->rs_failed);
 948
 949                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 950                        ns.disk = D_INCONSISTENT;
 951                        ns.pdsk = D_UP_TO_DATE;
 952                } else {
 953                        ns.disk = D_UP_TO_DATE;
 954                        ns.pdsk = D_INCONSISTENT;
 955                }
 956        } else {
 957                ns.disk = D_UP_TO_DATE;
 958                ns.pdsk = D_UP_TO_DATE;
 959
 960                if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) {
 961                        if (device->p_uuid) {
 962                                int i;
 963                                for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++)
 964                                        _drbd_uuid_set(device, i, device->p_uuid[i]);
 965                                drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]);
 966                                _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]);
 967                        } else {
 968                                drbd_err(device, "device->p_uuid is NULL! BUG\n");
 969                        }
 970                }
 971
 972                if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) {
 973                        /* for verify runs, we don't update uuids here,
 974                         * so there would be nothing to report. */
 975                        drbd_uuid_set_bm(device, 0UL);
 976                        drbd_print_uuids(device, "updated UUIDs");
 977                        if (device->p_uuid) {
 978                                /* Now the two UUID sets are equal, update what we
 979                                 * know of the peer. */
 980                                int i;
 981                                for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++)
 982                                        device->p_uuid[i] = device->ldev->md.uuid[i];
 983                        }
 984                }
 985        }
 986
 987        _drbd_set_state(device, ns, CS_VERBOSE, NULL);
 988out_unlock:
 989        spin_unlock_irq(&device->resource->req_lock);
 990
 991        /* If we have been sync source, and have an effective fencing-policy,
 992         * once *all* volumes are back in sync, call "unfence". */
 993        if (os.conn == C_SYNC_SOURCE) {
 994                enum drbd_disk_state disk_state = D_MASK;
 995                enum drbd_disk_state pdsk_state = D_MASK;
 996                enum drbd_fencing_p fp = FP_DONT_CARE;
 997
 998                rcu_read_lock();
 999                fp = rcu_dereference(device->ldev->disk_conf)->fencing;
1000                if (fp != FP_DONT_CARE) {
1001                        struct drbd_peer_device *peer_device;
1002                        int vnr;
1003                        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
1004                                struct drbd_device *device = peer_device->device;
1005                                disk_state = min_t(enum drbd_disk_state, disk_state, device->state.disk);
1006                                pdsk_state = min_t(enum drbd_disk_state, pdsk_state, device->state.pdsk);
1007                        }
1008                }
1009                rcu_read_unlock();
1010                if (disk_state == D_UP_TO_DATE && pdsk_state == D_UP_TO_DATE)
1011                        conn_khelper(connection, "unfence-peer");
1012        }
1013
1014        put_ldev(device);
1015out:
1016        device->rs_total  = 0;
1017        device->rs_failed = 0;
1018        device->rs_paused = 0;
1019
1020        /* reset start sector, if we reached end of device */
1021        if (verify_done && device->ov_left == 0)
1022                device->ov_start_sector = 0;
1023
1024        drbd_md_sync(device);
1025
1026        if (khelper_cmd)
1027                drbd_khelper(device, khelper_cmd);
1028
1029        return 1;
1030}
1031
1032/* helper: if the pages of the peer request may still be referenced by the
 * network stack (sendpage() not finished), park it on net_ee instead of
 * freeing it, so the pages are not reused too early. */
1033static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req)
1034{
1035        if (drbd_peer_req_has_active_page(peer_req)) {
1036                /* This might happen if sendpage() has not finished */
1037                int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT;
1038                atomic_add(i, &device->pp_in_use_by_net);
1039                atomic_sub(i, &device->pp_in_use);
1040                spin_lock_irq(&device->resource->req_lock);
1041                list_add_tail(&peer_req->w.list, &device->net_ee);
1042                spin_unlock_irq(&device->resource->req_lock);
1043                wake_up(&drbd_pp_wait);
1044        } else
1045                drbd_free_peer_req(device, peer_req);
1046}
1047
1048/**
1049 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST
1050 * @w:          work object.
1051 * @cancel:     The connection will be closed anyway
1052 */
1053int w_e_end_data_req(struct drbd_work *w, int cancel)
1054{
1055        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1056        struct drbd_peer_device *peer_device = peer_req->peer_device;
1057        struct drbd_device *device = peer_device->device;
1058        int err;
1059
1060        if (unlikely(cancel)) {
1061                drbd_free_peer_req(device, peer_req);
1062                dec_unacked(device);
1063                return 0;
1064        }
1065
1066        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1067                err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req);
1068        } else {
1069                if (__ratelimit(&drbd_ratelimit_state))
1070                        drbd_err(device, "Sending NegDReply. sector=%llus.\n",
1071                            (unsigned long long)peer_req->i.sector);
1072
1073                err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req);
1074        }
1075
1076        dec_unacked(device);
1077
1078        move_to_net_ee_or_free(device, peer_req);
1079
1080        if (unlikely(err))
1081                drbd_err(device, "drbd_send_block() failed\n");
1082        return err;
1083}
1084
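/* Return true if the payload of the peer request contains only zeroes
 * (checked word-wise).  Used below to answer thin resync requests with
 * drbd_send_rs_deallocated() instead of shipping zeroed data. */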
1085static bool all_zero(struct drbd_peer_request *peer_req)
1086{
1087        struct page *page = peer_req->pages;
1088        unsigned int len = peer_req->i.size;
1089
1090        page_chain_for_each(page) {
1091                unsigned int l = min_t(unsigned int, len, PAGE_SIZE);
1092                unsigned int i, words = l / sizeof(long);
1093                unsigned long *d;
1094
1095                d = kmap_atomic(page);
1096                for (i = 0; i < words; i++) {
1097                        if (d[i]) {
1098                                kunmap_atomic(d);
1099                                return false;
1100                        }
1101                }
1102                kunmap_atomic(d);
1103                len -= l;
1104        }
1105
1106        return true;
1107}
1108
1109/**
1110 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
1111 * @w:          work object.
1112 * @cancel:     The connection will be closed anyway
1113 */
1114int w_e_end_rsdata_req(struct drbd_work *w, int cancel)
1115{
1116        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1117        struct drbd_peer_device *peer_device = peer_req->peer_device;
1118        struct drbd_device *device = peer_device->device;
1119        int err;
1120
1121        if (unlikely(cancel)) {
1122                drbd_free_peer_req(device, peer_req);
1123                dec_unacked(device);
1124                return 0;
1125        }
1126
1127        if (get_ldev_if_state(device, D_FAILED)) {
1128                drbd_rs_complete_io(device, peer_req->i.sector);
1129                put_ldev(device);
1130        }
1131
1132        if (device->state.conn == C_AHEAD) {
1133                err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req);
1134        } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1135                if (likely(device->state.pdsk >= D_INCONSISTENT)) {
1136                        inc_rs_pending(device);
1137                        if (peer_req->flags & EE_RS_THIN_REQ && all_zero(peer_req))
1138                                err = drbd_send_rs_deallocated(peer_device, peer_req);
1139                        else
1140                                err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1141                } else {
1142                        if (__ratelimit(&drbd_ratelimit_state))
1143                                drbd_err(device, "Not sending RSDataReply, "
1144                                    "partner DISKLESS!\n");
1145                        err = 0;
1146                }
1147        } else {
1148                if (__ratelimit(&drbd_ratelimit_state))
1149                        drbd_err(device, "Sending NegRSDReply. sector %llus.\n",
1150                            (unsigned long long)peer_req->i.sector);
1151
1152                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1153
1154                /* update resync data with failure */
1155                drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size);
1156        }
1157
1158        dec_unacked(device);
1159
1160        move_to_net_ee_or_free(device, peer_req);
1161
1162        if (unlikely(err))
1163                drbd_err(device, "drbd_send_block() failed\n");
1164        return err;
1165}
1166
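/* Worker callback after the local read for a P_CSUM_RS_REQUEST completed:
 * compare the peer's digest with a locally computed one.  If they match,
 * mark the block in sync and send P_RS_IS_IN_SYNC; otherwise send the
 * full block as P_RS_DATA_REPLY. */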
1167int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
1168{
1169        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1170        struct drbd_peer_device *peer_device = peer_req->peer_device;
1171        struct drbd_device *device = peer_device->device;
1172        struct digest_info *di;
1173        int digest_size;
1174        void *digest = NULL;
1175        int err, eq = 0;
1176
1177        if (unlikely(cancel)) {
1178                drbd_free_peer_req(device, peer_req);
1179                dec_unacked(device);
1180                return 0;
1181        }
1182
1183        if (get_ldev(device)) {
1184                drbd_rs_complete_io(device, peer_req->i.sector);
1185                put_ldev(device);
1186        }
1187
1188        di = peer_req->digest;
1189
1190        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1191                /* quick hack to try to avoid a race against reconfiguration.
1192                 * a real fix would be much more involved,
1193                 * introducing more locking mechanisms */
1194                if (peer_device->connection->csums_tfm) {
1195                        digest_size = crypto_ahash_digestsize(peer_device->connection->csums_tfm);
1196                        D_ASSERT(device, digest_size == di->digest_size);
1197                        digest = kmalloc(digest_size, GFP_NOIO);
1198                }
1199                if (digest) {
1200                        drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
1201                        eq = !memcmp(digest, di->digest, digest_size);
1202                        kfree(digest);
1203                }
1204
1205                if (eq) {
1206                        drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size);
1207                        /* rs_same_csums unit is BM_BLOCK_SIZE */
1208                        device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
1209                        err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req);
1210                } else {
1211                        inc_rs_pending(device);
1212                        peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
1213                        peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
1214                        kfree(di);
1215                        err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req);
1216                }
1217        } else {
1218                err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req);
1219                if (__ratelimit(&drbd_ratelimit_state))
1220                        drbd_err(device, "Sending NegDReply. I guess it gets messy.\n");
1221        }
1222
1223        dec_unacked(device);
1224        move_to_net_ee_or_free(device, peer_req);
1225
1226        if (unlikely(err))
1227                drbd_err(device, "drbd_send_block/ack() failed\n");
1228        return err;
1229}
1230
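/* Worker callback after the local read for an online-verify request
 * completed: compute the digest of the local data (or a zeroed digest on
 * read error) and send it to the peer with P_OV_REPLY. */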
1231int w_e_end_ov_req(struct drbd_work *w, int cancel)
1232{
1233        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1234        struct drbd_peer_device *peer_device = peer_req->peer_device;
1235        struct drbd_device *device = peer_device->device;
1236        sector_t sector = peer_req->i.sector;
1237        unsigned int size = peer_req->i.size;
1238        int digest_size;
1239        void *digest;
1240        int err = 0;
1241
1242        if (unlikely(cancel))
1243                goto out;
1244
1245        digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1246        digest = kmalloc(digest_size, GFP_NOIO);
1247        if (!digest) {
1248                err = 1;        /* terminate the connection in case the allocation failed */
1249                goto out;
1250        }
1251
1252        if (likely(!(peer_req->flags & EE_WAS_ERROR)))
1253                drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1254        else
1255                memset(digest, 0, digest_size);
1256
1257        /* Free e and pages before send.
1258         * In case we block on congestion, we could otherwise run into
1259         * some distributed deadlock, if the other side blocks on
1260         * congestion as well, because our receiver blocks in
1261         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1262        drbd_free_peer_req(device, peer_req);
1263        peer_req = NULL;
1264        inc_rs_pending(device);
1265        err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY);
1266        if (err)
1267                dec_rs_pending(device);
1268        kfree(digest);
1269
1270out:
1271        if (peer_req)
1272                drbd_free_peer_req(device, peer_req);
1273        dec_unacked(device);
1274        return err;
1275}
1276
1277void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size)
1278{
1279        if (device->ov_last_oos_start + device->ov_last_oos_size == sector) {
1280                device->ov_last_oos_size += size>>9;
1281        } else {
1282                device->ov_last_oos_start = sector;
1283                device->ov_last_oos_size = size>>9;
1284        }
1285        drbd_set_out_of_sync(device, sector, size);
1286}
1287
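/* Worker callback after the local read for a P_OV_REPLY completed:
 * compare the peer's digest with our own, record any out-of-sync range,
 * report the result with P_OV_RESULT, and finish the verify run once the
 * last block (or the stop sector) has been processed. */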
1288int w_e_end_ov_reply(struct drbd_work *w, int cancel)
1289{
1290        struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
1291        struct drbd_peer_device *peer_device = peer_req->peer_device;
1292        struct drbd_device *device = peer_device->device;
1293        struct digest_info *di;
1294        void *digest;
1295        sector_t sector = peer_req->i.sector;
1296        unsigned int size = peer_req->i.size;
1297        int digest_size;
1298        int err, eq = 0;
1299        bool stop_sector_reached = false;
1300
1301        if (unlikely(cancel)) {
1302                drbd_free_peer_req(device, peer_req);
1303                dec_unacked(device);
1304                return 0;
1305        }
1306
1307        /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
1308         * the resync lru has been cleaned up already */
1309        if (get_ldev(device)) {
1310                drbd_rs_complete_io(device, peer_req->i.sector);
1311                put_ldev(device);
1312        }
1313
1314        di = peer_req->digest;
1315
1316        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1317                digest_size = crypto_ahash_digestsize(peer_device->connection->verify_tfm);
1318                digest = kmalloc(digest_size, GFP_NOIO);
1319                if (digest) {
1320                        drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest);
1321
1322                        D_ASSERT(device, digest_size == di->digest_size);
1323                        eq = !memcmp(digest, di->digest, digest_size);
1324                        kfree(digest);
1325                }
1326        }
1327
1328        /* Free peer_req and pages before send.
1329         * In case we block on congestion, we could otherwise run into
1330         * some distributed deadlock, if the other side blocks on
1331         * congestion as well, because our receiver blocks in
1332         * drbd_alloc_pages due to pp_in_use > max_buffers. */
1333        drbd_free_peer_req(device, peer_req);
1334        if (!eq)
1335                drbd_ov_out_of_sync_found(device, sector, size);
1336        else
1337                ov_out_of_sync_print(device);
1338
1339        err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size,
1340                               eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);
1341
1342        dec_unacked(device);
1343
1344        --device->ov_left;
1345
1346        /* let's advance progress step marks only for every other megabyte */
1347        if ((device->ov_left & 0x200) == 0x200)
1348                drbd_advance_rs_marks(device, device->ov_left);
1349
1350        stop_sector_reached = verify_can_do_stop_sector(device) &&
1351                (sector + (size>>9)) >= device->ov_stop_sector;
1352
1353        if (device->ov_left == 0 || stop_sector_reached) {
1354                ov_out_of_sync_print(device);
1355                drbd_resync_finished(device);
1356        }
1357
1358        return err;
1359}
1360
1361/* FIXME
1362 * We need to track the number of pending barrier acks,
1363 * and to be able to wait for them.
1364 * See also comment in drbd_adm_attach before drbd_suspend_io.
1365 */
1366static int drbd_send_barrier(struct drbd_connection *connection)
1367{
1368        struct p_barrier *p;
1369        struct drbd_socket *sock;
1370
1371        sock = &connection->data;
1372        p = conn_prepare_command(connection, sock);
1373        if (!p)
1374                return -EIO;
1375        p->barrier = connection->send.current_epoch_nr;
1376        p->pad = 0;
1377        connection->send.current_epoch_writes = 0;
1378        connection->send.last_sent_barrier_jif = jiffies;
1379
1380        return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0);
1381}
1382
1383int w_send_write_hint(struct drbd_work *w, int cancel)
1384{
1385        struct drbd_device *device =
1386                container_of(w, struct drbd_device, unplug_work);
1387        struct drbd_socket *sock;
1388
1389        if (cancel)
1390                return 0;
1391        sock = &first_peer_device(device)->connection->data;
1392        if (!drbd_prepare_command(first_peer_device(device), sock))
1393                return -EIO;
1394        return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0);
1395}
1396
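/* On the first write ever sent over this connection, initialize the
 * epoch bookkeeping used by the barrier logic. */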
1397static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch)
1398{
1399        if (!connection->send.seen_any_write_yet) {
1400                connection->send.seen_any_write_yet = true;
1401                connection->send.current_epoch_nr = epoch;
1402                connection->send.current_epoch_writes = 0;
1403                connection->send.last_sent_barrier_jif = jiffies;
1404        }
1405}
1406
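/* Close the previous write epoch with a P_BARRIER if this request starts
 * a new epoch and the previous one actually contained writes. */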
1407static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch)
1408{
1409        /* nothing to do before the first write on this connection */
1410        if (!connection->send.seen_any_write_yet)
1411                return;
1412        if (connection->send.current_epoch_nr != epoch) {
1413                if (connection->send.current_epoch_writes)
1414                        drbd_send_barrier(connection);
1415                connection->send.current_epoch_nr = epoch;
1416        }
1417}
1418
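/* Worker callback: while in AHEAD mode, do not mirror the write itself,
 * only tell the peer which blocks went out of sync. */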
1419int w_send_out_of_sync(struct drbd_work *w, int cancel)
1420{
1421        struct drbd_request *req = container_of(w, struct drbd_request, w);
1422        struct drbd_device *device = req->device;
1423        struct drbd_peer_device *const peer_device = first_peer_device(device);
1424        struct drbd_connection *const connection = peer_device->connection;
1425        int err;
1426
1427        if (unlikely(cancel)) {
1428                req_mod(req, SEND_CANCELED);
1429                return 0;
1430        }
1431        req->pre_send_jif = jiffies;
1432
1433        /* this time, no connection->send.current_epoch_writes++;
1434         * If it was sent, it was the closing barrier for the last
1435         * replicated epoch, before we went into AHEAD mode.
1436         * No more barriers will be sent, until we leave AHEAD mode again. */
1437        maybe_send_barrier(connection, req->epoch);
1438
1439        err = drbd_send_out_of_sync(peer_device, req);
1440        req_mod(req, OOS_HANDED_TO_NETWORK);
1441
1442        return err;
1443}
1444
1445/**
1446 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
1447 * @w:          work object.
1448 * @cancel:     The connection will be closed anyway
1449 */
1450int w_send_dblock(struct drbd_work *w, int cancel)
1451{
1452        struct drbd_request *req = container_of(w, struct drbd_request, w);
1453        struct drbd_device *device = req->device;
1454        struct drbd_peer_device *const peer_device = first_peer_device(device);
1455        struct drbd_connection *connection = peer_device->connection;
1456        int err;
1457
1458        if (unlikely(cancel)) {
1459                req_mod(req, SEND_CANCELED);
1460                return 0;
1461        }
1462        req->pre_send_jif = jiffies;
1463
1464        re_init_if_first_write(connection, req->epoch);
1465        maybe_send_barrier(connection, req->epoch);
1466        connection->send.current_epoch_writes++;
1467
1468        err = drbd_send_dblock(peer_device, req);
1469        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1470
1471        return err;
1472}
1473
1474/**
1475 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
1476 * @w:          work object.
1477 * @cancel:     The connection will be closed anyway
1478 */
1479int w_send_read_req(struct drbd_work *w, int cancel)
1480{
1481        struct drbd_request *req = container_of(w, struct drbd_request, w);
1482        struct drbd_device *device = req->device;
1483        struct drbd_peer_device *const peer_device = first_peer_device(device);
1484        struct drbd_connection *connection = peer_device->connection;
1485        int err;
1486
1487        if (unlikely(cancel)) {
1488                req_mod(req, SEND_CANCELED);
1489                return 0;
1490        }
1491        req->pre_send_jif = jiffies;
1492
1493        /* Even read requests may close a write epoch,
1494         * if there has been one already. */
1495        maybe_send_barrier(connection, req->epoch);
1496
1497        err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size,
1498                                 (unsigned long)req);
1499
1500        req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);
1501
1502        return err;
1503}
1504
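/* Worker callback: rebuild the private bio and resubmit the request to the
 * local backing device; for writes flagged RQ_IN_ACT_LOG the activity log
 * entry is (re)established first. */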
1505int w_restart_disk_io(struct drbd_work *w, int cancel)
1506{
1507        struct drbd_request *req = container_of(w, struct drbd_request, w);
1508        struct drbd_device *device = req->device;
1509
1510        if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
1511                drbd_al_begin_io(device, &req->i);
1512
1513        drbd_req_make_private_bio(req, req->master_bio);
1514        req->private_bio->bi_bdev = device->ldev->backing_bdev;
1515        generic_make_request(req->private_bio);
1516
1517        return 0;
1518}
1519
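/* Walk the resync-after dependency chain and return 1 if this device may
 * resync now, 0 if it still has to wait for a device it depends on. */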
1520static int _drbd_may_sync_now(struct drbd_device *device)
1521{
1522        struct drbd_device *odev = device;
1523        int resync_after;
1524
1525        while (1) {
1526                if (!odev->ldev || odev->state.disk == D_DISKLESS)
1527                        return 1;
1528                rcu_read_lock();
1529                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1530                rcu_read_unlock();
1531                if (resync_after == -1)
1532                        return 1;
1533                odev = minor_to_device(resync_after);
1534                if (!odev)
1535                        return 1;
1536                if ((odev->state.conn >= C_SYNC_SOURCE &&
1537                     odev->state.conn <= C_PAUSED_SYNC_T) ||
1538                    odev->state.aftr_isp || odev->state.peer_isp ||
1539                    odev->state.user_isp)
1540                        return 0;
1541        }
1542}
1543
1544/**
1545 * drbd_pause_after() - Pause resync on all devices that may not resync now
1546 * @device:     DRBD device.
1547 *
1548 * Called from process context only (admin command and after_state_ch).
1549 */
1550static bool drbd_pause_after(struct drbd_device *device)
1551{
1552        bool changed = false;
1553        struct drbd_device *odev;
1554        int i;
1555
1556        rcu_read_lock();
1557        idr_for_each_entry(&drbd_devices, odev, i) {
1558                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1559                        continue;
1560                if (!_drbd_may_sync_now(odev) &&
1561                    _drbd_set_state(_NS(odev, aftr_isp, 1),
1562                                    CS_HARD, NULL) != SS_NOTHING_TO_DO)
1563                        changed = true;
1564        }
1565        rcu_read_unlock();
1566
1567        return changed;
1568}
1569
1570/**
1571 * drbd_resume_next() - Resume resync on all devices that may resync now
1572 * @device:     DRBD device.
1573 *
1574 * Called from process context only (admin command and worker).
1575 */
1576static bool drbd_resume_next(struct drbd_device *device)
1577{
1578        bool changed = false;
1579        struct drbd_device *odev;
1580        int i;
1581
1582        rcu_read_lock();
1583        idr_for_each_entry(&drbd_devices, odev, i) {
1584                if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
1585                        continue;
1586                if (odev->state.aftr_isp) {
1587                        if (_drbd_may_sync_now(odev) &&
1588                            _drbd_set_state(_NS(odev, aftr_isp, 0),
1589                                            CS_HARD, NULL) != SS_NOTHING_TO_DO)
1590                                changed = true;
1591                }
1592        }
1593        rcu_read_unlock();
1594        return changed;
1595}
1596
1597void resume_next_sg(struct drbd_device *device)
1598{
1599        lock_all_resources();
1600        drbd_resume_next(device);
1601        unlock_all_resources();
1602}
1603
1604void suspend_other_sg(struct drbd_device *device)
1605{
1606        lock_all_resources();
1607        drbd_pause_after(device);
1608        unlock_all_resources();
1609}
1610
1611/* caller must lock_all_resources() */
1612enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor)
1613{
1614        struct drbd_device *odev;
1615        int resync_after;
1616
1617        if (o_minor == -1)
1618                return NO_ERROR;
1619        if (o_minor < -1 || o_minor > MINORMASK)
1620                return ERR_RESYNC_AFTER;
1621
1622        /* check for loops */
1623        odev = minor_to_device(o_minor);
1624        while (1) {
1625                if (odev == device)
1626                        return ERR_RESYNC_AFTER_CYCLE;
1627
1628                /* You are free to depend on diskless, non-existing,
1629                 * or not yet/no longer existing minors.
1630                 * We only reject dependency loops.
1631                 * We cannot follow the dependency chain beyond a detached or
1632                 * missing minor.
1633                 */
1634                if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS)
1635                        return NO_ERROR;
1636
1637                rcu_read_lock();
1638                resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after;
1639                rcu_read_unlock();
1640                /* dependency chain ends here, no cycles. */
1641                if (resync_after == -1)
1642                        return NO_ERROR;
1643
1644                /* follow the dependency chain */
1645                odev = minor_to_device(resync_after);
1646        }
1647}
1648
1649/* caller must lock_all_resources() */
1650void drbd_resync_after_changed(struct drbd_device *device)
1651{
1652        int changed;
1653
1654        do {
1655                changed  = drbd_pause_after(device);
1656                changed |= drbd_resume_next(device);
1657        } while (changed);
1658}
1659
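/* Reset the resync rate controller: clear the sector/event counters and
 * empty the RCU protected fifo plan in place. */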
1660void drbd_rs_controller_reset(struct drbd_device *device)
1661{
1662        struct gendisk *disk = device->ldev->backing_bdev->bd_contains->bd_disk;
1663        struct fifo_buffer *plan;
1664
1665        atomic_set(&device->rs_sect_in, 0);
1666        atomic_set(&device->rs_sect_ev, 0);
1667        device->rs_in_flight = 0;
1668        device->rs_last_events =
1669                (int)part_stat_read(&disk->part0, sectors[0]) +
1670                (int)part_stat_read(&disk->part0, sectors[1]);
1671
1672        /* Updating the RCU protected object in place is necessary since
1673           this function gets called from atomic context.
1674           It is valid since all other updates also lead to a completely
1675           empty fifo. */
1676        rcu_read_lock();
1677        plan = rcu_dereference(device->rs_plan_s);
1678        plan->total = 0;
1679        fifo_set(plan, 0);
1680        rcu_read_unlock();
1681}
1682
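/* Timer callback: hand the actual resync start over to the worker. */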
1683void start_resync_timer_fn(unsigned long data)
1684{
1685        struct drbd_device *device = (struct drbd_device *) data;
1686        drbd_device_post_work(device, RS_START);
1687}
1688
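/* Start the resync as sync source, unless acks are still pending;
 * in that case re-arm the timer and retry a bit later. */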
1689static void do_start_resync(struct drbd_device *device)
1690{
1691        if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) {
1692                drbd_warn(device, "postponing start_resync ...\n");
1693                device->start_resync_timer.expires = jiffies + HZ/10;
1694                add_timer(&device->start_resync_timer);
1695                return;
1696        }
1697
1698        drbd_start_resync(device, C_SYNC_SOURCE);
1699        clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags);
1700}
1701
1702static bool use_checksum_based_resync(struct drbd_connection *connection, struct drbd_device *device)
1703{
1704        bool csums_after_crash_only;
1705        rcu_read_lock();
1706        csums_after_crash_only = rcu_dereference(connection->net_conf)->csums_after_crash_only;
1707        rcu_read_unlock();
1708        return connection->agreed_pro_version >= 89 &&          /* supported? */
1709                connection->csums_tfm &&                        /* configured? */
1710                (csums_after_crash_only == false                /* use for each resync? */
1711                 || test_bit(CRASHED_PRIMARY, &device->flags)); /* or only after Primary crash? */
1712}
1713
1714/**
1715 * drbd_start_resync() - Start the resync process
1716 * @device:     DRBD device.
1717 * @side:       Either C_SYNC_SOURCE or C_SYNC_TARGET
1718 *
1719 * This function might bring you directly into one of the
1720 * C_PAUSED_SYNC_* states.
1721 */
1722void drbd_start_resync(struct drbd_device *device, enum drbd_conns side)
1723{
1724        struct drbd_peer_device *peer_device = first_peer_device(device);
1725        struct drbd_connection *connection = peer_device ? peer_device->connection : NULL;
1726        union drbd_state ns;
1727        int r;
1728
1729        if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) {
1730                drbd_err(device, "Resync already running!\n");
1731                return;
1732        }
1733
1734        if (!test_bit(B_RS_H_DONE, &device->flags)) {
1735                if (side == C_SYNC_TARGET) {
1736                        /* Since application IO was locked out during C_WF_BITMAP_T and
1737                           C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET,
1738                           ask the handler whether we may make the data inconsistent. */
1739                        r = drbd_khelper(device, "before-resync-target");
1740                        r = (r >> 8) & 0xff;
1741                        if (r > 0) {
1742                                drbd_info(device, "before-resync-target handler returned %d, "
1743                                         "dropping connection.\n", r);
1744                                conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD);
1745                                return;
1746                        }
1747                } else /* C_SYNC_SOURCE */ {
1748                        r = drbd_khelper(device, "before-resync-source");
1749                        r = (r >> 8) & 0xff;
1750                        if (r > 0) {
1751                                if (r == 3) {
1752                                        drbd_info(device, "before-resync-source handler returned %d, "
1753                                                 "ignoring. Old userland tools?", r);
1754                                } else {
1755                                        drbd_info(device, "before-resync-source handler returned %d, "
1756                                                 "dropping connection.\n", r);
1757                                        conn_request_state(connection,
1758                                                           NS(conn, C_DISCONNECTING), CS_HARD);
1759                                        return;
1760                                }
1761                        }
1762                }
1763        }
1764
1765        if (current == connection->worker.task) {
1766                /* The worker should not sleep waiting for state_mutex,
1767                   as that can take a long time. */
1768                if (!mutex_trylock(device->state_mutex)) {
1769                        set_bit(B_RS_H_DONE, &device->flags);
1770                        device->start_resync_timer.expires = jiffies + HZ/5;
1771                        add_timer(&device->start_resync_timer);
1772                        return;
1773                }
1774        } else {
1775                mutex_lock(device->state_mutex);
1776        }
1777
1778        lock_all_resources();
1779        clear_bit(B_RS_H_DONE, &device->flags);
1780        /* Did some connection breakage or IO error race with us? */
1781        if (device->state.conn < C_CONNECTED
1782        || !get_ldev_if_state(device, D_NEGOTIATING)) {
1783                unlock_all_resources();
1784                goto out;
1785        }
1786
1787        ns = drbd_read_state(device);
1788
1789        ns.aftr_isp = !_drbd_may_sync_now(device);
1790
1791        ns.conn = side;
1792
1793        if (side == C_SYNC_TARGET)
1794                ns.disk = D_INCONSISTENT;
1795        else /* side == C_SYNC_SOURCE */
1796                ns.pdsk = D_INCONSISTENT;
1797
1798        r = _drbd_set_state(device, ns, CS_VERBOSE, NULL);
1799        ns = drbd_read_state(device);
1800
1801        if (ns.conn < C_CONNECTED)
1802                r = SS_UNKNOWN_ERROR;
1803
1804        if (r == SS_SUCCESS) {
1805                unsigned long tw = drbd_bm_total_weight(device);
1806                unsigned long now = jiffies;
1807                int i;
1808
1809                device->rs_failed    = 0;
1810                device->rs_paused    = 0;
1811                device->rs_same_csum = 0;
1812                device->rs_last_sect_ev = 0;
1813                device->rs_total     = tw;
1814                device->rs_start     = now;
1815                for (i = 0; i < DRBD_SYNC_MARKS; i++) {
1816                        device->rs_mark_left[i] = tw;
1817                        device->rs_mark_time[i] = now;
1818                }
1819                drbd_pause_after(device);
1820                /* Forget potentially stale cached per-resync-extent bit counts.
1821                 * Open coded drbd_rs_cancel_all(device); we already have IRQs
1822                 * disabled, and know the disk state is ok. */
1823                spin_lock(&device->al_lock);
1824                lc_reset(device->resync);
1825                device->resync_locked = 0;
1826                device->resync_wenr = LC_FREE;
1827                spin_unlock(&device->al_lock);
1828        }
1829        unlock_all_resources();
1830
1831        if (r == SS_SUCCESS) {
1832                wake_up(&device->al_wait); /* for lc_reset() above */
1833                /* reset rs_last_bcast when a resync or verify is started,
1834                 * to deal with potential jiffies wrap. */
1835                device->rs_last_bcast = jiffies - HZ;
1836
1837                drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
1838                     drbd_conn_str(ns.conn),
1839                     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
1840                     (unsigned long) device->rs_total);
1841                if (side == C_SYNC_TARGET) {
1842                        device->bm_resync_fo = 0;
1843                        device->use_csums = use_checksum_based_resync(connection, device);
1844                } else {
1845                        device->use_csums = false;
1846                }
1847
1848                /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
1849                 * with w_send_oos, or the sync target will get confused as to
1850                 * how many bits to resync.  We cannot always do that, because for an
1851                 * empty resync and protocol < 95, we need to do it here, as we call
1852                 * drbd_resync_finished from here in that case.
1853                 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
1854                 * and from after_state_ch otherwise. */
1855                if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96)
1856                        drbd_gen_and_send_sync_uuid(peer_device);
1857
1858                if (connection->agreed_pro_version < 95 && device->rs_total == 0) {
1859                        /* This still has a race (about when exactly the peers
1860                         * detect connection loss) that can lead to a full sync
1861                         * on next handshake. In 8.3.9 we fixed this with explicit
1862                         * resync-finished notifications, but the fix
1863                         * introduces a protocol change.  Sleeping for some
1864                         * time longer than the ping interval + timeout on the
1865                         * SyncSource, to give the SyncTarget the chance to
1866                         * detect connection loss, then waiting for a ping
1867                         * response (implicit in drbd_resync_finished) reduces
1868                         * the race considerably, but does not solve it. */
1869                        if (side == C_SYNC_SOURCE) {
1870                                struct net_conf *nc;
1871                                int timeo;
1872
1873                                rcu_read_lock();
1874                                nc = rcu_dereference(connection->net_conf);
1875                                timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
1876                                rcu_read_unlock();
1877                                schedule_timeout_interruptible(timeo);
1878                        }
1879                        drbd_resync_finished(device);
1880                }
1881
1882                drbd_rs_controller_reset(device);
1883                /* ns.conn may already be != device->state.conn,
1884                 * we may have been paused in between, or become paused until
1885                 * the timer triggers.
1886                 * No matter, that is handled in resync_timer_fn() */
1887                if (ns.conn == C_SYNC_TARGET)
1888                        mod_timer(&device->resync_timer, jiffies);
1889
1890                drbd_md_sync(device);
1891        }
1892        put_ldev(device);
1893out:
1894        mutex_unlock(device->state_mutex);
1895}
1896
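/* Write out changed bitmap pages and broadcast the sync progress;
 * if the resync is done and we are still in a sync state, finish it. */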
1897static void update_on_disk_bitmap(struct drbd_device *device, bool resync_done)
1898{
1899        struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
1900        device->rs_last_bcast = jiffies;
1901
1902        if (!get_ldev(device))
1903                return;
1904
1905        drbd_bm_write_lazy(device, 0);
1906        if (resync_done && is_sync_state(device->state.conn))
1907                drbd_resync_finished(device);
1908
1909        drbd_bcast_event(device, &sib);
1910        /* update timestamp, in case it took a while to write out stuff */
1911        device->rs_last_bcast = jiffies;
1912        put_ldev(device);
1913}
1914
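/* Final cleanup after detach: free the resync and activity log caches and
 * the backing device, then clear GOING_DISKLESS and wake up waiters. */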
1915static void drbd_ldev_destroy(struct drbd_device *device)
1916{
1917        lc_destroy(device->resync);
1918        device->resync = NULL;
1919        lc_destroy(device->act_log);
1920        device->act_log = NULL;
1921
1922        __acquire(local);
1923        drbd_backing_dev_free(device, device->ldev);
1924        device->ldev = NULL;
1925        __release(local);
1926
1927        clear_bit(GOING_DISKLESS, &device->flags);
1928        wake_up(&device->misc_wait);
1929}
1930
1931static void go_diskless(struct drbd_device *device)
1932{
1933        D_ASSERT(device, device->state.disk == D_FAILED);
1934        /* we cannot assert local_cnt == 0 here, as get_ldev_if_state will
1935         * inc/dec it frequently. Once we are D_DISKLESS, no one will touch
1936         * the protected members anymore, though, so once put_ldev reaches zero
1937         * again, it will be safe to free them. */
1938
1939        /* Try to write changed bitmap pages, read errors may have just
1940         * set some bits outside the area covered by the activity log.
1941         *
1942         * If we have an IO error during the bitmap writeout,
1943         * we will want a full sync next time, just in case.
1944         * (Do we want a specific meta data flag for this?)
1945         *
1946         * If that does not make it to stable storage either,
1947         * we cannot do anything about that anymore.
1948         *
1949         * We still need to check if both bitmap and ldev are present, we may
1950         * end up here after a failed attach, before ldev was even assigned.
1951         */
1952        if (device->bitmap && device->ldev) {
1953                /* An interrupted resync or similar is allowed to recount bits
1954                 * while we detach.
1955                 * No further modifications are expected anymore, though.
1956                 */
1957                if (drbd_bitmap_io_from_worker(device, drbd_bm_write,
1958                                        "detach", BM_LOCKED_TEST_ALLOWED)) {
1959                        if (test_bit(WAS_READ_ERROR, &device->flags)) {
1960                                drbd_md_set_flag(device, MDF_FULL_SYNC);
1961                                drbd_md_sync(device);
1962                        }
1963                }
1964        }
1965
1966        drbd_force_state(device, NS(disk, D_DISKLESS));
1967}
1968
1969static int do_md_sync(struct drbd_device *device)
1970{
1971        drbd_warn(device, "md_sync_timer expired! Worker calls drbd_md_sync().\n");
1972        drbd_md_sync(device);
1973        return 0;
1974}
1975
1976/* only called from drbd_worker thread, no locking */
1977void __update_timing_details(
1978                struct drbd_thread_timing_details *tdp,
1979                unsigned int *cb_nr,
1980                void *cb,
1981                const char *fn, const unsigned int line)
1982{
1983        unsigned int i = *cb_nr % DRBD_THREAD_DETAILS_HIST;
1984        struct drbd_thread_timing_details *td = tdp + i;
1985
1986        td->start_jif = jiffies;
1987        td->cb_addr = cb;
1988        td->caller_fn = fn;
1989        td->line = line;
1990        td->cb_nr = *cb_nr;
1991
1992        i = (i+1) % DRBD_THREAD_DETAILS_HIST;
1993        td = tdp + i;
1994        memset(td, 0, sizeof(*td));
1995
1996        ++(*cb_nr);
1997}
1998
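/* Dispatch the device work bits collected in device->flags to their
 * respective handlers. */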
1999static void do_device_work(struct drbd_device *device, const unsigned long todo)
2000{
2001        if (test_bit(MD_SYNC, &todo))
2002                do_md_sync(device);
2003        if (test_bit(RS_DONE, &todo) ||
2004            test_bit(RS_PROGRESS, &todo))
2005                update_on_disk_bitmap(device, test_bit(RS_DONE, &todo));
2006        if (test_bit(GO_DISKLESS, &todo))
2007                go_diskless(device);
2008        if (test_bit(DESTROY_DISK, &todo))
2009                drbd_ldev_destroy(device);
2010        if (test_bit(RS_START, &todo))
2011                do_start_resync(device);
2012}
2013
2014#define DRBD_DEVICE_WORK_MASK   \
2015        ((1UL << GO_DISKLESS)   \
2016        |(1UL << DESTROY_DISK)  \
2017        |(1UL << MD_SYNC)       \
2018        |(1UL << RS_START)      \
2019        |(1UL << RS_PROGRESS)   \
2020        |(1UL << RS_DONE)       \
2021        )
2022
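/* Atomically fetch and clear the device work bits in *flags via cmpxchg,
 * so that no concurrently set work bit gets lost. */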
2023static unsigned long get_work_bits(unsigned long *flags)
2024{
2025        unsigned long old, new;
2026        do {
2027                old = *flags;
2028                new = old & ~DRBD_DEVICE_WORK_MASK;
2029        } while (cmpxchg(flags, old, new) != old);
2030        return old & DRBD_DEVICE_WORK_MASK;
2031}
2032
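/* Handle per-device work that was signalled via device flag bits rather
 * than via the regular sender work queue. */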
2033static void do_unqueued_work(struct drbd_connection *connection)
2034{
2035        struct drbd_peer_device *peer_device;
2036        int vnr;
2037
2038        rcu_read_lock();
2039        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2040                struct drbd_device *device = peer_device->device;
2041                unsigned long todo = get_work_bits(&device->flags);
2042                if (!todo)
2043                        continue;
2044
2045                kref_get(&device->kref);
2046                rcu_read_unlock();
2047                do_device_work(device, todo);
2048                kref_put(&device->kref, drbd_destroy_device);
2049                rcu_read_lock();
2050        }
2051        rcu_read_unlock();
2052}
2053
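/* Move all currently queued work items onto work_list;
 * returns true if anything was dequeued. */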
2054static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
2055{
2056        spin_lock_irq(&queue->q_lock);
2057        list_splice_tail_init(&queue->q, work_list);
2058        spin_unlock_irq(&queue->q_lock);
2059        return !list_empty(work_list);
2060}
2061
2062static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
2063{
2064        DEFINE_WAIT(wait);
2065        struct net_conf *nc;
2066        int uncork, cork;
2067
2068        dequeue_work_batch(&connection->sender_work, work_list);
2069        if (!list_empty(work_list))
2070                return;
2071
2072        /* Still nothing to do?
2073         * Maybe we still need to close the current epoch,
2074         * even if no new requests are queued yet.
2075         *
2076         * Also, poke TCP, just in case.
2077         * Then wait for new work (or signal). */
2078        rcu_read_lock();
2079        nc = rcu_dereference(connection->net_conf);
2080        uncork = nc ? nc->tcp_cork : 0;
2081        rcu_read_unlock();
2082        if (uncork) {
2083                mutex_lock(&connection->data.mutex);
2084                if (connection->data.socket)
2085                        drbd_tcp_uncork(connection->data.socket);
2086                mutex_unlock(&connection->data.mutex);
2087        }
2088
2089        for (;;) {
2090                int send_barrier;
2091                prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
2092                spin_lock_irq(&connection->resource->req_lock);
2093                spin_lock(&connection->sender_work.q_lock);     /* FIXME get rid of this one? */
2094                if (!list_empty(&connection->sender_work.q))
2095                        list_splice_tail_init(&connection->sender_work.q, work_list);
2096                spin_unlock(&connection->sender_work.q_lock);   /* FIXME get rid of this one? */
2097                if (!list_empty(work_list) || signal_pending(current)) {
2098                        spin_unlock_irq(&connection->resource->req_lock);
2099                        break;
2100                }
2101
2102                /* We found nothing new to do, no to-be-communicated request,
2103                 * no other work item.  We may still need to close the last
2104                 * epoch.  Next incoming request epoch will be connection ->
2105                 * current transfer log epoch number.  If that is different
2106                 * from the epoch of the last request we communicated, it is
2107                 * safe to send the epoch separating barrier now.
2108                 */
2109                send_barrier =
2110                        atomic_read(&connection->current_tle_nr) !=
2111                        connection->send.current_epoch_nr;
2112                spin_unlock_irq(&connection->resource->req_lock);
2113
2114                if (send_barrier)
2115                        maybe_send_barrier(connection,
2116                                        connection->send.current_epoch_nr + 1);
2117
2118                if (test_bit(DEVICE_WORK_PENDING, &connection->flags))
2119                        break;
2120
2121                /* drbd_send() may have called flush_signals() */
2122                if (get_t_state(&connection->worker) != RUNNING)
2123                        break;
2124
2125                schedule();
2126                /* may be woken up for things other than new work, too,
2127                 * e.g. if the current epoch got closed,
2128                 * in which case we send the barrier above. */
2129        }
2130        finish_wait(&connection->sender_work.q_wait, &wait);
2131
2132        /* someone may have changed the config while we have been waiting above. */
2133        rcu_read_lock();
2134        nc = rcu_dereference(connection->net_conf);
2135        cork = nc ? nc->tcp_cork : 0;
2136        rcu_read_unlock();
2137        mutex_lock(&connection->data.mutex);
2138        if (connection->data.socket) {
2139                if (cork)
2140                        drbd_tcp_cork(connection->data.socket);
2141                else if (!uncork)
2142                        drbd_tcp_uncork(connection->data.socket);
2143        }
2144        mutex_unlock(&connection->data.mutex);
2145}
2146
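/* Main loop of the per-connection worker thread: process queued work items
 * and flag-signalled device work until told to stop, then drain whatever is
 * left and clean up the devices. */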
2147int drbd_worker(struct drbd_thread *thi)
2148{
2149        struct drbd_connection *connection = thi->connection;
2150        struct drbd_work *w = NULL;
2151        struct drbd_peer_device *peer_device;
2152        LIST_HEAD(work_list);
2153        int vnr;
2154
2155        while (get_t_state(thi) == RUNNING) {
2156                drbd_thread_current_set_cpu(thi);
2157
2158                if (list_empty(&work_list)) {
2159                        update_worker_timing_details(connection, wait_for_work);
2160                        wait_for_work(connection, &work_list);
2161                }
2162
2163                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2164                        update_worker_timing_details(connection, do_unqueued_work);
2165                        do_unqueued_work(connection);
2166                }
2167
2168                if (signal_pending(current)) {
2169                        flush_signals(current);
2170                        if (get_t_state(thi) == RUNNING) {
2171                                drbd_warn(connection, "Worker got an unexpected signal\n");
2172                                continue;
2173                        }
2174                        break;
2175                }
2176
2177                if (get_t_state(thi) != RUNNING)
2178                        break;
2179
2180                if (!list_empty(&work_list)) {
2181                        w = list_first_entry(&work_list, struct drbd_work, list);
2182                        list_del_init(&w->list);
2183                        update_worker_timing_details(connection, w->cb);
2184                        if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
2185                                continue;
2186                        if (connection->cstate >= C_WF_REPORT_PARAMS)
2187                                conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
2188                }
2189        }
2190
2191        do {
2192                if (test_and_clear_bit(DEVICE_WORK_PENDING, &connection->flags)) {
2193                        update_worker_timing_details(connection, do_unqueued_work);
2194                        do_unqueued_work(connection);
2195                }
2196                if (!list_empty(&work_list)) {
2197                        w = list_first_entry(&work_list, struct drbd_work, list);
2198                        list_del_init(&w->list);
2199                        update_worker_timing_details(connection, w->cb);
2200                        w->cb(w, 1);
2201                } else
2202                        dequeue_work_batch(&connection->sender_work, &work_list);
2203        } while (!list_empty(&work_list) || test_bit(DEVICE_WORK_PENDING, &connection->flags));
2204
2205        rcu_read_lock();
2206        idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
2207                struct drbd_device *device = peer_device->device;
2208                D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
2209                kref_get(&device->kref);
2210                rcu_read_unlock();
2211                drbd_device_cleanup(device);
2212                kref_put(&device->kref, drbd_destroy_device);
2213                rcu_read_lock();
2214        }
2215        rcu_read_unlock();
2216
2217        return 0;
2218}
2219