linux/drivers/block/drbd/drbd_req.c
<<
>>
Prefs
   1/*
   2   drbd_req.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24 */
  25
  26#include <linux/module.h>
  27
  28#include <linux/slab.h>
  29#include <linux/drbd.h>
  30#include "drbd_int.h"
  31#include "drbd_req.h"
  32
  33
  34static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size);
  35
  36/* Update disk stats at start of I/O request */
  37static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio)
  38{
  39        const int rw = bio_data_dir(bio);
  40        int cpu;
  41        cpu = part_stat_lock();
  42        part_round_stats(cpu, &mdev->vdisk->part0);
  43        part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]);
  44        part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio));
  45        (void) cpu; /* The macro invocations above want the cpu argument, I do not like
  46                       the compiler warning about cpu only assigned but never used... */
  47        part_inc_in_flight(&mdev->vdisk->part0, rw);
  48        part_stat_unlock();
  49}
  50
  51/* Update disk stats when completing request upwards */
  52static void _drbd_end_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
  53{
  54        int rw = bio_data_dir(req->master_bio);
  55        unsigned long duration = jiffies - req->start_time;
  56        int cpu;
  57        cpu = part_stat_lock();
  58        part_stat_add(cpu, &mdev->vdisk->part0, ticks[rw], duration);
  59        part_round_stats(cpu, &mdev->vdisk->part0);
  60        part_dec_in_flight(&mdev->vdisk->part0, rw);
  61        part_stat_unlock();
  62}
  63
  64static struct drbd_request *drbd_req_new(struct drbd_conf *mdev,
  65                                               struct bio *bio_src)
  66{
  67        struct drbd_request *req;
  68
  69        req = mempool_alloc(drbd_request_mempool, GFP_NOIO);
  70        if (!req)
  71                return NULL;
  72
  73        drbd_req_make_private_bio(req, bio_src);
  74        req->rq_state    = bio_data_dir(bio_src) == WRITE ? RQ_WRITE : 0;
  75        req->w.mdev      = mdev;
  76        req->master_bio  = bio_src;
  77        req->epoch       = 0;
  78
  79        drbd_clear_interval(&req->i);
  80        req->i.sector     = bio_src->bi_sector;
  81        req->i.size      = bio_src->bi_size;
  82        req->i.local = true;
  83        req->i.waiting = false;
  84
  85        INIT_LIST_HEAD(&req->tl_requests);
  86        INIT_LIST_HEAD(&req->w.list);
  87
  88        /* one reference to be put by __drbd_make_request */
  89        atomic_set(&req->completion_ref, 1);
  90        /* one kref as long as completion_ref > 0 */
  91        kref_init(&req->kref);
  92        return req;
  93}
  94
  95void drbd_req_destroy(struct kref *kref)
  96{
  97        struct drbd_request *req = container_of(kref, struct drbd_request, kref);
  98        struct drbd_conf *mdev = req->w.mdev;
  99        const unsigned s = req->rq_state;
 100
 101        if ((req->master_bio && !(s & RQ_POSTPONED)) ||
 102                atomic_read(&req->completion_ref) ||
 103                (s & RQ_LOCAL_PENDING) ||
 104                ((s & RQ_NET_MASK) && !(s & RQ_NET_DONE))) {
 105                dev_err(DEV, "drbd_req_destroy: Logic BUG rq_state = 0x%x, completion_ref = %d\n",
 106                                s, atomic_read(&req->completion_ref));
 107                return;
 108        }
 109
 110        /* remove it from the transfer log.
 111         * well, only if it had been there in the first
 112         * place... if it had not (local only or conflicting
 113         * and never sent), it should still be "empty" as
 114         * initialized in drbd_req_new(), so we can list_del() it
 115         * here unconditionally */
 116        list_del_init(&req->tl_requests);
 117
 118        /* if it was a write, we may have to set the corresponding
 119         * bit(s) out-of-sync first. If it had a local part, we need to
 120         * release the reference to the activity log. */
 121        if (s & RQ_WRITE) {
 122                /* Set out-of-sync unless both OK flags are set
 123                 * (local only or remote failed).
 124                 * Other places where we set out-of-sync:
 125                 * READ with local io-error */
 126
 127                /* There is a special case:
 128                 * we may notice late that IO was suspended,
 129                 * and postpone, or schedule for retry, a write,
 130                 * before it even was submitted or sent.
 131                 * In that case we do not want to touch the bitmap at all.
 132                 */
 133                if ((s & (RQ_POSTPONED|RQ_LOCAL_MASK|RQ_NET_MASK)) != RQ_POSTPONED) {
 134                        if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
 135                                drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
 136
 137                        if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
 138                                drbd_set_in_sync(mdev, req->i.sector, req->i.size);
 139                }
 140
 141                /* one might be tempted to move the drbd_al_complete_io
 142                 * to the local io completion callback drbd_request_endio.
 143                 * but, if this was a mirror write, we may only
 144                 * drbd_al_complete_io after this is RQ_NET_DONE,
 145                 * otherwise the extent could be dropped from the al
 146                 * before it has actually been written on the peer.
 147                 * if we crash before our peer knows about the request,
 148                 * but after the extent has been dropped from the al,
 149                 * we would forget to resync the corresponding extent.
 150                 */
 151                if (s & RQ_IN_ACT_LOG) {
 152                        if (get_ldev_if_state(mdev, D_FAILED)) {
 153                                drbd_al_complete_io(mdev, &req->i);
 154                                put_ldev(mdev);
 155                        } else if (__ratelimit(&drbd_ratelimit_state)) {
 156                                dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu, %u), "
 157                                         "but my Disk seems to have failed :(\n",
 158                                         (unsigned long long) req->i.sector, req->i.size);
 159                        }
 160                }
 161        }
 162
 163        mempool_free(req, drbd_request_mempool);
 164}
 165
 166static void wake_all_senders(struct drbd_tconn *tconn) {
 167        wake_up(&tconn->sender_work.q_wait);
 168}
 169
 170/* must hold resource->req_lock */
 171void start_new_tl_epoch(struct drbd_tconn *tconn)
 172{
 173        /* no point closing an epoch, if it is empty, anyways. */
 174        if (tconn->current_tle_writes == 0)
 175                return;
 176
 177        tconn->current_tle_writes = 0;
 178        atomic_inc(&tconn->current_tle_nr);
 179        wake_all_senders(tconn);
 180}
 181
 182void complete_master_bio(struct drbd_conf *mdev,
 183                struct bio_and_error *m)
 184{
 185        bio_endio(m->bio, m->error);
 186        dec_ap_bio(mdev);
 187}
 188
 189
 190static void drbd_remove_request_interval(struct rb_root *root,
 191                                         struct drbd_request *req)
 192{
 193        struct drbd_conf *mdev = req->w.mdev;
 194        struct drbd_interval *i = &req->i;
 195
 196        drbd_remove_interval(root, i);
 197
 198        /* Wake up any processes waiting for this request to complete.  */
 199        if (i->waiting)
 200                wake_up(&mdev->misc_wait);
 201}
 202
 203/* Helper for __req_mod().
 204 * Set m->bio to the master bio, if it is fit to be completed,
 205 * or leave it alone (it is initialized to NULL in __req_mod),
 206 * if it has already been completed, or cannot be completed yet.
 207 * If m->bio is set, the error status to be returned is placed in m->error.
 208 */
 209static
 210void drbd_req_complete(struct drbd_request *req, struct bio_and_error *m)
 211{
 212        const unsigned s = req->rq_state;
 213        struct drbd_conf *mdev = req->w.mdev;
 214        int rw;
 215        int error, ok;
 216
 217        /* we must not complete the master bio, while it is
 218         *      still being processed by _drbd_send_zc_bio (drbd_send_dblock)
 219         *      not yet acknowledged by the peer
 220         *      not yet completed by the local io subsystem
 221         * these flags may get cleared in any order by
 222         *      the worker,
 223         *      the receiver,
 224         *      the bio_endio completion callbacks.
 225         */
 226        if ((s & RQ_LOCAL_PENDING && !(s & RQ_LOCAL_ABORTED)) ||
 227            (s & RQ_NET_QUEUED) || (s & RQ_NET_PENDING) ||
 228            (s & RQ_COMPLETION_SUSP)) {
 229                dev_err(DEV, "drbd_req_complete: Logic BUG rq_state = 0x%x\n", s);
 230                return;
 231        }
 232
 233        if (!req->master_bio) {
 234                dev_err(DEV, "drbd_req_complete: Logic BUG, master_bio == NULL!\n");
 235                return;
 236        }
 237
 238        rw = bio_rw(req->master_bio);
 239
 240        /*
 241         * figure out whether to report success or failure.
 242         *
 243         * report success when at least one of the operations succeeded.
 244         * or, to put the other way,
 245         * only report failure, when both operations failed.
 246         *
 247         * what to do about the failures is handled elsewhere.
 248         * what we need to do here is just: complete the master_bio.
 249         *
 250         * local completion error, if any, has been stored as ERR_PTR
 251         * in private_bio within drbd_request_endio.
 252         */
 253        ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
 254        error = PTR_ERR(req->private_bio);
 255
 256        /* remove the request from the conflict detection
 257         * respective block_id verification hash */
 258        if (!drbd_interval_empty(&req->i)) {
 259                struct rb_root *root;
 260
 261                if (rw == WRITE)
 262                        root = &mdev->write_requests;
 263                else
 264                        root = &mdev->read_requests;
 265                drbd_remove_request_interval(root, req);
 266        } else if (!(s & RQ_POSTPONED))
 267                D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
 268
 269        /* Before we can signal completion to the upper layers,
 270         * we may need to close the current transfer log epoch.
 271         * We are within the request lock, so we can simply compare
 272         * the request epoch number with the current transfer log
 273         * epoch number.  If they match, increase the current_tle_nr,
 274         * and reset the transfer log epoch write_cnt.
 275         */
 276        if (rw == WRITE &&
 277            req->epoch == atomic_read(&mdev->tconn->current_tle_nr))
 278                start_new_tl_epoch(mdev->tconn);
 279
 280        /* Update disk stats */
 281        _drbd_end_io_acct(mdev, req);
 282
 283        /* If READ failed,
 284         * have it be pushed back to the retry work queue,
 285         * so it will re-enter __drbd_make_request(),
 286         * and be re-assigned to a suitable local or remote path,
 287         * or failed if we do not have access to good data anymore.
 288         *
 289         * Unless it was failed early by __drbd_make_request(),
 290         * because no path was available, in which case
 291         * it was not even added to the transfer_log.
 292         *
 293         * READA may fail, and will not be retried.
 294         *
 295         * WRITE should have used all available paths already.
 296         */
 297        if (!ok && rw == READ && !list_empty(&req->tl_requests))
 298                req->rq_state |= RQ_POSTPONED;
 299
 300        if (!(req->rq_state & RQ_POSTPONED)) {
 301                m->error = ok ? 0 : (error ?: -EIO);
 302                m->bio = req->master_bio;
 303                req->master_bio = NULL;
 304        }
 305}
 306
 307static int drbd_req_put_completion_ref(struct drbd_request *req, struct bio_and_error *m, int put)
 308{
 309        struct drbd_conf *mdev = req->w.mdev;
 310        D_ASSERT(m || (req->rq_state & RQ_POSTPONED));
 311
 312        if (!atomic_sub_and_test(put, &req->completion_ref))
 313                return 0;
 314
 315        drbd_req_complete(req, m);
 316
 317        if (req->rq_state & RQ_POSTPONED) {
 318                /* don't destroy the req object just yet,
 319                 * but queue it for retry */
 320                drbd_restart_request(req);
 321                return 0;
 322        }
 323
 324        return 1;
 325}
 326
 327/* I'd like this to be the only place that manipulates
 328 * req->completion_ref and req->kref. */
 329static void mod_rq_state(struct drbd_request *req, struct bio_and_error *m,
 330                int clear, int set)
 331{
 332        struct drbd_conf *mdev = req->w.mdev;
 333        unsigned s = req->rq_state;
 334        int c_put = 0;
 335        int k_put = 0;
 336
 337        if (drbd_suspended(mdev) && !((s | clear) & RQ_COMPLETION_SUSP))
 338                set |= RQ_COMPLETION_SUSP;
 339
 340        /* apply */
 341
 342        req->rq_state &= ~clear;
 343        req->rq_state |= set;
 344
 345        /* no change? */
 346        if (req->rq_state == s)
 347                return;
 348
 349        /* intent: get references */
 350
 351        if (!(s & RQ_LOCAL_PENDING) && (set & RQ_LOCAL_PENDING))
 352                atomic_inc(&req->completion_ref);
 353
 354        if (!(s & RQ_NET_PENDING) && (set & RQ_NET_PENDING)) {
 355                inc_ap_pending(mdev);
 356                atomic_inc(&req->completion_ref);
 357        }
 358
 359        if (!(s & RQ_NET_QUEUED) && (set & RQ_NET_QUEUED))
 360                atomic_inc(&req->completion_ref);
 361
 362        if (!(s & RQ_EXP_BARR_ACK) && (set & RQ_EXP_BARR_ACK))
 363                kref_get(&req->kref); /* wait for the DONE */
 364
 365        if (!(s & RQ_NET_SENT) && (set & RQ_NET_SENT))
 366                atomic_add(req->i.size >> 9, &mdev->ap_in_flight);
 367
 368        if (!(s & RQ_COMPLETION_SUSP) && (set & RQ_COMPLETION_SUSP))
 369                atomic_inc(&req->completion_ref);
 370
 371        /* progress: put references */
 372
 373        if ((s & RQ_COMPLETION_SUSP) && (clear & RQ_COMPLETION_SUSP))
 374                ++c_put;
 375
 376        if (!(s & RQ_LOCAL_ABORTED) && (set & RQ_LOCAL_ABORTED)) {
 377                D_ASSERT(req->rq_state & RQ_LOCAL_PENDING);
 378                /* local completion may still come in later,
 379                 * we need to keep the req object around. */
 380                kref_get(&req->kref);
 381                ++c_put;
 382        }
 383
 384        if ((s & RQ_LOCAL_PENDING) && (clear & RQ_LOCAL_PENDING)) {
 385                if (req->rq_state & RQ_LOCAL_ABORTED)
 386                        ++k_put;
 387                else
 388                        ++c_put;
 389        }
 390
 391        if ((s & RQ_NET_PENDING) && (clear & RQ_NET_PENDING)) {
 392                dec_ap_pending(mdev);
 393                ++c_put;
 394        }
 395
 396        if ((s & RQ_NET_QUEUED) && (clear & RQ_NET_QUEUED))
 397                ++c_put;
 398
 399        if ((s & RQ_EXP_BARR_ACK) && !(s & RQ_NET_DONE) && (set & RQ_NET_DONE)) {
 400                if (req->rq_state & RQ_NET_SENT)
 401                        atomic_sub(req->i.size >> 9, &mdev->ap_in_flight);
 402                ++k_put;
 403        }
 404
 405        /* potentially complete and destroy */
 406
 407        if (k_put || c_put) {
 408                /* Completion does it's own kref_put.  If we are going to
 409                 * kref_sub below, we need req to be still around then. */
 410                int at_least = k_put + !!c_put;
 411                int refcount = atomic_read(&req->kref.refcount);
 412                if (refcount < at_least)
 413                        dev_err(DEV,
 414                                "mod_rq_state: Logic BUG: %x -> %x: refcount = %d, should be >= %d\n",
 415                                s, req->rq_state, refcount, at_least);
 416        }
 417
 418        /* If we made progress, retry conflicting peer requests, if any. */
 419        if (req->i.waiting)
 420                wake_up(&mdev->misc_wait);
 421
 422        if (c_put)
 423                k_put += drbd_req_put_completion_ref(req, m, c_put);
 424        if (k_put)
 425                kref_sub(&req->kref, k_put, drbd_req_destroy);
 426}
 427
 428static void drbd_report_io_error(struct drbd_conf *mdev, struct drbd_request *req)
 429{
 430        char b[BDEVNAME_SIZE];
 431
 432        if (!__ratelimit(&drbd_ratelimit_state))
 433                return;
 434
 435        dev_warn(DEV, "local %s IO error sector %llu+%u on %s\n",
 436                        (req->rq_state & RQ_WRITE) ? "WRITE" : "READ",
 437                        (unsigned long long)req->i.sector,
 438                        req->i.size >> 9,
 439                        bdevname(mdev->ldev->backing_bdev, b));
 440}
 441
 442/* obviously this could be coded as many single functions
 443 * instead of one huge switch,
 444 * or by putting the code directly in the respective locations
 445 * (as it has been before).
 446 *
 447 * but having it this way
 448 *  enforces that it is all in this one place, where it is easier to audit,
 449 *  it makes it obvious that whatever "event" "happens" to a request should
 450 *  happen "atomically" within the req_lock,
 451 *  and it enforces that we have to think in a very structured manner
 452 *  about the "events" that may happen to a request during its life time ...
 453 */
 454int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 455                struct bio_and_error *m)
 456{
 457        struct drbd_conf *mdev = req->w.mdev;
 458        struct net_conf *nc;
 459        int p, rv = 0;
 460
 461        if (m)
 462                m->bio = NULL;
 463
 464        switch (what) {
 465        default:
 466                dev_err(DEV, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
 467                break;
 468
 469        /* does not happen...
 470         * initialization done in drbd_req_new
 471        case CREATED:
 472                break;
 473                */
 474
 475        case TO_BE_SENT: /* via network */
 476                /* reached via __drbd_make_request
 477                 * and from w_read_retry_remote */
 478                D_ASSERT(!(req->rq_state & RQ_NET_MASK));
 479                rcu_read_lock();
 480                nc = rcu_dereference(mdev->tconn->net_conf);
 481                p = nc->wire_protocol;
 482                rcu_read_unlock();
 483                req->rq_state |=
 484                        p == DRBD_PROT_C ? RQ_EXP_WRITE_ACK :
 485                        p == DRBD_PROT_B ? RQ_EXP_RECEIVE_ACK : 0;
 486                mod_rq_state(req, m, 0, RQ_NET_PENDING);
 487                break;
 488
 489        case TO_BE_SUBMITTED: /* locally */
 490                /* reached via __drbd_make_request */
 491                D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
 492                mod_rq_state(req, m, 0, RQ_LOCAL_PENDING);
 493                break;
 494
 495        case COMPLETED_OK:
 496                if (req->rq_state & RQ_WRITE)
 497                        mdev->writ_cnt += req->i.size >> 9;
 498                else
 499                        mdev->read_cnt += req->i.size >> 9;
 500
 501                mod_rq_state(req, m, RQ_LOCAL_PENDING,
 502                                RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
 503                break;
 504
 505        case ABORT_DISK_IO:
 506                mod_rq_state(req, m, 0, RQ_LOCAL_ABORTED);
 507                break;
 508
 509        case WRITE_COMPLETED_WITH_ERROR:
 510                drbd_report_io_error(mdev, req);
 511                __drbd_chk_io_error(mdev, DRBD_WRITE_ERROR);
 512                mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
 513                break;
 514
 515        case READ_COMPLETED_WITH_ERROR:
 516                drbd_set_out_of_sync(mdev, req->i.sector, req->i.size);
 517                drbd_report_io_error(mdev, req);
 518                __drbd_chk_io_error(mdev, DRBD_READ_ERROR);
 519                /* fall through. */
 520        case READ_AHEAD_COMPLETED_WITH_ERROR:
 521                /* it is legal to fail READA, no __drbd_chk_io_error in that case. */
 522                mod_rq_state(req, m, RQ_LOCAL_PENDING, RQ_LOCAL_COMPLETED);
 523                break;
 524
 525        case QUEUE_FOR_NET_READ:
 526                /* READ or READA, and
 527                 * no local disk,
 528                 * or target area marked as invalid,
 529                 * or just got an io-error. */
 530                /* from __drbd_make_request
 531                 * or from bio_endio during read io-error recovery */
 532
 533                /* So we can verify the handle in the answer packet.
 534                 * Corresponding drbd_remove_request_interval is in
 535                 * drbd_req_complete() */
 536                D_ASSERT(drbd_interval_empty(&req->i));
 537                drbd_insert_interval(&mdev->read_requests, &req->i);
 538
 539                set_bit(UNPLUG_REMOTE, &mdev->flags);
 540
 541                D_ASSERT(req->rq_state & RQ_NET_PENDING);
 542                D_ASSERT((req->rq_state & RQ_LOCAL_MASK) == 0);
 543                mod_rq_state(req, m, 0, RQ_NET_QUEUED);
 544                req->w.cb = w_send_read_req;
 545                drbd_queue_work(&mdev->tconn->sender_work, &req->w);
 546                break;
 547
 548        case QUEUE_FOR_NET_WRITE:
 549                /* assert something? */
 550                /* from __drbd_make_request only */
 551
 552                /* Corresponding drbd_remove_request_interval is in
 553                 * drbd_req_complete() */
 554                D_ASSERT(drbd_interval_empty(&req->i));
 555                drbd_insert_interval(&mdev->write_requests, &req->i);
 556
 557                /* NOTE
 558                 * In case the req ended up on the transfer log before being
 559                 * queued on the worker, it could lead to this request being
 560                 * missed during cleanup after connection loss.
 561                 * So we have to do both operations here,
 562                 * within the same lock that protects the transfer log.
 563                 *
 564                 * _req_add_to_epoch(req); this has to be after the
 565                 * _maybe_start_new_epoch(req); which happened in
 566                 * __drbd_make_request, because we now may set the bit
 567                 * again ourselves to close the current epoch.
 568                 *
 569                 * Add req to the (now) current epoch (barrier). */
 570
 571                /* otherwise we may lose an unplug, which may cause some remote
 572                 * io-scheduler timeout to expire, increasing maximum latency,
 573                 * hurting performance. */
 574                set_bit(UNPLUG_REMOTE, &mdev->flags);
 575
 576                /* queue work item to send data */
 577                D_ASSERT(req->rq_state & RQ_NET_PENDING);
 578                mod_rq_state(req, m, 0, RQ_NET_QUEUED|RQ_EXP_BARR_ACK);
 579                req->w.cb =  w_send_dblock;
 580                drbd_queue_work(&mdev->tconn->sender_work, &req->w);
 581
 582                /* close the epoch, in case it outgrew the limit */
 583                rcu_read_lock();
 584                nc = rcu_dereference(mdev->tconn->net_conf);
 585                p = nc->max_epoch_size;
 586                rcu_read_unlock();
 587                if (mdev->tconn->current_tle_writes >= p)
 588                        start_new_tl_epoch(mdev->tconn);
 589
 590                break;
 591
 592        case QUEUE_FOR_SEND_OOS:
 593                mod_rq_state(req, m, 0, RQ_NET_QUEUED);
 594                req->w.cb =  w_send_out_of_sync;
 595                drbd_queue_work(&mdev->tconn->sender_work, &req->w);
 596                break;
 597
 598        case READ_RETRY_REMOTE_CANCELED:
 599        case SEND_CANCELED:
 600        case SEND_FAILED:
 601                /* real cleanup will be done from tl_clear.  just update flags
 602                 * so it is no longer marked as on the worker queue */
 603                mod_rq_state(req, m, RQ_NET_QUEUED, 0);
 604                break;
 605
 606        case HANDED_OVER_TO_NETWORK:
 607                /* assert something? */
 608                if (bio_data_dir(req->master_bio) == WRITE &&
 609                    !(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK))) {
 610                        /* this is what is dangerous about protocol A:
 611                         * pretend it was successfully written on the peer. */
 612                        if (req->rq_state & RQ_NET_PENDING)
 613                                mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
 614                        /* else: neg-ack was faster... */
 615                        /* it is still not yet RQ_NET_DONE until the
 616                         * corresponding epoch barrier got acked as well,
 617                         * so we know what to dirty on connection loss */
 618                }
 619                mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_SENT);
 620                break;
 621
 622        case OOS_HANDED_TO_NETWORK:
 623                /* Was not set PENDING, no longer QUEUED, so is now DONE
 624                 * as far as this connection is concerned. */
 625                mod_rq_state(req, m, RQ_NET_QUEUED, RQ_NET_DONE);
 626                break;
 627
 628        case CONNECTION_LOST_WHILE_PENDING:
 629                /* transfer log cleanup after connection loss */
 630                mod_rq_state(req, m,
 631                                RQ_NET_OK|RQ_NET_PENDING|RQ_COMPLETION_SUSP,
 632                                RQ_NET_DONE);
 633                break;
 634
 635        case CONFLICT_RESOLVED:
 636                /* for superseded conflicting writes of multiple primaries,
 637                 * there is no need to keep anything in the tl, potential
 638                 * node crashes are covered by the activity log.
 639                 *
 640                 * If this request had been marked as RQ_POSTPONED before,
 641                 * it will actually not be completed, but "restarted",
 642                 * resubmitted from the retry worker context. */
 643                D_ASSERT(req->rq_state & RQ_NET_PENDING);
 644                D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
 645                mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_DONE|RQ_NET_OK);
 646                break;
 647
 648        case WRITE_ACKED_BY_PEER_AND_SIS:
 649                req->rq_state |= RQ_NET_SIS;
 650        case WRITE_ACKED_BY_PEER:
 651                D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
 652                /* protocol C; successfully written on peer.
 653                 * Nothing more to do here.
 654                 * We want to keep the tl in place for all protocols, to cater
 655                 * for volatile write-back caches on lower level devices. */
 656
 657                goto ack_common;
 658        case RECV_ACKED_BY_PEER:
 659                D_ASSERT(req->rq_state & RQ_EXP_RECEIVE_ACK);
 660                /* protocol B; pretends to be successfully written on peer.
 661                 * see also notes above in HANDED_OVER_TO_NETWORK about
 662                 * protocol != C */
 663        ack_common:
 664                D_ASSERT(req->rq_state & RQ_NET_PENDING);
 665                mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK);
 666                break;
 667
 668        case POSTPONE_WRITE:
 669                D_ASSERT(req->rq_state & RQ_EXP_WRITE_ACK);
 670                /* If this node has already detected the write conflict, the
 671                 * worker will be waiting on misc_wait.  Wake it up once this
 672                 * request has completed locally.
 673                 */
 674                D_ASSERT(req->rq_state & RQ_NET_PENDING);
 675                req->rq_state |= RQ_POSTPONED;
 676                if (req->i.waiting)
 677                        wake_up(&mdev->misc_wait);
 678                /* Do not clear RQ_NET_PENDING. This request will make further
 679                 * progress via restart_conflicting_writes() or
 680                 * fail_postponed_requests(). Hopefully. */
 681                break;
 682
 683        case NEG_ACKED:
 684                mod_rq_state(req, m, RQ_NET_OK|RQ_NET_PENDING, 0);
 685                break;
 686
 687        case FAIL_FROZEN_DISK_IO:
 688                if (!(req->rq_state & RQ_LOCAL_COMPLETED))
 689                        break;
 690                mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
 691                break;
 692
 693        case RESTART_FROZEN_DISK_IO:
 694                if (!(req->rq_state & RQ_LOCAL_COMPLETED))
 695                        break;
 696
 697                mod_rq_state(req, m,
 698                                RQ_COMPLETION_SUSP|RQ_LOCAL_COMPLETED,
 699                                RQ_LOCAL_PENDING);
 700
 701                rv = MR_READ;
 702                if (bio_data_dir(req->master_bio) == WRITE)
 703                        rv = MR_WRITE;
 704
 705                get_ldev(mdev); /* always succeeds in this call path */
 706                req->w.cb = w_restart_disk_io;
 707                drbd_queue_work(&mdev->tconn->sender_work, &req->w);
 708                break;
 709
 710        case RESEND:
 711                /* Simply complete (local only) READs. */
 712                if (!(req->rq_state & RQ_WRITE) && !req->w.cb) {
 713                        mod_rq_state(req, m, RQ_COMPLETION_SUSP, 0);
 714                        break;
 715                }
 716
 717                /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
 718                   before the connection loss (B&C only); only P_BARRIER_ACK
 719                   (or the local completion?) was missing when we suspended.
 720                   Throwing them out of the TL here by pretending we got a BARRIER_ACK.
 721                   During connection handshake, we ensure that the peer was not rebooted. */
 722                if (!(req->rq_state & RQ_NET_OK)) {
 723                        /* FIXME could this possibly be a req->w.cb == w_send_out_of_sync?
 724                         * in that case we must not set RQ_NET_PENDING. */
 725
 726                        mod_rq_state(req, m, RQ_COMPLETION_SUSP, RQ_NET_QUEUED|RQ_NET_PENDING);
 727                        if (req->w.cb) {
 728                                drbd_queue_work(&mdev->tconn->sender_work, &req->w);
 729                                rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
 730                        } /* else: FIXME can this happen? */
 731                        break;
 732                }
 733                /* else, fall through to BARRIER_ACKED */
 734
 735        case BARRIER_ACKED:
 736                /* barrier ack for READ requests does not make sense */
 737                if (!(req->rq_state & RQ_WRITE))
 738                        break;
 739
 740                if (req->rq_state & RQ_NET_PENDING) {
 741                        /* barrier came in before all requests were acked.
 742                         * this is bad, because if the connection is lost now,
 743                         * we won't be able to clean them up... */
 744                        dev_err(DEV, "FIXME (BARRIER_ACKED but pending)\n");
 745                }
 746                /* Allowed to complete requests, even while suspended.
 747                 * As this is called for all requests within a matching epoch,
 748                 * we need to filter, and only set RQ_NET_DONE for those that
 749                 * have actually been on the wire. */
 750                mod_rq_state(req, m, RQ_COMPLETION_SUSP,
 751                                (req->rq_state & RQ_NET_MASK) ? RQ_NET_DONE : 0);
 752                break;
 753
 754        case DATA_RECEIVED:
 755                D_ASSERT(req->rq_state & RQ_NET_PENDING);
 756                mod_rq_state(req, m, RQ_NET_PENDING, RQ_NET_OK|RQ_NET_DONE);
 757                break;
 758        };
 759
 760        return rv;
 761}
 762
 763/* we may do a local read if:
 764 * - we are consistent (of course),
 765 * - or we are generally inconsistent,
 766 *   BUT we are still/already IN SYNC for this area.
 767 *   since size may be bigger than BM_BLOCK_SIZE,
 768 *   we may need to check several bits.
 769 */
 770static bool drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size)
 771{
 772        unsigned long sbnr, ebnr;
 773        sector_t esector, nr_sectors;
 774
 775        if (mdev->state.disk == D_UP_TO_DATE)
 776                return true;
 777        if (mdev->state.disk != D_INCONSISTENT)
 778                return false;
 779        esector = sector + (size >> 9) - 1;
 780        nr_sectors = drbd_get_capacity(mdev->this_bdev);
 781        D_ASSERT(sector  < nr_sectors);
 782        D_ASSERT(esector < nr_sectors);
 783
 784        sbnr = BM_SECT_TO_BIT(sector);
 785        ebnr = BM_SECT_TO_BIT(esector);
 786
 787        return drbd_bm_count_bits(mdev, sbnr, ebnr) == 0;
 788}
 789
 790static bool remote_due_to_read_balancing(struct drbd_conf *mdev, sector_t sector,
 791                enum drbd_read_balancing rbm)
 792{
 793        struct backing_dev_info *bdi;
 794        int stripe_shift;
 795
 796        switch (rbm) {
 797        case RB_CONGESTED_REMOTE:
 798                bdi = &mdev->ldev->backing_bdev->bd_disk->queue->backing_dev_info;
 799                return bdi_read_congested(bdi);
 800        case RB_LEAST_PENDING:
 801                return atomic_read(&mdev->local_cnt) >
 802                        atomic_read(&mdev->ap_pending_cnt) + atomic_read(&mdev->rs_pending_cnt);
 803        case RB_32K_STRIPING:  /* stripe_shift = 15 */
 804        case RB_64K_STRIPING:
 805        case RB_128K_STRIPING:
 806        case RB_256K_STRIPING:
 807        case RB_512K_STRIPING:
 808        case RB_1M_STRIPING:   /* stripe_shift = 20 */
 809                stripe_shift = (rbm - RB_32K_STRIPING + 15);
 810                return (sector >> (stripe_shift - 9)) & 1;
 811        case RB_ROUND_ROBIN:
 812                return test_and_change_bit(READ_BALANCE_RR, &mdev->flags);
 813        case RB_PREFER_REMOTE:
 814                return true;
 815        case RB_PREFER_LOCAL:
 816        default:
 817                return false;
 818        }
 819}
 820
 821/*
 822 * complete_conflicting_writes  -  wait for any conflicting write requests
 823 *
 824 * The write_requests tree contains all active write requests which we
 825 * currently know about.  Wait for any requests to complete which conflict with
 826 * the new one.
 827 *
 828 * Only way out: remove the conflicting intervals from the tree.
 829 */
 830static void complete_conflicting_writes(struct drbd_request *req)
 831{
 832        DEFINE_WAIT(wait);
 833        struct drbd_conf *mdev = req->w.mdev;
 834        struct drbd_interval *i;
 835        sector_t sector = req->i.sector;
 836        int size = req->i.size;
 837
 838        i = drbd_find_overlap(&mdev->write_requests, sector, size);
 839        if (!i)
 840                return;
 841
 842        for (;;) {
 843                prepare_to_wait(&mdev->misc_wait, &wait, TASK_UNINTERRUPTIBLE);
 844                i = drbd_find_overlap(&mdev->write_requests, sector, size);
 845                if (!i)
 846                        break;
 847                /* Indicate to wake up device->misc_wait on progress.  */
 848                i->waiting = true;
 849                spin_unlock_irq(&mdev->tconn->req_lock);
 850                schedule();
 851                spin_lock_irq(&mdev->tconn->req_lock);
 852        }
 853        finish_wait(&mdev->misc_wait, &wait);
 854}
 855
 856/* called within req_lock and rcu_read_lock() */
 857static void maybe_pull_ahead(struct drbd_conf *mdev)
 858{
 859        struct drbd_tconn *tconn = mdev->tconn;
 860        struct net_conf *nc;
 861        bool congested = false;
 862        enum drbd_on_congestion on_congestion;
 863
 864        nc = rcu_dereference(tconn->net_conf);
 865        on_congestion = nc ? nc->on_congestion : OC_BLOCK;
 866        if (on_congestion == OC_BLOCK ||
 867            tconn->agreed_pro_version < 96)
 868                return;
 869
 870        /* If I don't even have good local storage, we can not reasonably try
 871         * to pull ahead of the peer. We also need the local reference to make
 872         * sure mdev->act_log is there.
 873         */
 874        if (!get_ldev_if_state(mdev, D_UP_TO_DATE))
 875                return;
 876
 877        if (nc->cong_fill &&
 878            atomic_read(&mdev->ap_in_flight) >= nc->cong_fill) {
 879                dev_info(DEV, "Congestion-fill threshold reached\n");
 880                congested = true;
 881        }
 882
 883        if (mdev->act_log->used >= nc->cong_extents) {
 884                dev_info(DEV, "Congestion-extents threshold reached\n");
 885                congested = true;
 886        }
 887
 888        if (congested) {
 889                /* start a new epoch for non-mirrored writes */
 890                start_new_tl_epoch(mdev->tconn);
 891
 892                if (on_congestion == OC_PULL_AHEAD)
 893                        _drbd_set_state(_NS(mdev, conn, C_AHEAD), 0, NULL);
 894                else  /*nc->on_congestion == OC_DISCONNECT */
 895                        _drbd_set_state(_NS(mdev, conn, C_DISCONNECTING), 0, NULL);
 896        }
 897        put_ldev(mdev);
 898}
 899
 900/* If this returns false, and req->private_bio is still set,
 901 * this should be submitted locally.
 902 *
 903 * If it returns false, but req->private_bio is not set,
 904 * we do not have access to good data :(
 905 *
 906 * Otherwise, this destroys req->private_bio, if any,
 907 * and returns true.
 908 */
 909static bool do_remote_read(struct drbd_request *req)
 910{
 911        struct drbd_conf *mdev = req->w.mdev;
 912        enum drbd_read_balancing rbm;
 913
 914        if (req->private_bio) {
 915                if (!drbd_may_do_local_read(mdev,
 916                                        req->i.sector, req->i.size)) {
 917                        bio_put(req->private_bio);
 918                        req->private_bio = NULL;
 919                        put_ldev(mdev);
 920                }
 921        }
 922
 923        if (mdev->state.pdsk != D_UP_TO_DATE)
 924                return false;
 925
 926        if (req->private_bio == NULL)
 927                return true;
 928
 929        /* TODO: improve read balancing decisions, take into account drbd
 930         * protocol, pending requests etc. */
 931
 932        rcu_read_lock();
 933        rbm = rcu_dereference(mdev->ldev->disk_conf)->read_balancing;
 934        rcu_read_unlock();
 935
 936        if (rbm == RB_PREFER_LOCAL && req->private_bio)
 937                return false; /* submit locally */
 938
 939        if (remote_due_to_read_balancing(mdev, req->i.sector, rbm)) {
 940                if (req->private_bio) {
 941                        bio_put(req->private_bio);
 942                        req->private_bio = NULL;
 943                        put_ldev(mdev);
 944                }
 945                return true;
 946        }
 947
 948        return false;
 949}
 950
 951/* returns number of connections (== 1, for drbd 8.4)
 952 * expected to actually write this data,
 953 * which does NOT include those that we are L_AHEAD for. */
 954static int drbd_process_write_request(struct drbd_request *req)
 955{
 956        struct drbd_conf *mdev = req->w.mdev;
 957        int remote, send_oos;
 958
 959        rcu_read_lock();
 960        remote = drbd_should_do_remote(mdev->state);
 961        if (remote) {
 962                maybe_pull_ahead(mdev);
 963                remote = drbd_should_do_remote(mdev->state);
 964        }
 965        send_oos = drbd_should_send_out_of_sync(mdev->state);
 966        rcu_read_unlock();
 967
 968        /* Need to replicate writes.  Unless it is an empty flush,
 969         * which is better mapped to a DRBD P_BARRIER packet,
 970         * also for drbd wire protocol compatibility reasons.
 971         * If this was a flush, just start a new epoch.
 972         * Unless the current epoch was empty anyways, or we are not currently
 973         * replicating, in which case there is no point. */
 974        if (unlikely(req->i.size == 0)) {
 975                /* The only size==0 bios we expect are empty flushes. */
 976                D_ASSERT(req->master_bio->bi_rw & REQ_FLUSH);
 977                if (remote)
 978                        start_new_tl_epoch(mdev->tconn);
 979                return 0;
 980        }
 981
 982        if (!remote && !send_oos)
 983                return 0;
 984
 985        D_ASSERT(!(remote && send_oos));
 986
 987        if (remote) {
 988                _req_mod(req, TO_BE_SENT);
 989                _req_mod(req, QUEUE_FOR_NET_WRITE);
 990        } else if (drbd_set_out_of_sync(mdev, req->i.sector, req->i.size))
 991                _req_mod(req, QUEUE_FOR_SEND_OOS);
 992
 993        return remote;
 994}
 995
 996static void
 997drbd_submit_req_private_bio(struct drbd_request *req)
 998{
 999        struct drbd_conf *mdev = req->w.mdev;
1000        struct bio *bio = req->private_bio;
1001        const int rw = bio_rw(bio);
1002
1003        bio->bi_bdev = mdev->ldev->backing_bdev;
1004
1005        /* State may have changed since we grabbed our reference on the
1006         * ->ldev member. Double check, and short-circuit to endio.
1007         * In case the last activity log transaction failed to get on
1008         * stable storage, and this is a WRITE, we may not even submit
1009         * this bio. */
1010        if (get_ldev(mdev)) {
1011                if (drbd_insert_fault(mdev,
1012                                      rw == WRITE ? DRBD_FAULT_DT_WR
1013                                    : rw == READ  ? DRBD_FAULT_DT_RD
1014                                    :               DRBD_FAULT_DT_RA))
1015                        bio_endio(bio, -EIO);
1016                else
1017                        generic_make_request(bio);
1018                put_ldev(mdev);
1019        } else
1020                bio_endio(bio, -EIO);
1021}
1022
1023void __drbd_make_request(struct drbd_conf *mdev, struct bio *bio, unsigned long start_time)
1024{
1025        const int rw = bio_rw(bio);
1026        struct bio_and_error m = { NULL, };
1027        struct drbd_request *req;
1028        bool no_remote = false;
1029
1030        /* allocate outside of all locks; */
1031        req = drbd_req_new(mdev, bio);
1032        if (!req) {
1033                dec_ap_bio(mdev);
1034                /* only pass the error to the upper layers.
1035                 * if user cannot handle io errors, that's not our business. */
1036                dev_err(DEV, "could not kmalloc() req\n");
1037                bio_endio(bio, -ENOMEM);
1038                return;
1039        }
1040        req->start_time = start_time;
1041
1042        if (!get_ldev(mdev)) {
1043                bio_put(req->private_bio);
1044                req->private_bio = NULL;
1045        }
1046
1047        /* For WRITES going to the local disk, grab a reference on the target
1048         * extent.  This waits for any resync activity in the corresponding
1049         * resync extent to finish, and, if necessary, pulls in the target
1050         * extent into the activity log, which involves further disk io because
1051         * of transactional on-disk meta data updates.
1052         * Empty flushes don't need to go into the activity log, they can only
1053         * flush data for pending writes which are already in there. */
1054        if (rw == WRITE && req->private_bio && req->i.size
1055        && !test_bit(AL_SUSPENDED, &mdev->flags)) {
1056                req->rq_state |= RQ_IN_ACT_LOG;
1057                drbd_al_begin_io(mdev, &req->i);
1058        }
1059
1060        spin_lock_irq(&mdev->tconn->req_lock);
1061        if (rw == WRITE) {
1062                /* This may temporarily give up the req_lock,
1063                 * but will re-aquire it before it returns here.
1064                 * Needs to be before the check on drbd_suspended() */
1065                complete_conflicting_writes(req);
1066        }
1067
1068        /* no more giving up req_lock from now on! */
1069
1070        if (drbd_suspended(mdev)) {
1071                /* push back and retry: */
1072                req->rq_state |= RQ_POSTPONED;
1073                if (req->private_bio) {
1074                        bio_put(req->private_bio);
1075                        req->private_bio = NULL;
1076                        put_ldev(mdev);
1077                }
1078                goto out;
1079        }
1080
1081        /* Update disk stats */
1082        _drbd_start_io_acct(mdev, req, bio);
1083
1084        /* We fail READ/READA early, if we can not serve it.
1085         * We must do this before req is registered on any lists.
1086         * Otherwise, drbd_req_complete() will queue failed READ for retry. */
1087        if (rw != WRITE) {
1088                if (!do_remote_read(req) && !req->private_bio)
1089                        goto nodata;
1090        }
1091
1092        /* which transfer log epoch does this belong to? */
1093        req->epoch = atomic_read(&mdev->tconn->current_tle_nr);
1094
1095        /* no point in adding empty flushes to the transfer log,
1096         * they are mapped to drbd barriers already. */
1097        if (likely(req->i.size!=0)) {
1098                if (rw == WRITE)
1099                        mdev->tconn->current_tle_writes++;
1100
1101                list_add_tail(&req->tl_requests, &mdev->tconn->transfer_log);
1102        }
1103
1104        if (rw == WRITE) {
1105                if (!drbd_process_write_request(req))
1106                        no_remote = true;
1107        } else {
1108                /* We either have a private_bio, or we can read from remote.
1109                 * Otherwise we had done the goto nodata above. */
1110                if (req->private_bio == NULL) {
1111                        _req_mod(req, TO_BE_SENT);
1112                        _req_mod(req, QUEUE_FOR_NET_READ);
1113                } else
1114                        no_remote = true;
1115        }
1116
1117        if (req->private_bio) {
1118                /* needs to be marked within the same spinlock */
1119                _req_mod(req, TO_BE_SUBMITTED);
1120                /* but we need to give up the spinlock to submit */
1121                spin_unlock_irq(&mdev->tconn->req_lock);
1122                drbd_submit_req_private_bio(req);
1123                spin_lock_irq(&mdev->tconn->req_lock);
1124        } else if (no_remote) {
1125nodata:
1126                if (__ratelimit(&drbd_ratelimit_state))
1127                        dev_err(DEV, "IO ERROR: neither local nor remote data, sector %llu+%u\n",
1128                                        (unsigned long long)req->i.sector, req->i.size >> 9);
1129                /* A write may have been queued for send_oos, however.
1130                 * So we can not simply free it, we must go through drbd_req_put_completion_ref() */
1131        }
1132
1133out:
1134        if (drbd_req_put_completion_ref(req, &m, 1))
1135                kref_put(&req->kref, drbd_req_destroy);
1136        spin_unlock_irq(&mdev->tconn->req_lock);
1137
1138        if (m.bio)
1139                complete_master_bio(mdev, &m);
1140        return;
1141}
1142
1143void drbd_make_request(struct request_queue *q, struct bio *bio)
1144{
1145        struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1146        unsigned long start_time;
1147
1148        start_time = jiffies;
1149
1150        /*
1151         * what we "blindly" assume:
1152         */
1153        D_ASSERT(IS_ALIGNED(bio->bi_size, 512));
1154
1155        inc_ap_bio(mdev);
1156        __drbd_make_request(mdev, bio, start_time);
1157}
1158
1159/* This is called by bio_add_page().
1160 *
1161 * q->max_hw_sectors and other global limits are already enforced there.
1162 *
1163 * We need to call down to our lower level device,
1164 * in case it has special restrictions.
1165 *
1166 * We also may need to enforce configured max-bio-bvecs limits.
1167 *
1168 * As long as the BIO is empty we have to allow at least one bvec,
1169 * regardless of size and offset, so no need to ask lower levels.
1170 */
1171int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct bio_vec *bvec)
1172{
1173        struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1174        unsigned int bio_size = bvm->bi_size;
1175        int limit = DRBD_MAX_BIO_SIZE;
1176        int backing_limit;
1177
1178        if (bio_size && get_ldev(mdev)) {
1179                struct request_queue * const b =
1180                        mdev->ldev->backing_bdev->bd_disk->queue;
1181                if (b->merge_bvec_fn) {
1182                        backing_limit = b->merge_bvec_fn(b, bvm, bvec);
1183                        limit = min(limit, backing_limit);
1184                }
1185                put_ldev(mdev);
1186        }
1187        return limit;
1188}
1189
1190struct drbd_request *find_oldest_request(struct drbd_tconn *tconn)
1191{
1192        /* Walk the transfer log,
1193         * and find the oldest not yet completed request */
1194        struct drbd_request *r;
1195        list_for_each_entry(r, &tconn->transfer_log, tl_requests) {
1196                if (atomic_read(&r->completion_ref))
1197                        return r;
1198        }
1199        return NULL;
1200}
1201
1202void request_timer_fn(unsigned long data)
1203{
1204        struct drbd_conf *mdev = (struct drbd_conf *) data;
1205        struct drbd_tconn *tconn = mdev->tconn;
1206        struct drbd_request *req; /* oldest request */
1207        struct net_conf *nc;
1208        unsigned long ent = 0, dt = 0, et, nt; /* effective timeout = ko_count * timeout */
1209        unsigned long now;
1210
1211        rcu_read_lock();
1212        nc = rcu_dereference(tconn->net_conf);
1213        if (nc && mdev->state.conn >= C_WF_REPORT_PARAMS)
1214                ent = nc->timeout * HZ/10 * nc->ko_count;
1215
1216        if (get_ldev(mdev)) { /* implicit state.disk >= D_INCONSISTENT */
1217                dt = rcu_dereference(mdev->ldev->disk_conf)->disk_timeout * HZ / 10;
1218                put_ldev(mdev);
1219        }
1220        rcu_read_unlock();
1221
1222        et = min_not_zero(dt, ent);
1223
1224        if (!et)
1225                return; /* Recurring timer stopped */
1226
1227        now = jiffies;
1228
1229        spin_lock_irq(&tconn->req_lock);
1230        req = find_oldest_request(tconn);
1231        if (!req) {
1232                spin_unlock_irq(&tconn->req_lock);
1233                mod_timer(&mdev->request_timer, now + et);
1234                return;
1235        }
1236
1237        /* The request is considered timed out, if
1238         * - we have some effective timeout from the configuration,
1239         *   with above state restrictions applied,
1240         * - the oldest request is waiting for a response from the network
1241         *   resp. the local disk,
1242         * - the oldest request is in fact older than the effective timeout,
1243         * - the connection was established (resp. disk was attached)
1244         *   for longer than the timeout already.
1245         * Note that for 32bit jiffies and very stable connections/disks,
1246         * we may have a wrap around, which is catched by
1247         *   !time_in_range(now, last_..._jif, last_..._jif + timeout).
1248         *
1249         * Side effect: once per 32bit wrap-around interval, which means every
1250         * ~198 days with 250 HZ, we have a window where the timeout would need
1251         * to expire twice (worst case) to become effective. Good enough.
1252         */
1253        if (ent && req->rq_state & RQ_NET_PENDING &&
1254                 time_after(now, req->start_time + ent) &&
1255                !time_in_range(now, tconn->last_reconnect_jif, tconn->last_reconnect_jif + ent)) {
1256                dev_warn(DEV, "Remote failed to finish a request within ko-count * timeout\n");
1257                _drbd_set_state(_NS(mdev, conn, C_TIMEOUT), CS_VERBOSE | CS_HARD, NULL);
1258        }
1259        if (dt && req->rq_state & RQ_LOCAL_PENDING && req->w.mdev == mdev &&
1260                 time_after(now, req->start_time + dt) &&
1261                !time_in_range(now, mdev->last_reattach_jif, mdev->last_reattach_jif + dt)) {
1262                dev_warn(DEV, "Local backing device failed to meet the disk-timeout\n");
1263                __drbd_chk_io_error(mdev, DRBD_FORCE_DETACH);
1264        }
1265        nt = (time_after(now, req->start_time + et) ? now : req->start_time) + et;
1266        spin_unlock_irq(&tconn->req_lock);
1267        mod_timer(&mdev->request_timer, nt);
1268}
1269