linux/drivers/block/drbd/drbd_req.c
<<
>>
Prefs
   1/*
   2   drbd_req.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23
  24 */
  25
  26#include <linux/module.h>
  27
  28#include <linux/slab.h>
  29#include <linux/drbd.h>
  30#include "drbd_int.h"
  31#include "drbd_req.h"
  32
  33
  34/* Update disk stats at start of I/O request */
  35static void _drbd_start_io_acct(struct drbd_conf *mdev, struct drbd_request *req, struct bio *bio)
  36{
  37        const int rw = bio_data_dir(bio);
  38        int cpu;
  39        cpu = part_stat_lock();
  40        part_stat_inc(cpu, &mdev->vdisk->part0, ios[rw]);
  41        part_stat_add(cpu, &mdev->vdisk->part0, sectors[rw], bio_sectors(bio));
  42        part_inc_in_flight(&mdev->vdisk->part0, rw);
  43        part_stat_unlock();
  44}
  45
  46/* Update disk stats when completing request upwards */
  47static void _drbd_end_io_acct(struct drbd_conf *mdev, struct drbd_request *req)
  48{
  49        int rw = bio_data_dir(req->master_bio);
  50        unsigned long duration = jiffies - req->start_time;
  51        int cpu;
  52        cpu = part_stat_lock();
  53        part_stat_add(cpu, &mdev->vdisk->part0, ticks[rw], duration);
  54        part_round_stats(cpu, &mdev->vdisk->part0);
  55        part_dec_in_flight(&mdev->vdisk->part0, rw);
  56        part_stat_unlock();
  57}
  58
  59static void _req_is_done(struct drbd_conf *mdev, struct drbd_request *req, const int rw)
  60{
  61        const unsigned long s = req->rq_state;
  62
  63        /* remove it from the transfer log.
  64         * well, only if it had been there in the first
  65         * place... if it had not (local only or conflicting
  66         * and never sent), it should still be "empty" as
  67         * initialized in drbd_req_new(), so we can list_del() it
  68         * here unconditionally */
  69        list_del(&req->tl_requests);
  70
  71        /* if it was a write, we may have to set the corresponding
  72         * bit(s) out-of-sync first. If it had a local part, we need to
  73         * release the reference to the activity log. */
  74        if (rw == WRITE) {
  75                /* Set out-of-sync unless both OK flags are set
  76                 * (local only or remote failed).
  77                 * Other places where we set out-of-sync:
  78                 * READ with local io-error */
  79                if (!(s & RQ_NET_OK) || !(s & RQ_LOCAL_OK))
  80                        drbd_set_out_of_sync(mdev, req->sector, req->size);
  81
  82                if ((s & RQ_NET_OK) && (s & RQ_LOCAL_OK) && (s & RQ_NET_SIS))
  83                        drbd_set_in_sync(mdev, req->sector, req->size);
  84
  85                /* one might be tempted to move the drbd_al_complete_io
  86                 * to the local io completion callback drbd_endio_pri.
  87                 * but, if this was a mirror write, we may only
  88                 * drbd_al_complete_io after this is RQ_NET_DONE,
  89                 * otherwise the extent could be dropped from the al
  90                 * before it has actually been written on the peer.
  91                 * if we crash before our peer knows about the request,
  92                 * but after the extent has been dropped from the al,
  93                 * we would forget to resync the corresponding extent.
  94                 */
  95                if (s & RQ_LOCAL_MASK) {
  96                        if (get_ldev_if_state(mdev, D_FAILED)) {
  97                                if (s & RQ_IN_ACT_LOG)
  98                                        drbd_al_complete_io(mdev, req->sector);
  99                                put_ldev(mdev);
 100                        } else if (__ratelimit(&drbd_ratelimit_state)) {
 101                                dev_warn(DEV, "Should have called drbd_al_complete_io(, %llu), "
 102                                     "but my Disk seems to have failed :(\n",
 103                                     (unsigned long long) req->sector);
 104                        }
 105                }
 106        }
 107
 108        drbd_req_free(req);
 109}
 110
 111static void queue_barrier(struct drbd_conf *mdev)
 112{
 113        struct drbd_tl_epoch *b;
 114
 115        /* We are within the req_lock. Once we queued the barrier for sending,
 116         * we set the CREATE_BARRIER bit. It is cleared as soon as a new
 117         * barrier/epoch object is added. This is the only place this bit is
 118         * set. It indicates that the barrier for this epoch is already queued,
 119         * and no new epoch has been created yet. */
 120        if (test_bit(CREATE_BARRIER, &mdev->flags))
 121                return;
 122
 123        b = mdev->newest_tle;
 124        b->w.cb = w_send_barrier;
 125        /* inc_ap_pending done here, so we won't
 126         * get imbalanced on connection loss.
 127         * dec_ap_pending will be done in got_BarrierAck
 128         * or (on connection loss) in tl_clear.  */
 129        inc_ap_pending(mdev);
 130        drbd_queue_work(&mdev->data.work, &b->w);
 131        set_bit(CREATE_BARRIER, &mdev->flags);
 132}
 133
 134static void _about_to_complete_local_write(struct drbd_conf *mdev,
 135        struct drbd_request *req)
 136{
 137        const unsigned long s = req->rq_state;
 138        struct drbd_request *i;
 139        struct drbd_epoch_entry *e;
 140        struct hlist_node *n;
 141        struct hlist_head *slot;
 142
 143        /* before we can signal completion to the upper layers,
 144         * we may need to close the current epoch */
 145        if (mdev->state.conn >= C_CONNECTED &&
 146            req->epoch == mdev->newest_tle->br_number)
 147                queue_barrier(mdev);
 148
 149        /* we need to do the conflict detection stuff,
 150         * if we have the ee_hash (two_primaries) and
 151         * this has been on the network */
 152        if ((s & RQ_NET_DONE) && mdev->ee_hash != NULL) {
 153                const sector_t sector = req->sector;
 154                const int size = req->size;
 155
 156                /* ASSERT:
 157                 * there must be no conflicting requests, since
 158                 * they must have been failed on the spot */
 159#define OVERLAPS overlaps(sector, size, i->sector, i->size)
 160                slot = tl_hash_slot(mdev, sector);
 161                hlist_for_each_entry(i, n, slot, colision) {
 162                        if (OVERLAPS) {
 163                                dev_alert(DEV, "LOGIC BUG: completed: %p %llus +%u; "
 164                                      "other: %p %llus +%u\n",
 165                                      req, (unsigned long long)sector, size,
 166                                      i, (unsigned long long)i->sector, i->size);
 167                        }
 168                }
 169
 170                /* maybe "wake" those conflicting epoch entries
 171                 * that wait for this request to finish.
 172                 *
 173                 * currently, there can be only _one_ such ee
 174                 * (well, or some more, which would be pending
 175                 * P_DISCARD_ACK not yet sent by the asender...),
 176                 * since we block the receiver thread upon the
 177                 * first conflict detection, which will wait on
 178                 * misc_wait.  maybe we want to assert that?
 179                 *
 180                 * anyways, if we found one,
 181                 * we just have to do a wake_up.  */
 182#undef OVERLAPS
 183#define OVERLAPS overlaps(sector, size, e->sector, e->size)
 184                slot = ee_hash_slot(mdev, req->sector);
 185                hlist_for_each_entry(e, n, slot, colision) {
 186                        if (OVERLAPS) {
 187                                wake_up(&mdev->misc_wait);
 188                                break;
 189                        }
 190                }
 191        }
 192#undef OVERLAPS
 193}
 194
 195void complete_master_bio(struct drbd_conf *mdev,
 196                struct bio_and_error *m)
 197{
 198        bio_endio(m->bio, m->error);
 199        dec_ap_bio(mdev);
 200}
 201
 202/* Helper for __req_mod().
 203 * Set m->bio to the master bio, if it is fit to be completed,
 204 * or leave it alone (it is initialized to NULL in __req_mod),
 205 * if it has already been completed, or cannot be completed yet.
 206 * If m->bio is set, the error status to be returned is placed in m->error.
 207 */
 208void _req_may_be_done(struct drbd_request *req, struct bio_and_error *m)
 209{
 210        const unsigned long s = req->rq_state;
 211        struct drbd_conf *mdev = req->mdev;
 212        /* only WRITES may end up here without a master bio (on barrier ack) */
 213        int rw = req->master_bio ? bio_data_dir(req->master_bio) : WRITE;
 214
 215        /* we must not complete the master bio, while it is
 216         *      still being processed by _drbd_send_zc_bio (drbd_send_dblock)
 217         *      not yet acknowledged by the peer
 218         *      not yet completed by the local io subsystem
 219         * these flags may get cleared in any order by
 220         *      the worker,
 221         *      the receiver,
 222         *      the bio_endio completion callbacks.
 223         */
 224        if (s & RQ_NET_QUEUED)
 225                return;
 226        if (s & RQ_NET_PENDING)
 227                return;
 228        if (s & RQ_LOCAL_PENDING)
 229                return;
 230
 231        if (req->master_bio) {
 232                /* this is data_received (remote read)
 233                 * or protocol C P_WRITE_ACK
 234                 * or protocol B P_RECV_ACK
 235                 * or protocol A "handed_over_to_network" (SendAck)
 236                 * or canceled or failed,
 237                 * or killed from the transfer log due to connection loss.
 238                 */
 239
 240                /*
 241                 * figure out whether to report success or failure.
 242                 *
 243                 * report success when at least one of the operations succeeded.
 244                 * or, to put the other way,
 245                 * only report failure, when both operations failed.
 246                 *
 247                 * what to do about the failures is handled elsewhere.
 248                 * what we need to do here is just: complete the master_bio.
 249                 *
 250                 * local completion error, if any, has been stored as ERR_PTR
 251                 * in private_bio within drbd_endio_pri.
 252                 */
 253                int ok = (s & RQ_LOCAL_OK) || (s & RQ_NET_OK);
 254                int error = PTR_ERR(req->private_bio);
 255
 256                /* remove the request from the conflict detection
 257                 * respective block_id verification hash */
 258                if (!hlist_unhashed(&req->colision))
 259                        hlist_del(&req->colision);
 260                else
 261                        D_ASSERT((s & (RQ_NET_MASK & ~RQ_NET_DONE)) == 0);
 262
 263                /* for writes we need to do some extra housekeeping */
 264                if (rw == WRITE)
 265                        _about_to_complete_local_write(mdev, req);
 266
 267                /* Update disk stats */
 268                _drbd_end_io_acct(mdev, req);
 269
 270                m->error = ok ? 0 : (error ?: -EIO);
 271                m->bio = req->master_bio;
 272                req->master_bio = NULL;
 273        }
 274
 275        if ((s & RQ_NET_MASK) == 0 || (s & RQ_NET_DONE)) {
 276                /* this is disconnected (local only) operation,
 277                 * or protocol C P_WRITE_ACK,
 278                 * or protocol A or B P_BARRIER_ACK,
 279                 * or killed from the transfer log due to connection loss. */
 280                _req_is_done(mdev, req, rw);
 281        }
 282        /* else: network part and not DONE yet. that is
 283         * protocol A or B, barrier ack still pending... */
 284}
 285
 286static void _req_may_be_done_not_susp(struct drbd_request *req, struct bio_and_error *m)
 287{
 288        struct drbd_conf *mdev = req->mdev;
 289
 290        if (!is_susp(mdev->state))
 291                _req_may_be_done(req, m);
 292}
 293
 294/*
 295 * checks whether there was an overlapping request
 296 * or ee already registered.
 297 *
 298 * if so, return 1, in which case this request is completed on the spot,
 299 * without ever being submitted or send.
 300 *
 301 * return 0 if it is ok to submit this request.
 302 *
 303 * NOTE:
 304 * paranoia: assume something above us is broken, and issues different write
 305 * requests for the same block simultaneously...
 306 *
 307 * To ensure these won't be reordered differently on both nodes, resulting in
 308 * diverging data sets, we discard the later one(s). Not that this is supposed
 309 * to happen, but this is the rationale why we also have to check for
 310 * conflicting requests with local origin, and why we have to do so regardless
 311 * of whether we allowed multiple primaries.
 312 *
 313 * BTW, in case we only have one primary, the ee_hash is empty anyways, and the
 314 * second hlist_for_each_entry becomes a noop. This is even simpler than to
 315 * grab a reference on the net_conf, and check for the two_primaries flag...
 316 */
 317static int _req_conflicts(struct drbd_request *req)
 318{
 319        struct drbd_conf *mdev = req->mdev;
 320        const sector_t sector = req->sector;
 321        const int size = req->size;
 322        struct drbd_request *i;
 323        struct drbd_epoch_entry *e;
 324        struct hlist_node *n;
 325        struct hlist_head *slot;
 326
 327        D_ASSERT(hlist_unhashed(&req->colision));
 328
 329        if (!get_net_conf(mdev))
 330                return 0;
 331
 332        /* BUG_ON */
 333        ERR_IF (mdev->tl_hash_s == 0)
 334                goto out_no_conflict;
 335        BUG_ON(mdev->tl_hash == NULL);
 336
 337#define OVERLAPS overlaps(i->sector, i->size, sector, size)
 338        slot = tl_hash_slot(mdev, sector);
 339        hlist_for_each_entry(i, n, slot, colision) {
 340                if (OVERLAPS) {
 341                        dev_alert(DEV, "%s[%u] Concurrent local write detected! "
 342                              "[DISCARD L] new: %llus +%u; "
 343                              "pending: %llus +%u\n",
 344                              current->comm, current->pid,
 345                              (unsigned long long)sector, size,
 346                              (unsigned long long)i->sector, i->size);
 347                        goto out_conflict;
 348                }
 349        }
 350
 351        if (mdev->ee_hash_s) {
 352                /* now, check for overlapping requests with remote origin */
 353                BUG_ON(mdev->ee_hash == NULL);
 354#undef OVERLAPS
 355#define OVERLAPS overlaps(e->sector, e->size, sector, size)
 356                slot = ee_hash_slot(mdev, sector);
 357                hlist_for_each_entry(e, n, slot, colision) {
 358                        if (OVERLAPS) {
 359                                dev_alert(DEV, "%s[%u] Concurrent remote write detected!"
 360                                      " [DISCARD L] new: %llus +%u; "
 361                                      "pending: %llus +%u\n",
 362                                      current->comm, current->pid,
 363                                      (unsigned long long)sector, size,
 364                                      (unsigned long long)e->sector, e->size);
 365                                goto out_conflict;
 366                        }
 367                }
 368        }
 369#undef OVERLAPS
 370
 371out_no_conflict:
 372        /* this is like it should be, and what we expected.
 373         * our users do behave after all... */
 374        put_net_conf(mdev);
 375        return 0;
 376
 377out_conflict:
 378        put_net_conf(mdev);
 379        return 1;
 380}
 381
 382/* obviously this could be coded as many single functions
 383 * instead of one huge switch,
 384 * or by putting the code directly in the respective locations
 385 * (as it has been before).
 386 *
 387 * but having it this way
 388 *  enforces that it is all in this one place, where it is easier to audit,
 389 *  it makes it obvious that whatever "event" "happens" to a request should
 390 *  happen "atomically" within the req_lock,
 391 *  and it enforces that we have to think in a very structured manner
 392 *  about the "events" that may happen to a request during its life time ...
 393 */
 394int __req_mod(struct drbd_request *req, enum drbd_req_event what,
 395                struct bio_and_error *m)
 396{
 397        struct drbd_conf *mdev = req->mdev;
 398        int rv = 0;
 399        m->bio = NULL;
 400
 401        switch (what) {
 402        default:
 403                dev_err(DEV, "LOGIC BUG in %s:%u\n", __FILE__ , __LINE__);
 404                break;
 405
 406        /* does not happen...
 407         * initialization done in drbd_req_new
 408        case created:
 409                break;
 410                */
 411
 412        case to_be_send: /* via network */
 413                /* reached via drbd_make_request_common
 414                 * and from w_read_retry_remote */
 415                D_ASSERT(!(req->rq_state & RQ_NET_MASK));
 416                req->rq_state |= RQ_NET_PENDING;
 417                inc_ap_pending(mdev);
 418                break;
 419
 420        case to_be_submitted: /* locally */
 421                /* reached via drbd_make_request_common */
 422                D_ASSERT(!(req->rq_state & RQ_LOCAL_MASK));
 423                req->rq_state |= RQ_LOCAL_PENDING;
 424                break;
 425
 426        case completed_ok:
 427                if (bio_data_dir(req->master_bio) == WRITE)
 428                        mdev->writ_cnt += req->size>>9;
 429                else
 430                        mdev->read_cnt += req->size>>9;
 431
 432                req->rq_state |= (RQ_LOCAL_COMPLETED|RQ_LOCAL_OK);
 433                req->rq_state &= ~RQ_LOCAL_PENDING;
 434
 435                _req_may_be_done_not_susp(req, m);
 436                put_ldev(mdev);
 437                break;
 438
 439        case write_completed_with_error:
 440                req->rq_state |= RQ_LOCAL_COMPLETED;
 441                req->rq_state &= ~RQ_LOCAL_PENDING;
 442
 443                __drbd_chk_io_error(mdev, FALSE);
 444                _req_may_be_done_not_susp(req, m);
 445                put_ldev(mdev);
 446                break;
 447
 448        case read_ahead_completed_with_error:
 449                /* it is legal to fail READA */
 450                req->rq_state |= RQ_LOCAL_COMPLETED;
 451                req->rq_state &= ~RQ_LOCAL_PENDING;
 452                _req_may_be_done_not_susp(req, m);
 453                put_ldev(mdev);
 454                break;
 455
 456        case read_completed_with_error:
 457                drbd_set_out_of_sync(mdev, req->sector, req->size);
 458
 459                req->rq_state |= RQ_LOCAL_COMPLETED;
 460                req->rq_state &= ~RQ_LOCAL_PENDING;
 461
 462                D_ASSERT(!(req->rq_state & RQ_NET_MASK));
 463
 464                __drbd_chk_io_error(mdev, FALSE);
 465                put_ldev(mdev);
 466
 467                /* no point in retrying if there is no good remote data,
 468                 * or we have no connection. */
 469                if (mdev->state.pdsk != D_UP_TO_DATE) {
 470                        _req_may_be_done_not_susp(req, m);
 471                        break;
 472                }
 473
 474                /* _req_mod(req,to_be_send); oops, recursion... */
 475                req->rq_state |= RQ_NET_PENDING;
 476                inc_ap_pending(mdev);
 477                /* fall through: _req_mod(req,queue_for_net_read); */
 478
 479        case queue_for_net_read:
 480                /* READ or READA, and
 481                 * no local disk,
 482                 * or target area marked as invalid,
 483                 * or just got an io-error. */
 484                /* from drbd_make_request_common
 485                 * or from bio_endio during read io-error recovery */
 486
 487                /* so we can verify the handle in the answer packet
 488                 * corresponding hlist_del is in _req_may_be_done() */
 489                hlist_add_head(&req->colision, ar_hash_slot(mdev, req->sector));
 490
 491                set_bit(UNPLUG_REMOTE, &mdev->flags);
 492
 493                D_ASSERT(req->rq_state & RQ_NET_PENDING);
 494                req->rq_state |= RQ_NET_QUEUED;
 495                req->w.cb = (req->rq_state & RQ_LOCAL_MASK)
 496                        ? w_read_retry_remote
 497                        : w_send_read_req;
 498                drbd_queue_work(&mdev->data.work, &req->w);
 499                break;
 500
 501        case queue_for_net_write:
 502                /* assert something? */
 503                /* from drbd_make_request_common only */
 504
 505                hlist_add_head(&req->colision, tl_hash_slot(mdev, req->sector));
 506                /* corresponding hlist_del is in _req_may_be_done() */
 507
 508                /* NOTE
 509                 * In case the req ended up on the transfer log before being
 510                 * queued on the worker, it could lead to this request being
 511                 * missed during cleanup after connection loss.
 512                 * So we have to do both operations here,
 513                 * within the same lock that protects the transfer log.
 514                 *
 515                 * _req_add_to_epoch(req); this has to be after the
 516                 * _maybe_start_new_epoch(req); which happened in
 517                 * drbd_make_request_common, because we now may set the bit
 518                 * again ourselves to close the current epoch.
 519                 *
 520                 * Add req to the (now) current epoch (barrier). */
 521
 522                /* otherwise we may lose an unplug, which may cause some remote
 523                 * io-scheduler timeout to expire, increasing maximum latency,
 524                 * hurting performance. */
 525                set_bit(UNPLUG_REMOTE, &mdev->flags);
 526
 527                /* see drbd_make_request_common,
 528                 * just after it grabs the req_lock */
 529                D_ASSERT(test_bit(CREATE_BARRIER, &mdev->flags) == 0);
 530
 531                req->epoch = mdev->newest_tle->br_number;
 532
 533                /* increment size of current epoch */
 534                mdev->newest_tle->n_writes++;
 535
 536                /* queue work item to send data */
 537                D_ASSERT(req->rq_state & RQ_NET_PENDING);
 538                req->rq_state |= RQ_NET_QUEUED;
 539                req->w.cb =  w_send_dblock;
 540                drbd_queue_work(&mdev->data.work, &req->w);
 541
 542                /* close the epoch, in case it outgrew the limit */
 543                if (mdev->newest_tle->n_writes >= mdev->net_conf->max_epoch_size)
 544                        queue_barrier(mdev);
 545
 546                break;
 547
 548        case send_canceled:
 549                /* treat it the same */
 550        case send_failed:
 551                /* real cleanup will be done from tl_clear.  just update flags
 552                 * so it is no longer marked as on the worker queue */
 553                req->rq_state &= ~RQ_NET_QUEUED;
 554                /* if we did it right, tl_clear should be scheduled only after
 555                 * this, so this should not be necessary! */
 556                _req_may_be_done_not_susp(req, m);
 557                break;
 558
 559        case handed_over_to_network:
 560                /* assert something? */
 561                if (bio_data_dir(req->master_bio) == WRITE &&
 562                    mdev->net_conf->wire_protocol == DRBD_PROT_A) {
 563                        /* this is what is dangerous about protocol A:
 564                         * pretend it was successfully written on the peer. */
 565                        if (req->rq_state & RQ_NET_PENDING) {
 566                                dec_ap_pending(mdev);
 567                                req->rq_state &= ~RQ_NET_PENDING;
 568                                req->rq_state |= RQ_NET_OK;
 569                        } /* else: neg-ack was faster... */
 570                        /* it is still not yet RQ_NET_DONE until the
 571                         * corresponding epoch barrier got acked as well,
 572                         * so we know what to dirty on connection loss */
 573                }
 574                req->rq_state &= ~RQ_NET_QUEUED;
 575                req->rq_state |= RQ_NET_SENT;
 576                /* because _drbd_send_zc_bio could sleep, and may want to
 577                 * dereference the bio even after the "write_acked_by_peer" and
 578                 * "completed_ok" events came in, once we return from
 579                 * _drbd_send_zc_bio (drbd_send_dblock), we have to check
 580                 * whether it is done already, and end it.  */
 581                _req_may_be_done_not_susp(req, m);
 582                break;
 583
 584        case read_retry_remote_canceled:
 585                req->rq_state &= ~RQ_NET_QUEUED;
 586                /* fall through, in case we raced with drbd_disconnect */
 587        case connection_lost_while_pending:
 588                /* transfer log cleanup after connection loss */
 589                /* assert something? */
 590                if (req->rq_state & RQ_NET_PENDING)
 591                        dec_ap_pending(mdev);
 592                req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
 593                req->rq_state |= RQ_NET_DONE;
 594                /* if it is still queued, we may not complete it here.
 595                 * it will be canceled soon. */
 596                if (!(req->rq_state & RQ_NET_QUEUED))
 597                        _req_may_be_done(req, m); /* Allowed while state.susp */
 598                break;
 599
 600        case write_acked_by_peer_and_sis:
 601                req->rq_state |= RQ_NET_SIS;
 602        case conflict_discarded_by_peer:
 603                /* for discarded conflicting writes of multiple primaries,
 604                 * there is no need to keep anything in the tl, potential
 605                 * node crashes are covered by the activity log. */
 606                if (what == conflict_discarded_by_peer)
 607                        dev_alert(DEV, "Got DiscardAck packet %llus +%u!"
 608                              " DRBD is not a random data generator!\n",
 609                              (unsigned long long)req->sector, req->size);
 610                req->rq_state |= RQ_NET_DONE;
 611                /* fall through */
 612        case write_acked_by_peer:
 613                /* protocol C; successfully written on peer.
 614                 * Nothing to do here.
 615                 * We want to keep the tl in place for all protocols, to cater
 616                 * for volatile write-back caches on lower level devices.
 617                 *
 618                 * A barrier request is expected to have forced all prior
 619                 * requests onto stable storage, so completion of a barrier
 620                 * request could set NET_DONE right here, and not wait for the
 621                 * P_BARRIER_ACK, but that is an unnecessary optimization. */
 622
 623                /* this makes it effectively the same as for: */
 624        case recv_acked_by_peer:
 625                /* protocol B; pretends to be successfully written on peer.
 626                 * see also notes above in handed_over_to_network about
 627                 * protocol != C */
 628                req->rq_state |= RQ_NET_OK;
 629                D_ASSERT(req->rq_state & RQ_NET_PENDING);
 630                dec_ap_pending(mdev);
 631                req->rq_state &= ~RQ_NET_PENDING;
 632                _req_may_be_done_not_susp(req, m);
 633                break;
 634
 635        case neg_acked:
 636                /* assert something? */
 637                if (req->rq_state & RQ_NET_PENDING)
 638                        dec_ap_pending(mdev);
 639                req->rq_state &= ~(RQ_NET_OK|RQ_NET_PENDING);
 640
 641                req->rq_state |= RQ_NET_DONE;
 642                _req_may_be_done_not_susp(req, m);
 643                /* else: done by handed_over_to_network */
 644                break;
 645
 646        case fail_frozen_disk_io:
 647                if (!(req->rq_state & RQ_LOCAL_COMPLETED))
 648                        break;
 649
 650                _req_may_be_done(req, m); /* Allowed while state.susp */
 651                break;
 652
 653        case restart_frozen_disk_io:
 654                if (!(req->rq_state & RQ_LOCAL_COMPLETED))
 655                        break;
 656
 657                req->rq_state &= ~RQ_LOCAL_COMPLETED;
 658
 659                rv = MR_READ;
 660                if (bio_data_dir(req->master_bio) == WRITE)
 661                        rv = MR_WRITE;
 662
 663                get_ldev(mdev);
 664                req->w.cb = w_restart_disk_io;
 665                drbd_queue_work(&mdev->data.work, &req->w);
 666                break;
 667
 668        case resend:
 669                /* If RQ_NET_OK is already set, we got a P_WRITE_ACK or P_RECV_ACK
 670                   before the connection loss (B&C only); only P_BARRIER_ACK was missing.
 671                   Trowing them out of the TL here by pretending we got a BARRIER_ACK
 672                   We ensure that the peer was not rebooted */
 673                if (!(req->rq_state & RQ_NET_OK)) {
 674                        if (req->w.cb) {
 675                                drbd_queue_work(&mdev->data.work, &req->w);
 676                                rv = req->rq_state & RQ_WRITE ? MR_WRITE : MR_READ;
 677                        }
 678                        break;
 679                }
 680                /* else, fall through to barrier_acked */
 681
 682        case barrier_acked:
 683                if (!(req->rq_state & RQ_WRITE))
 684                        break;
 685
 686                if (req->rq_state & RQ_NET_PENDING) {
 687                        /* barrier came in before all requests have been acked.
 688                         * this is bad, because if the connection is lost now,
 689                         * we won't be able to clean them up... */
 690                        dev_err(DEV, "FIXME (barrier_acked but pending)\n");
 691                        list_move(&req->tl_requests, &mdev->out_of_sequence_requests);
 692                }
 693                D_ASSERT(req->rq_state & RQ_NET_SENT);
 694                req->rq_state |= RQ_NET_DONE;
 695                _req_may_be_done(req, m); /* Allowed while state.susp */
 696                break;
 697
 698        case data_received:
 699                D_ASSERT(req->rq_state & RQ_NET_PENDING);
 700                dec_ap_pending(mdev);
 701                req->rq_state &= ~RQ_NET_PENDING;
 702                req->rq_state |= (RQ_NET_OK|RQ_NET_DONE);
 703                _req_may_be_done_not_susp(req, m);
 704                break;
 705        };
 706
 707        return rv;
 708}
 709
 710/* we may do a local read if:
 711 * - we are consistent (of course),
 712 * - or we are generally inconsistent,
 713 *   BUT we are still/already IN SYNC for this area.
 714 *   since size may be bigger than BM_BLOCK_SIZE,
 715 *   we may need to check several bits.
 716 */
 717static int drbd_may_do_local_read(struct drbd_conf *mdev, sector_t sector, int size)
 718{
 719        unsigned long sbnr, ebnr;
 720        sector_t esector, nr_sectors;
 721
 722        if (mdev->state.disk == D_UP_TO_DATE)
 723                return 1;
 724        if (mdev->state.disk >= D_OUTDATED)
 725                return 0;
 726        if (mdev->state.disk <  D_INCONSISTENT)
 727                return 0;
 728        /* state.disk == D_INCONSISTENT   We will have a look at the BitMap */
 729        nr_sectors = drbd_get_capacity(mdev->this_bdev);
 730        esector = sector + (size >> 9) - 1;
 731
 732        D_ASSERT(sector  < nr_sectors);
 733        D_ASSERT(esector < nr_sectors);
 734
 735        sbnr = BM_SECT_TO_BIT(sector);
 736        ebnr = BM_SECT_TO_BIT(esector);
 737
 738        return 0 == drbd_bm_count_bits(mdev, sbnr, ebnr);
 739}
 740
 741static int drbd_make_request_common(struct drbd_conf *mdev, struct bio *bio)
 742{
 743        const int rw = bio_rw(bio);
 744        const int size = bio->bi_size;
 745        const sector_t sector = bio->bi_sector;
 746        struct drbd_tl_epoch *b = NULL;
 747        struct drbd_request *req;
 748        int local, remote;
 749        int err = -EIO;
 750        int ret = 0;
 751
 752        /* allocate outside of all locks; */
 753        req = drbd_req_new(mdev, bio);
 754        if (!req) {
 755                dec_ap_bio(mdev);
 756                /* only pass the error to the upper layers.
 757                 * if user cannot handle io errors, that's not our business. */
 758                dev_err(DEV, "could not kmalloc() req\n");
 759                bio_endio(bio, -ENOMEM);
 760                return 0;
 761        }
 762
 763        local = get_ldev(mdev);
 764        if (!local) {
 765                bio_put(req->private_bio); /* or we get a bio leak */
 766                req->private_bio = NULL;
 767        }
 768        if (rw == WRITE) {
 769                remote = 1;
 770        } else {
 771                /* READ || READA */
 772                if (local) {
 773                        if (!drbd_may_do_local_read(mdev, sector, size)) {
 774                                /* we could kick the syncer to
 775                                 * sync this extent asap, wait for
 776                                 * it, then continue locally.
 777                                 * Or just issue the request remotely.
 778                                 */
 779                                local = 0;
 780                                bio_put(req->private_bio);
 781                                req->private_bio = NULL;
 782                                put_ldev(mdev);
 783                        }
 784                }
 785                remote = !local && mdev->state.pdsk >= D_UP_TO_DATE;
 786        }
 787
 788        /* If we have a disk, but a READA request is mapped to remote,
 789         * we are R_PRIMARY, D_INCONSISTENT, SyncTarget.
 790         * Just fail that READA request right here.
 791         *
 792         * THINK: maybe fail all READA when not local?
 793         *        or make this configurable...
 794         *        if network is slow, READA won't do any good.
 795         */
 796        if (rw == READA && mdev->state.disk >= D_INCONSISTENT && !local) {
 797                err = -EWOULDBLOCK;
 798                goto fail_and_free_req;
 799        }
 800
 801        /* For WRITES going to the local disk, grab a reference on the target
 802         * extent.  This waits for any resync activity in the corresponding
 803         * resync extent to finish, and, if necessary, pulls in the target
 804         * extent into the activity log, which involves further disk io because
 805         * of transactional on-disk meta data updates. */
 806        if (rw == WRITE && local && !test_bit(AL_SUSPENDED, &mdev->flags)) {
 807                req->rq_state |= RQ_IN_ACT_LOG;
 808                drbd_al_begin_io(mdev, sector);
 809        }
 810
 811        remote = remote && (mdev->state.pdsk == D_UP_TO_DATE ||
 812                            (mdev->state.pdsk == D_INCONSISTENT &&
 813                             mdev->state.conn >= C_CONNECTED));
 814
 815        if (!(local || remote) && !is_susp(mdev->state)) {
 816                if (__ratelimit(&drbd_ratelimit_state))
 817                        dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
 818                goto fail_free_complete;
 819        }
 820
 821        /* For WRITE request, we have to make sure that we have an
 822         * unused_spare_tle, in case we need to start a new epoch.
 823         * I try to be smart and avoid to pre-allocate always "just in case",
 824         * but there is a race between testing the bit and pointer outside the
 825         * spinlock, and grabbing the spinlock.
 826         * if we lost that race, we retry.  */
 827        if (rw == WRITE && remote &&
 828            mdev->unused_spare_tle == NULL &&
 829            test_bit(CREATE_BARRIER, &mdev->flags)) {
 830allocate_barrier:
 831                b = kmalloc(sizeof(struct drbd_tl_epoch), GFP_NOIO);
 832                if (!b) {
 833                        dev_err(DEV, "Failed to alloc barrier.\n");
 834                        err = -ENOMEM;
 835                        goto fail_free_complete;
 836                }
 837        }
 838
 839        /* GOOD, everything prepared, grab the spin_lock */
 840        spin_lock_irq(&mdev->req_lock);
 841
 842        if (is_susp(mdev->state)) {
 843                /* If we got suspended, use the retry mechanism of
 844                   generic_make_request() to restart processing of this
 845                   bio. In the next call to drbd_make_request_26
 846                   we sleep in inc_ap_bio() */
 847                ret = 1;
 848                spin_unlock_irq(&mdev->req_lock);
 849                goto fail_free_complete;
 850        }
 851
 852        if (remote) {
 853                remote = (mdev->state.pdsk == D_UP_TO_DATE ||
 854                            (mdev->state.pdsk == D_INCONSISTENT &&
 855                             mdev->state.conn >= C_CONNECTED));
 856                if (!remote)
 857                        dev_warn(DEV, "lost connection while grabbing the req_lock!\n");
 858                if (!(local || remote)) {
 859                        dev_err(DEV, "IO ERROR: neither local nor remote disk\n");
 860                        spin_unlock_irq(&mdev->req_lock);
 861                        goto fail_free_complete;
 862                }
 863        }
 864
 865        if (b && mdev->unused_spare_tle == NULL) {
 866                mdev->unused_spare_tle = b;
 867                b = NULL;
 868        }
 869        if (rw == WRITE && remote &&
 870            mdev->unused_spare_tle == NULL &&
 871            test_bit(CREATE_BARRIER, &mdev->flags)) {
 872                /* someone closed the current epoch
 873                 * while we were grabbing the spinlock */
 874                spin_unlock_irq(&mdev->req_lock);
 875                goto allocate_barrier;
 876        }
 877
 878
 879        /* Update disk stats */
 880        _drbd_start_io_acct(mdev, req, bio);
 881
 882        /* _maybe_start_new_epoch(mdev);
 883         * If we need to generate a write barrier packet, we have to add the
 884         * new epoch (barrier) object, and queue the barrier packet for sending,
 885         * and queue the req's data after it _within the same lock_, otherwise
 886         * we have race conditions were the reorder domains could be mixed up.
 887         *
 888         * Even read requests may start a new epoch and queue the corresponding
 889         * barrier packet.  To get the write ordering right, we only have to
 890         * make sure that, if this is a write request and it triggered a
 891         * barrier packet, this request is queued within the same spinlock. */
 892        if (remote && mdev->unused_spare_tle &&
 893            test_and_clear_bit(CREATE_BARRIER, &mdev->flags)) {
 894                _tl_add_barrier(mdev, mdev->unused_spare_tle);
 895                mdev->unused_spare_tle = NULL;
 896        } else {
 897                D_ASSERT(!(remote && rw == WRITE &&
 898                           test_bit(CREATE_BARRIER, &mdev->flags)));
 899        }
 900
 901        /* NOTE
 902         * Actually, 'local' may be wrong here already, since we may have failed
 903         * to write to the meta data, and may become wrong anytime because of
 904         * local io-error for some other request, which would lead to us
 905         * "detaching" the local disk.
 906         *
 907         * 'remote' may become wrong any time because the network could fail.
 908         *
 909         * This is a harmless race condition, though, since it is handled
 910         * correctly at the appropriate places; so it just defers the failure
 911         * of the respective operation.
 912         */
 913
 914        /* mark them early for readability.
 915         * this just sets some state flags. */
 916        if (remote)
 917                _req_mod(req, to_be_send);
 918        if (local)
 919                _req_mod(req, to_be_submitted);
 920
 921        /* check this request on the collision detection hash tables.
 922         * if we have a conflict, just complete it here.
 923         * THINK do we want to check reads, too? (I don't think so...) */
 924        if (rw == WRITE && _req_conflicts(req))
 925                goto fail_conflicting;
 926
 927        list_add_tail(&req->tl_requests, &mdev->newest_tle->requests);
 928
 929        /* NOTE remote first: to get the concurrent write detection right,
 930         * we must register the request before start of local IO.  */
 931        if (remote) {
 932                /* either WRITE and C_CONNECTED,
 933                 * or READ, and no local disk,
 934                 * or READ, but not in sync.
 935                 */
 936                _req_mod(req, (rw == WRITE)
 937                                ? queue_for_net_write
 938                                : queue_for_net_read);
 939        }
 940        spin_unlock_irq(&mdev->req_lock);
 941        kfree(b); /* if someone else has beaten us to it... */
 942
 943        if (local) {
 944                req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
 945
 946                /* State may have changed since we grabbed our reference on the
 947                 * mdev->ldev member. Double check, and short-circuit to endio.
 948                 * In case the last activity log transaction failed to get on
 949                 * stable storage, and this is a WRITE, we may not even submit
 950                 * this bio. */
 951                if (get_ldev(mdev)) {
 952                        if (FAULT_ACTIVE(mdev, rw == WRITE ? DRBD_FAULT_DT_WR
 953                                             : rw == READ  ? DRBD_FAULT_DT_RD
 954                                             :               DRBD_FAULT_DT_RA))
 955                                bio_endio(req->private_bio, -EIO);
 956                        else
 957                                generic_make_request(req->private_bio);
 958                        put_ldev(mdev);
 959                } else
 960                        bio_endio(req->private_bio, -EIO);
 961        }
 962
 963        /* we need to plug ALWAYS since we possibly need to kick lo_dev.
 964         * we plug after submit, so we won't miss an unplug event */
 965        drbd_plug_device(mdev);
 966
 967        return 0;
 968
 969fail_conflicting:
 970        /* this is a conflicting request.
 971         * even though it may have been only _partially_
 972         * overlapping with one of the currently pending requests,
 973         * without even submitting or sending it, we will
 974         * pretend that it was successfully served right now.
 975         */
 976        _drbd_end_io_acct(mdev, req);
 977        spin_unlock_irq(&mdev->req_lock);
 978        if (remote)
 979                dec_ap_pending(mdev);
 980        /* THINK: do we want to fail it (-EIO), or pretend success?
 981         * this pretends success. */
 982        err = 0;
 983
 984fail_free_complete:
 985        if (rw == WRITE && local)
 986                drbd_al_complete_io(mdev, sector);
 987fail_and_free_req:
 988        if (local) {
 989                bio_put(req->private_bio);
 990                req->private_bio = NULL;
 991                put_ldev(mdev);
 992        }
 993        if (!ret)
 994                bio_endio(bio, err);
 995
 996        drbd_req_free(req);
 997        dec_ap_bio(mdev);
 998        kfree(b);
 999
1000        return ret;
1001}
1002
1003/* helper function for drbd_make_request
1004 * if we can determine just by the mdev (state) that this request will fail,
1005 * return 1
1006 * otherwise return 0
1007 */
1008static int drbd_fail_request_early(struct drbd_conf *mdev, int is_write)
1009{
1010        if (mdev->state.role != R_PRIMARY &&
1011                (!allow_oos || is_write)) {
1012                if (__ratelimit(&drbd_ratelimit_state)) {
1013                        dev_err(DEV, "Process %s[%u] tried to %s; "
1014                            "since we are not in Primary state, "
1015                            "we cannot allow this\n",
1016                            current->comm, current->pid,
1017                            is_write ? "WRITE" : "READ");
1018                }
1019                return 1;
1020        }
1021
1022        return 0;
1023}
1024
1025int drbd_make_request_26(struct request_queue *q, struct bio *bio)
1026{
1027        unsigned int s_enr, e_enr;
1028        struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1029
1030        if (drbd_fail_request_early(mdev, bio_data_dir(bio) & WRITE)) {
1031                bio_endio(bio, -EPERM);
1032                return 0;
1033        }
1034
1035        /*
1036         * what we "blindly" assume:
1037         */
1038        D_ASSERT(bio->bi_size > 0);
1039        D_ASSERT((bio->bi_size & 0x1ff) == 0);
1040        D_ASSERT(bio->bi_idx == 0);
1041
1042        /* to make some things easier, force alignment of requests within the
1043         * granularity of our hash tables */
1044        s_enr = bio->bi_sector >> HT_SHIFT;
1045        e_enr = (bio->bi_sector+(bio->bi_size>>9)-1) >> HT_SHIFT;
1046
1047        if (likely(s_enr == e_enr)) {
1048                inc_ap_bio(mdev, 1);
1049                return drbd_make_request_common(mdev, bio);
1050        }
1051
1052        /* can this bio be split generically?
1053         * Maybe add our own split-arbitrary-bios function. */
1054        if (bio->bi_vcnt != 1 || bio->bi_idx != 0 || bio->bi_size > DRBD_MAX_SEGMENT_SIZE) {
1055                /* rather error out here than BUG in bio_split */
1056                dev_err(DEV, "bio would need to, but cannot, be split: "
1057                    "(vcnt=%u,idx=%u,size=%u,sector=%llu)\n",
1058                    bio->bi_vcnt, bio->bi_idx, bio->bi_size,
1059                    (unsigned long long)bio->bi_sector);
1060                bio_endio(bio, -EINVAL);
1061        } else {
1062                /* This bio crosses some boundary, so we have to split it. */
1063                struct bio_pair *bp;
1064                /* works for the "do not cross hash slot boundaries" case
1065                 * e.g. sector 262269, size 4096
1066                 * s_enr = 262269 >> 6 = 4097
1067                 * e_enr = (262269+8-1) >> 6 = 4098
1068                 * HT_SHIFT = 6
1069                 * sps = 64, mask = 63
1070                 * first_sectors = 64 - (262269 & 63) = 3
1071                 */
1072                const sector_t sect = bio->bi_sector;
1073                const int sps = 1 << HT_SHIFT; /* sectors per slot */
1074                const int mask = sps - 1;
1075                const sector_t first_sectors = sps - (sect & mask);
1076                bp = bio_split(bio,
1077#if LINUX_VERSION_CODE < KERNEL_VERSION(2,6,28)
1078                                bio_split_pool,
1079#endif
1080                                first_sectors);
1081
1082                /* we need to get a "reference count" (ap_bio_cnt)
1083                 * to avoid races with the disconnect/reconnect/suspend code.
1084                 * In case we need to split the bio here, we need to get three references
1085                 * atomically, otherwise we might deadlock when trying to submit the
1086                 * second one! */
1087                inc_ap_bio(mdev, 3);
1088
1089                D_ASSERT(e_enr == s_enr + 1);
1090
1091                while (drbd_make_request_common(mdev, &bp->bio1))
1092                        inc_ap_bio(mdev, 1);
1093
1094                while (drbd_make_request_common(mdev, &bp->bio2))
1095                        inc_ap_bio(mdev, 1);
1096
1097                dec_ap_bio(mdev);
1098
1099                bio_pair_release(bp);
1100        }
1101        return 0;
1102}
1103
1104/* This is called by bio_add_page().  With this function we reduce
1105 * the number of BIOs that span over multiple DRBD_MAX_SEGMENT_SIZEs
1106 * units (was AL_EXTENTs).
1107 *
1108 * we do the calculation within the lower 32bit of the byte offsets,
1109 * since we don't care for actual offset, but only check whether it
1110 * would cross "activity log extent" boundaries.
1111 *
1112 * As long as the BIO is empty we have to allow at least one bvec,
1113 * regardless of size and offset.  so the resulting bio may still
1114 * cross extent boundaries.  those are dealt with (bio_split) in
1115 * drbd_make_request_26.
1116 */
1117int drbd_merge_bvec(struct request_queue *q, struct bvec_merge_data *bvm, struct bio_vec *bvec)
1118{
1119        struct drbd_conf *mdev = (struct drbd_conf *) q->queuedata;
1120        unsigned int bio_offset =
1121                (unsigned int)bvm->bi_sector << 9; /* 32 bit */
1122        unsigned int bio_size = bvm->bi_size;
1123        int limit, backing_limit;
1124
1125        limit = DRBD_MAX_SEGMENT_SIZE
1126              - ((bio_offset & (DRBD_MAX_SEGMENT_SIZE-1)) + bio_size);
1127        if (limit < 0)
1128                limit = 0;
1129        if (bio_size == 0) {
1130                if (limit <= bvec->bv_len)
1131                        limit = bvec->bv_len;
1132        } else if (limit && get_ldev(mdev)) {
1133                struct request_queue * const b =
1134                        mdev->ldev->backing_bdev->bd_disk->queue;
1135                if (b->merge_bvec_fn) {
1136                        backing_limit = b->merge_bvec_fn(b, bvm, bvec);
1137                        limit = min(limit, backing_limit);
1138                }
1139                put_ldev(mdev);
1140        }
1141        return limit;
1142}
1143