linux/drivers/block/drbd/drbd_receiver.c
/*
   drbd_receiver.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
 */


#include <linux/module.h>

#include <asm/uaccess.h>
#include <net/sock.h>

#include <linux/drbd.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/in.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/pkt_sched.h>
#define __KERNEL_SYSCALLS__
#include <linux/unistd.h>
#include <linux/vmalloc.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>
#include "drbd_int.h"
#include "drbd_req.h"

#include "drbd_vli.h"

enum finish_epoch {
        FE_STILL_LIVE,
        FE_DESTROYED,
        FE_RECYCLED,
};

static int drbd_do_handshake(struct drbd_conf *mdev);
static int drbd_do_auth(struct drbd_conf *mdev);

static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *, struct drbd_epoch *, enum epoch_event);
static int e_end_block(struct drbd_conf *, struct drbd_work *, int);


#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)

/*
 * Some helper functions to deal with singly-linked page lists,
 * page->private being our "next" pointer.
 */
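
/*
 * For orientation, a minimal sketch of the chain primitive the helpers
 * below rely on.  page_chain_next() and the page_chain_for_each*()
 * iterators live in drbd_int.h; the definition sketched here is an
 * assumption for illustration only, not authoritative:
 *
 *         static inline struct page *page_chain_next(struct page *page)
 *         {
 *                 return (struct page *)page_private(page);
 *         }
 */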

/* If at least n pages are linked at head, get n pages off.
 * Otherwise, don't modify head, and return NULL.
 * Locking is the responsibility of the caller.
 */
static struct page *page_chain_del(struct page **head, int n)
{
        struct page *page;
        struct page *tmp;

        BUG_ON(!n);
        BUG_ON(!head);

        page = *head;

        if (!page)
                return NULL;

        while (page) {
                tmp = page_chain_next(page);
                if (--n == 0)
                        break; /* found sufficient pages */
                if (tmp == NULL)
                        /* insufficient pages, don't use any of them. */
                        return NULL;
                page = tmp;
        }

        /* add end of list marker for the returned list */
        set_page_private(page, 0);
        /* actual return value, and adjustment of head */
        page = *head;
        *head = tmp;
        return page;
}

/* may be used outside of locks to find the tail of a (usually short)
 * "private" page chain, before adding it back to a global chain head
 * with page_chain_add() under a spinlock. */
static struct page *page_chain_tail(struct page *page, int *len)
{
        struct page *tmp;
        int i = 1;
        while ((tmp = page_chain_next(page)))
                ++i, page = tmp;
        if (len)
                *len = i;
        return page;
}

static int page_chain_free(struct page *page)
{
        struct page *tmp;
        int i = 0;
        page_chain_for_each_safe(page, tmp) {
                put_page(page);
                ++i;
        }
        return i;
}

static void page_chain_add(struct page **head,
                struct page *chain_first, struct page *chain_last)
{
#if 1
        struct page *tmp;
        tmp = page_chain_tail(chain_first, NULL);
        BUG_ON(tmp != chain_last);
#endif

        /* add chain to head */
        set_page_private(chain_last, (unsigned long)*head);
        *head = chain_first;
}

static struct page *drbd_pp_first_pages_or_try_alloc(struct drbd_conf *mdev, int number)
{
        struct page *page = NULL;
        struct page *tmp = NULL;
        int i = 0;

        /* Yes, testing drbd_pp_vacant outside the lock is racy.
         * So what. It saves a spin_lock. */
        if (drbd_pp_vacant >= number) {
                spin_lock(&drbd_pp_lock);
                page = page_chain_del(&drbd_pp_pool, number);
                if (page)
                        drbd_pp_vacant -= number;
                spin_unlock(&drbd_pp_lock);
                if (page)
                        return page;
        }

        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
         * "criss-cross" setup, that might cause write-out on some other DRBD,
         * which in turn might block on the other node at this very place.  */
        for (i = 0; i < number; i++) {
                tmp = alloc_page(GFP_TRY);
                if (!tmp)
                        break;
                set_page_private(tmp, (unsigned long)page);
                page = tmp;
        }

        if (i == number)
                return page;

        /* Not enough pages immediately available this time.
         * No need to jump around here, drbd_pp_alloc will retry this
         * function "soon". */
        if (page) {
                tmp = page_chain_tail(page, NULL);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        return NULL;
}

/* kick lower level device, if we have more than (arbitrary number)
 * reference counts on it, which typically are locally submitted io
 * requests.  don't use unacked_cnt, so we speed up proto A and B, too. */
static void maybe_kick_lo(struct drbd_conf *mdev)
{
        if (atomic_read(&mdev->local_cnt) >= mdev->net_conf->unplug_watermark)
                drbd_kick_lo(mdev);
}

static void reclaim_net_ee(struct drbd_conf *mdev, struct list_head *to_be_freed)
{
        struct drbd_epoch_entry *e;
        struct list_head *le, *tle;

        /* The EEs are always appended to the end of the list. Since
           they are sent in order over the wire, they have to finish
           in order. As soon as we see the first one that has not
           finished, we can stop examining the list... */

        list_for_each_safe(le, tle, &mdev->net_ee) {
                e = list_entry(le, struct drbd_epoch_entry, w.list);
                if (drbd_ee_has_active_page(e))
                        break;
                list_move(le, to_be_freed);
        }
}

static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
{
        LIST_HEAD(reclaimed);
        struct drbd_epoch_entry *e, *t;

        maybe_kick_lo(mdev);
        spin_lock_irq(&mdev->req_lock);
        reclaim_net_ee(mdev, &reclaimed);
        spin_unlock_irq(&mdev->req_lock);

        list_for_each_entry_safe(e, t, &reclaimed, w.list)
                drbd_free_net_ee(mdev, e);
}

/**
 * drbd_pp_alloc() - Returns @number pages, retries forever (or until signalled)
 * @mdev:       DRBD device.
 * @number:     number of pages requested
 * @retry:      whether to retry, if not enough pages are available right now
 *
 * Tries to allocate @number pages, first from our own page pool, then from
 * the kernel, unless this allocation would exceed the max_buffers setting.
 * Possibly retries until DRBD frees sufficient pages somewhere else.
 *
 * Returns a page chain linked via page->private.
 */
static struct page *drbd_pp_alloc(struct drbd_conf *mdev, unsigned number, bool retry)
{
        struct page *page = NULL;
        DEFINE_WAIT(wait);

        /* Yes, we may run up to @number over max_buffers. If we
         * follow it strictly, the admin will get it wrong anyways. */
        if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers)
                page = drbd_pp_first_pages_or_try_alloc(mdev, number);

        while (page == NULL) {
                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);

                drbd_kick_lo_and_reclaim_net(mdev);

                if (atomic_read(&mdev->pp_in_use) < mdev->net_conf->max_buffers) {
                        page = drbd_pp_first_pages_or_try_alloc(mdev, number);
                        if (page)
                                break;
                }

                if (!retry)
                        break;

                if (signal_pending(current)) {
                        dev_warn(DEV, "drbd_pp_alloc interrupted!\n");
                        break;
                }

                schedule();
        }
        finish_wait(&drbd_pp_wait, &wait);

        if (page)
                atomic_add(number, &mdev->pp_in_use);
        return page;
}
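
/*
 * Usage sketch (hypothetical caller, for illustration only): allocate a
 * chain of pages for one request, and hand it back via drbd_pp_free()
 * below once done; this is the pattern drbd_alloc_ee() follows:
 *
 *         struct page *page = drbd_pp_alloc(mdev, nr_pages, true);
 *         if (page) {
 *                 ... use the chain, linked via page->private ...
 *                 drbd_pp_free(mdev, page, 0);
 *         }
 */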

/* Must not be used from irq context, as that may deadlock: see drbd_pp_alloc.
 * Is also used from inside another spin_lock_irq(&mdev->req_lock);
 * either links the page chain back to the global pool,
 * or returns all pages to the system. */
static void drbd_pp_free(struct drbd_conf *mdev, struct page *page, int is_net)
{
        atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
        int i;

        if (drbd_pp_vacant > (DRBD_MAX_SEGMENT_SIZE/PAGE_SIZE)*minor_count)
                i = page_chain_free(page);
        else {
                struct page *tmp;
                tmp = page_chain_tail(page, &i);
                spin_lock(&drbd_pp_lock);
                page_chain_add(&drbd_pp_pool, page, tmp);
                drbd_pp_vacant += i;
                spin_unlock(&drbd_pp_lock);
        }
        i = atomic_sub_return(i, a);
        if (i < 0)
                dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
        wake_up(&drbd_pp_wait);
}

/*
You need to hold the req_lock:
 _drbd_wait_ee_list_empty()

You must not have the req_lock:
 drbd_free_ee()
 drbd_alloc_ee()
 drbd_init_ee()
 drbd_release_ee()
 drbd_ee_fix_bhs()
 drbd_process_done_ee()
 drbd_clear_done_ee()
 drbd_wait_ee_list_empty()
*/
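
/*
 * Locking-pattern sketch: the underscore variant expects the caller to
 * hold the lock, as in drbd_wait_ee_list_empty() further below:
 *
 *         spin_lock_irq(&mdev->req_lock);
 *         _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
 *         spin_unlock_irq(&mdev->req_lock);
 */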

struct drbd_epoch_entry *drbd_alloc_ee(struct drbd_conf *mdev,
                                     u64 id,
                                     sector_t sector,
                                     unsigned int data_size,
                                     gfp_t gfp_mask) __must_hold(local)
{
        struct drbd_epoch_entry *e;
        struct page *page;
        unsigned nr_pages = (data_size + PAGE_SIZE - 1) >> PAGE_SHIFT;

        if (FAULT_ACTIVE(mdev, DRBD_FAULT_AL_EE))
                return NULL;

        e = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
        if (!e) {
                if (!(gfp_mask & __GFP_NOWARN))
                        dev_err(DEV, "alloc_ee: Allocation of an EE failed\n");
                return NULL;
        }

        page = drbd_pp_alloc(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
        if (!page)
                goto fail;

        INIT_HLIST_NODE(&e->colision);
        e->epoch = NULL;
        e->mdev = mdev;
        e->pages = page;
        atomic_set(&e->pending_bios, 0);
        e->size = data_size;
        e->flags = 0;
        e->sector = sector;
        e->block_id = id;

        return e;

 fail:
        mempool_free(e, drbd_ee_mempool);
        return NULL;
}

void drbd_free_some_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e, int is_net)
{
        if (e->flags & EE_HAS_DIGEST)
                kfree(e->digest);
        drbd_pp_free(mdev, e->pages, is_net);
        D_ASSERT(atomic_read(&e->pending_bios) == 0);
        D_ASSERT(hlist_unhashed(&e->colision));
        mempool_free(e, drbd_ee_mempool);
}

int drbd_release_ee(struct drbd_conf *mdev, struct list_head *list)
{
        LIST_HEAD(work_list);
        struct drbd_epoch_entry *e, *t;
        int count = 0;
        int is_net = list == &mdev->net_ee;

        spin_lock_irq(&mdev->req_lock);
        list_splice_init(list, &work_list);
        spin_unlock_irq(&mdev->req_lock);

        list_for_each_entry_safe(e, t, &work_list, w.list) {
                drbd_free_some_ee(mdev, e, is_net);
                count++;
        }
        return count;
}


/*
 * This function is called from _asender only_
 * but see also comments in _req_mod(,barrier_acked)
 * and receive_Barrier.
 *
 * Move entries from net_ee to done_ee, if ready.
 * Grab done_ee, call all callbacks, free the entries.
 * The callbacks typically send out ACKs.
 */
static int drbd_process_done_ee(struct drbd_conf *mdev)
{
        LIST_HEAD(work_list);
        LIST_HEAD(reclaimed);
        struct drbd_epoch_entry *e, *t;
        int ok = (mdev->state.conn >= C_WF_REPORT_PARAMS);

        spin_lock_irq(&mdev->req_lock);
        reclaim_net_ee(mdev, &reclaimed);
        list_splice_init(&mdev->done_ee, &work_list);
        spin_unlock_irq(&mdev->req_lock);

        list_for_each_entry_safe(e, t, &reclaimed, w.list)
                drbd_free_net_ee(mdev, e);

        /* possible callbacks here:
         * e_end_block, e_end_resync_block, and e_send_discard_ack;
         * all ignore the last argument.
         */
        list_for_each_entry_safe(e, t, &work_list, w.list) {
                /* list_del not necessary, next/prev members not touched */
                ok = e->w.cb(mdev, &e->w, !ok) && ok;
                drbd_free_ee(mdev, e);
        }
        wake_up(&mdev->ee_wait);

        return ok;
}

void _drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
        DEFINE_WAIT(wait);

        /* avoids spin_lock/unlock
         * and calling prepare_to_wait in the fast path */
        while (!list_empty(head)) {
                prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
                spin_unlock_irq(&mdev->req_lock);
                drbd_kick_lo(mdev);
                schedule();
                finish_wait(&mdev->ee_wait, &wait);
                spin_lock_irq(&mdev->req_lock);
        }
}

void drbd_wait_ee_list_empty(struct drbd_conf *mdev, struct list_head *head)
{
        spin_lock_irq(&mdev->req_lock);
        _drbd_wait_ee_list_empty(mdev, head);
        spin_unlock_irq(&mdev->req_lock);
}

/* see also kernel_accept(), which is only present since 2.6.18;
 * we also want to log exactly which part of it failed */
static int drbd_accept(struct drbd_conf *mdev, const char **what,
                struct socket *sock, struct socket **newsock)
{
        struct sock *sk = sock->sk;
        int err = 0;

        *what = "listen";
        err = sock->ops->listen(sock, 5);
        if (err < 0)
                goto out;

        *what = "sock_create_lite";
        err = sock_create_lite(sk->sk_family, sk->sk_type, sk->sk_protocol,
                               newsock);
        if (err < 0)
                goto out;

        *what = "accept";
        err = sock->ops->accept(sock, *newsock, 0);
        if (err < 0) {
                sock_release(*newsock);
                *newsock = NULL;
                goto out;
        }
        (*newsock)->ops = sock->ops;

out:
        return err;
}

static int drbd_recv_short(struct drbd_conf *mdev, struct socket *sock,
                    void *buf, size_t size, int flags)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);
        rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
        set_fs(oldfs);

        return rv;
}

static int drbd_recv(struct drbd_conf *mdev, void *buf, size_t size)
{
        mm_segment_t oldfs;
        struct kvec iov = {
                .iov_base = buf,
                .iov_len = size,
        };
        struct msghdr msg = {
                .msg_iovlen = 1,
                .msg_iov = (struct iovec *)&iov,
                .msg_flags = MSG_WAITALL | MSG_NOSIGNAL
        };
        int rv;

        oldfs = get_fs();
        set_fs(KERNEL_DS);

        for (;;) {
                rv = sock_recvmsg(mdev->data.socket, &msg, size, msg.msg_flags);
                if (rv == size)
                        break;

                /* Note:
                 * ECONNRESET   other side closed the connection
                 * ERESTARTSYS  (on  sock) we got a signal
                 */

                if (rv < 0) {
                        if (rv == -ECONNRESET)
                                dev_info(DEV, "sock was reset by peer\n");
                        else if (rv != -ERESTARTSYS)
                                dev_err(DEV, "sock_recvmsg returned %d\n", rv);
                        break;
                } else if (rv == 0) {
                        dev_info(DEV, "sock was shut down by peer\n");
                        break;
                } else {
                        /* signal came in, or peer/link went down,
                         * after we read a partial message
                         */
                        /* D_ASSERT(signal_pending(current)); */
                        break;
                }
        }

        set_fs(oldfs);

        if (rv != size)
                drbd_force_state(mdev, NS(conn, C_BROKEN_PIPE));

        return rv;
}

/* quoting tcp(7):
 *   On individual connections, the socket buffer size must be set prior to the
 *   listen(2) or connect(2) calls in order to have it take effect.
 * This is our wrapper to do so.
 */
static void drbd_setbufsize(struct socket *sock, unsigned int snd,
                unsigned int rcv)
{
        /* open coded SO_SNDBUF, SO_RCVBUF */
        if (snd) {
                sock->sk->sk_sndbuf = snd;
                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
        }
        if (rcv) {
                sock->sk->sk_rcvbuf = rcv;
                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
        }
}
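
/*
 * For comparison, the userspace equivalent of the open-coded assignments
 * above would be something like this (sketch, illustration only), issued
 * before the listen(2)/connect(2) call as tcp(7) requires:
 *
 *         int val = snd;
 *         setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &val, sizeof(val));
 */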

static struct socket *drbd_try_connect(struct drbd_conf *mdev)
{
        const char *what;
        struct socket *sock;
        struct sockaddr_in6 src_in6;
        int err;
        int disconnect_on_error = 1;

        if (!get_net_conf(mdev))
                return NULL;

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
                SOCK_STREAM, IPPROTO_TCP, &sock);
        if (err < 0) {
                sock = NULL;
                goto out;
        }

        sock->sk->sk_rcvtimeo =
        sock->sk->sk_sndtimeo = mdev->net_conf->try_connect_int*HZ;
        drbd_setbufsize(sock, mdev->net_conf->sndbuf_size,
                        mdev->net_conf->rcvbuf_size);

        /* explicitly bind to the configured IP as source IP
         * for the outgoing connections.
         * This is needed for multihomed hosts and to be
         * able to use lo: interfaces for drbd.
         * Make sure to use 0 as port number, so linux selects
         * a free one dynamically.
         */
        memcpy(&src_in6, mdev->net_conf->my_addr,
               min_t(int, mdev->net_conf->my_addr_len, sizeof(src_in6)));
        if (((struct sockaddr *)mdev->net_conf->my_addr)->sa_family == AF_INET6)
                src_in6.sin6_port = 0;
        else
                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */

        what = "bind before connect";
        err = sock->ops->bind(sock,
                              (struct sockaddr *) &src_in6,
                              mdev->net_conf->my_addr_len);
        if (err < 0)
                goto out;

        /* connect may fail, peer not yet available.
         * stay C_WF_CONNECTION, don't go Disconnecting! */
        disconnect_on_error = 0;
        what = "connect";
        err = sock->ops->connect(sock,
                                 (struct sockaddr *)mdev->net_conf->peer_addr,
                                 mdev->net_conf->peer_addr_len, 0);

out:
        if (err < 0) {
                if (sock) {
                        sock_release(sock);
                        sock = NULL;
                }
                switch (-err) {
                        /* timeout, busy, signal pending */
                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
                case EINTR: case ERESTARTSYS:
                        /* peer not (yet) available, network problem */
                case ECONNREFUSED: case ENETUNREACH:
                case EHOSTDOWN:    case EHOSTUNREACH:
                        disconnect_on_error = 0;
                        break;
                default:
                        dev_err(DEV, "%s failed, err = %d\n", what, err);
                }
                if (disconnect_on_error)
                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
        }
        put_net_conf(mdev);
        return sock;
}

static struct socket *drbd_wait_for_connect(struct drbd_conf *mdev)
{
        int timeo, err;
        struct socket *s_estab = NULL, *s_listen;
        const char *what;

        if (!get_net_conf(mdev))
                return NULL;

        what = "sock_create_kern";
        err = sock_create_kern(((struct sockaddr *)mdev->net_conf->my_addr)->sa_family,
                SOCK_STREAM, IPPROTO_TCP, &s_listen);
        if (err) {
                s_listen = NULL;
                goto out;
        }

        timeo = mdev->net_conf->try_connect_int * HZ;
        timeo += (random32() & 1) ? timeo / 7 : -timeo / 7; /* 28.5% random jitter */

        s_listen->sk->sk_reuse    = 1; /* SO_REUSEADDR */
        s_listen->sk->sk_rcvtimeo = timeo;
        s_listen->sk->sk_sndtimeo = timeo;
        drbd_setbufsize(s_listen, mdev->net_conf->sndbuf_size,
                        mdev->net_conf->rcvbuf_size);

        what = "bind before listen";
        err = s_listen->ops->bind(s_listen,
                              (struct sockaddr *) mdev->net_conf->my_addr,
                              mdev->net_conf->my_addr_len);
        if (err < 0)
                goto out;

        err = drbd_accept(mdev, &what, s_listen, &s_estab);

out:
        if (s_listen)
                sock_release(s_listen);
        if (err < 0) {
                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
                        dev_err(DEV, "%s failed, err = %d\n", what, err);
                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
                }
        }
        put_net_conf(mdev);

        return s_estab;
}

static int drbd_send_fp(struct drbd_conf *mdev,
        struct socket *sock, enum drbd_packets cmd)
{
        struct p_header80 *h = &mdev->data.sbuf.header.h80;

        return _drbd_send_cmd(mdev, sock, cmd, h, sizeof(*h), 0);
}

static enum drbd_packets drbd_recv_fp(struct drbd_conf *mdev, struct socket *sock)
{
        struct p_header80 *h = &mdev->data.rbuf.header.h80;
        int rr;

        rr = drbd_recv_short(mdev, sock, h, sizeof(*h), 0);

        if (rr == sizeof(*h) && h->magic == BE_DRBD_MAGIC)
                return be16_to_cpu(h->command);

        return 0xffff;
}

/**
 * drbd_socket_okay() - Free the socket if its connection is not okay
 * @mdev:       DRBD device.
 * @sock:       pointer to the pointer to the socket.
 */
static int drbd_socket_okay(struct drbd_conf *mdev, struct socket **sock)
{
        int rr;
        char tb[4];

        if (!*sock)
                return FALSE;

        rr = drbd_recv_short(mdev, *sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);

        if (rr > 0 || rr == -EAGAIN) {
                return TRUE;
        } else {
                sock_release(*sock);
                *sock = NULL;
                return FALSE;
        }
}

/*
 * return values:
 *   1 yes, we have a valid connection
 *   0 oops, did not work out, please try again
 *  -1 peer talks different language,
 *     no point in trying again, please go standalone.
 *  -2 We do not have a network config...
 */
static int drbd_connect(struct drbd_conf *mdev)
{
        struct socket *s, *sock, *msock;
        int try, h, ok;

        D_ASSERT(!mdev->data.socket);

        if (drbd_request_state(mdev, NS(conn, C_WF_CONNECTION)) < SS_SUCCESS)
                return -2;

        clear_bit(DISCARD_CONCURRENT, &mdev->flags);

        sock  = NULL;
        msock = NULL;

        do {
                for (try = 0;;) {
                        /* 3 tries, this should take less than a second! */
                        s = drbd_try_connect(mdev);
                        if (s || ++try >= 3)
                                break;
                        /* give the other side time to call bind() & listen() */
                        __set_current_state(TASK_INTERRUPTIBLE);
                        schedule_timeout(HZ / 10);
                }

                if (s) {
                        if (!sock) {
                                drbd_send_fp(mdev, s, P_HAND_SHAKE_S);
                                sock = s;
                                s = NULL;
                        } else if (!msock) {
                                drbd_send_fp(mdev, s, P_HAND_SHAKE_M);
                                msock = s;
                                s = NULL;
                        } else {
                                dev_err(DEV, "Logic error in drbd_connect()\n");
                                goto out_release_sockets;
                        }
                }

                if (sock && msock) {
                        __set_current_state(TASK_INTERRUPTIBLE);
                        schedule_timeout(HZ / 10);
                        ok = drbd_socket_okay(mdev, &sock);
                        ok = drbd_socket_okay(mdev, &msock) && ok;
                        if (ok)
                                break;
                }

retry:
                s = drbd_wait_for_connect(mdev);
                if (s) {
                        try = drbd_recv_fp(mdev, s);
                        drbd_socket_okay(mdev, &sock);
                        drbd_socket_okay(mdev, &msock);
                        switch (try) {
                        case P_HAND_SHAKE_S:
                                if (sock) {
                                        dev_warn(DEV, "initial packet S crossed\n");
                                        sock_release(sock);
                                }
                                sock = s;
                                break;
                        case P_HAND_SHAKE_M:
                                if (msock) {
                                        dev_warn(DEV, "initial packet M crossed\n");
                                        sock_release(msock);
                                }
                                msock = s;
                                set_bit(DISCARD_CONCURRENT, &mdev->flags);
                                break;
                        default:
                                dev_warn(DEV, "Error receiving initial packet\n");
                                sock_release(s);
                                if (random32() & 1)
                                        goto retry;
                        }
                }

                if (mdev->state.conn <= C_DISCONNECTING)
                        goto out_release_sockets;
                if (signal_pending(current)) {
                        flush_signals(current);
                        smp_rmb();
                        if (get_t_state(&mdev->receiver) == Exiting)
                                goto out_release_sockets;
                }

                if (sock && msock) {
                        ok = drbd_socket_okay(mdev, &sock);
                        ok = drbd_socket_okay(mdev, &msock) && ok;
                        if (ok)
                                break;
                }
        } while (1);

        msock->sk->sk_reuse = 1; /* SO_REUSEADDR */
        sock->sk->sk_reuse = 1; /* SO_REUSEADDR */

        sock->sk->sk_allocation = GFP_NOIO;
        msock->sk->sk_allocation = GFP_NOIO;

        sock->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
        msock->sk->sk_priority = TC_PRIO_INTERACTIVE;

        /* NOT YET ...
         * sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
         * sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
         * first set it to the P_HAND_SHAKE timeout,
         * which we set to 4x the configured ping_timeout. */
        sock->sk->sk_sndtimeo =
        sock->sk->sk_rcvtimeo = mdev->net_conf->ping_timeo*4*HZ/10;

        msock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
        msock->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;

        /* we don't want delays;
         * we use TCP_CORK where appropriate, though */
        drbd_tcp_nodelay(sock);
        drbd_tcp_nodelay(msock);

        mdev->data.socket = sock;
        mdev->meta.socket = msock;
        mdev->last_received = jiffies;

        D_ASSERT(mdev->asender.task == NULL);

        h = drbd_do_handshake(mdev);
        if (h <= 0)
                return h;

        if (mdev->cram_hmac_tfm) {
                /* drbd_request_state(mdev, NS(conn, WFAuth)); */
                switch (drbd_do_auth(mdev)) {
                case -1:
                        dev_err(DEV, "Authentication of peer failed\n");
                        return -1;
                case 0:
                        dev_err(DEV, "Authentication of peer failed, trying again.\n");
                        return 0;
                }
        }

        if (drbd_request_state(mdev, NS(conn, C_WF_REPORT_PARAMS)) < SS_SUCCESS)
                return 0;

        sock->sk->sk_sndtimeo = mdev->net_conf->timeout*HZ/10;
        sock->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;

        atomic_set(&mdev->packet_seq, 0);
        mdev->peer_seq = 0;

        drbd_thread_start(&mdev->asender);

        if (mdev->agreed_pro_version < 95 && get_ldev(mdev)) {
                drbd_setup_queue_param(mdev, DRBD_MAX_SIZE_H80_PACKET);
                put_ldev(mdev);
        }

        if (!drbd_send_protocol(mdev))
                return -1;
        drbd_send_sync_param(mdev, &mdev->sync_conf);
        drbd_send_sizes(mdev, 0, 0);
        drbd_send_uuids(mdev);
        drbd_send_state(mdev);
        clear_bit(USE_DEGR_WFC_T, &mdev->flags);
        clear_bit(RESIZE_PENDING, &mdev->flags);

        return 1;

out_release_sockets:
        if (sock)
                sock_release(sock);
        if (msock)
                sock_release(msock);
        return -1;
}
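
/*
 * Caller sketch (hypothetical, for illustration only): per the return
 * value contract documented above drbd_connect(), the receiver thread
 * would retry while it returns 0, and give up on a negative value:
 *
 *         int h;
 *         do {
 *                 h = drbd_connect(mdev);
 *         } while (h == 0);
 *         if (h < 0)
 *                 ... go StandAlone ...
 */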

static int drbd_recv_header(struct drbd_conf *mdev, enum drbd_packets *cmd, unsigned int *packet_size)
{
        union p_header *h = &mdev->data.rbuf.header;
        int r;

        r = drbd_recv(mdev, h, sizeof(*h));
        if (unlikely(r != sizeof(*h))) {
                dev_err(DEV, "short read expecting header on sock: r=%d\n", r);
                return FALSE;
        }

        if (likely(h->h80.magic == BE_DRBD_MAGIC)) {
                *cmd = be16_to_cpu(h->h80.command);
                *packet_size = be16_to_cpu(h->h80.length);
        } else if (h->h95.magic == BE_DRBD_MAGIC_BIG) {
                *cmd = be16_to_cpu(h->h95.command);
                *packet_size = be32_to_cpu(h->h95.length);
        } else {
                dev_err(DEV, "magic?? on data m: 0x%08x c: %d l: %d\n",
                    be32_to_cpu(h->h80.magic),
                    be16_to_cpu(h->h80.command),
                    be16_to_cpu(h->h80.length));
                return FALSE;
        }
        mdev->last_received = jiffies;

        return TRUE;
}

static void drbd_flush(struct drbd_conf *mdev)
{
        int rv;

        if (mdev->write_ordering >= WO_bdev_flush && get_ldev(mdev)) {
                rv = blkdev_issue_flush(mdev->ldev->backing_bdev, GFP_KERNEL,
                                        NULL);
                if (rv) {
                        dev_err(DEV, "local disk flush failed with status %d\n", rv);
                        /* would rather check on EOPNOTSUPP, but that is not reliable.
                         * don't try again for ANY return value != 0
                         * if (rv == -EOPNOTSUPP) */
                        drbd_bump_write_ordering(mdev, WO_drain_io);
                }
                put_ldev(mdev);
        }
}

/**
 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, and possibly finishes it.
 * @mdev:       DRBD device.
 * @epoch:      Epoch object.
 * @ev:         Epoch event.
 */
static enum finish_epoch drbd_may_finish_epoch(struct drbd_conf *mdev,
                                               struct drbd_epoch *epoch,
                                               enum epoch_event ev)
{
        int epoch_size;
        struct drbd_epoch *next_epoch;
        enum finish_epoch rv = FE_STILL_LIVE;

        spin_lock(&mdev->epoch_lock);
        do {
                next_epoch = NULL;

                epoch_size = atomic_read(&epoch->epoch_size);

                switch (ev & ~EV_CLEANUP) {
                case EV_PUT:
                        atomic_dec(&epoch->active);
                        break;
                case EV_GOT_BARRIER_NR:
                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
                        break;
                case EV_BECAME_LAST:
                        /* nothing to do */
                        break;
                }

                if (epoch_size != 0 &&
                    atomic_read(&epoch->active) == 0 &&
                    test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags)) {
                        if (!(ev & EV_CLEANUP)) {
                                spin_unlock(&mdev->epoch_lock);
                                drbd_send_b_ack(mdev, epoch->barrier_nr, epoch_size);
                                spin_lock(&mdev->epoch_lock);
                        }
                        dec_unacked(mdev);

                        if (mdev->current_epoch != epoch) {
                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
                                list_del(&epoch->list);
                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
                                mdev->epochs--;
                                kfree(epoch);

                                if (rv == FE_STILL_LIVE)
                                        rv = FE_DESTROYED;
                        } else {
                                epoch->flags = 0;
                                atomic_set(&epoch->epoch_size, 0);
                                /* atomic_set(&epoch->active, 0); is already zero */
                                if (rv == FE_STILL_LIVE)
                                        rv = FE_RECYCLED;
                                wake_up(&mdev->ee_wait);
                        }
                }

                if (!next_epoch)
                        break;

                epoch = next_epoch;
        } while (1);

        spin_unlock(&mdev->epoch_lock);

        return rv;
}

/**
 * drbd_bump_write_ordering() - Fall back to another write ordering method
 * @mdev:       DRBD device.
 * @wo:         Write ordering method to try.
 */
void drbd_bump_write_ordering(struct drbd_conf *mdev, enum write_ordering_e wo) __must_hold(local)
{
        enum write_ordering_e pwo;
        static char *write_ordering_str[] = {
                [WO_none] = "none",
                [WO_drain_io] = "drain",
                [WO_bdev_flush] = "flush",
        };

        pwo = mdev->write_ordering;
        wo = min(pwo, wo);
        if (wo == WO_bdev_flush && mdev->ldev->dc.no_disk_flush)
                wo = WO_drain_io;
        if (wo == WO_drain_io && mdev->ldev->dc.no_disk_drain)
                wo = WO_none;
        mdev->write_ordering = wo;
        if (pwo != mdev->write_ordering || wo == WO_bdev_flush)
                dev_info(DEV, "Method to ensure write ordering: %s\n", write_ordering_str[mdev->write_ordering]);
}

/**
 * drbd_submit_ee()
 * @mdev:       DRBD device.
 * @e:          epoch entry
 * @rw:         flag field, see bio->bi_rw
 */
/* TODO allocate from our own bio_set. */
int drbd_submit_ee(struct drbd_conf *mdev, struct drbd_epoch_entry *e,
                const unsigned rw, const int fault_type)
{
        struct bio *bios = NULL;
        struct bio *bio;
        struct page *page = e->pages;
        sector_t sector = e->sector;
        unsigned ds = e->size;
        unsigned n_bios = 0;
        unsigned nr_pages = (ds + PAGE_SIZE - 1) >> PAGE_SHIFT;

        /* In most cases, we will only need one bio.  But in case the lower
         * level restrictions happen to be different at this offset on this
         * side than those of the sending peer, we may need to submit the
         * request in more than one bio. */
next_bio:
        bio = bio_alloc(GFP_NOIO, nr_pages);
        if (!bio) {
                dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
                goto fail;
        }
        /* > e->sector, unless this is the first bio */
        bio->bi_sector = sector;
        bio->bi_bdev = mdev->ldev->backing_bdev;
        /* we special case some flags in the multi-bio case, see below
         * (REQ_UNPLUG) */
        bio->bi_rw = rw;
        bio->bi_private = e;
        bio->bi_end_io = drbd_endio_sec;

        bio->bi_next = bios;
        bios = bio;
        ++n_bios;

        page_chain_for_each(page) {
                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
                if (!bio_add_page(bio, page, len, 0)) {
                        /* a single page must always be possible! */
                        BUG_ON(bio->bi_vcnt == 0);
                        goto next_bio;
                }
                ds -= len;
                sector += len >> 9;
                --nr_pages;
        }
        D_ASSERT(page == NULL);
        D_ASSERT(ds == 0);

        atomic_set(&e->pending_bios, n_bios);
        do {
                bio = bios;
                bios = bios->bi_next;
                bio->bi_next = NULL;

                /* strip off REQ_UNPLUG unless it is the last bio */
                if (bios)
                        bio->bi_rw &= ~REQ_UNPLUG;

                drbd_generic_make_request(mdev, fault_type, bio);
        } while (bios);
        maybe_kick_lo(mdev);
        return 0;

fail:
        while (bios) {
                bio = bios;
                bios = bios->bi_next;
                bio_put(bio);
        }
        return -ENOMEM;
}

static int receive_Barrier(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
{
        int rv;
        struct p_barrier *p = &mdev->data.rbuf.barrier;
        struct drbd_epoch *epoch;

        inc_unacked(mdev);

        if (mdev->net_conf->wire_protocol != DRBD_PROT_C)
                drbd_kick_lo(mdev);

        mdev->current_epoch->barrier_nr = p->barrier;
        rv = drbd_may_finish_epoch(mdev, mdev->current_epoch, EV_GOT_BARRIER_NR);

        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
         * the activity log, which means it would not be resynced in case the
         * R_PRIMARY crashes now.
         * Therefore we must send the barrier_ack after the barrier request was
         * completed. */
        switch (mdev->write_ordering) {
        case WO_none:
                if (rv == FE_RECYCLED)
                        return TRUE;

                /* receiver context, in the writeout path of the other node.
                 * avoid potential distributed deadlock */
                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                if (epoch)
                        break;
                else
                        dev_warn(DEV, "Allocation of an epoch failed, slowing down\n");
                        /* Fall through */

        case WO_bdev_flush:
        case WO_drain_io:
                drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
                drbd_flush(mdev);

                if (atomic_read(&mdev->current_epoch->epoch_size)) {
                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
                        if (epoch)
                                break;
                }

                epoch = mdev->current_epoch;
                wait_event(mdev->ee_wait, atomic_read(&epoch->epoch_size) == 0);

                D_ASSERT(atomic_read(&epoch->active) == 0);
                D_ASSERT(epoch->flags == 0);

                return TRUE;
        default:
                dev_err(DEV, "Strangeness in mdev->write_ordering %d\n", mdev->write_ordering);
                return FALSE;
        }

        epoch->flags = 0;
        atomic_set(&epoch->epoch_size, 0);
        atomic_set(&epoch->active, 0);

        spin_lock(&mdev->epoch_lock);
        if (atomic_read(&mdev->current_epoch->epoch_size)) {
                list_add(&epoch->list, &mdev->current_epoch->list);
                mdev->current_epoch = epoch;
                mdev->epochs++;
        } else {
                /* The current_epoch got recycled while we allocated this one... */
                kfree(epoch);
        }
        spin_unlock(&mdev->epoch_lock);

        return TRUE;
}

/* used from receive_RSDataReply (recv_resync_read)
 * and from receive_Data */
static struct drbd_epoch_entry *
read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector, int data_size) __must_hold(local)
{
        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
        struct drbd_epoch_entry *e;
        struct page *page;
        int dgs, ds, rr;
        void *dig_in = mdev->int_dig_in;
        void *dig_vv = mdev->int_dig_vv;
        unsigned long *data;

        dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
                crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;

        if (dgs) {
                rr = drbd_recv(mdev, dig_in, dgs);
                if (rr != dgs) {
                        dev_warn(DEV, "short read receiving data digest: read %d expected %d\n",
                             rr, dgs);
                        return NULL;
                }
        }

        data_size -= dgs;

        ERR_IF(data_size & 0x1ff) return NULL;
        ERR_IF(data_size > DRBD_MAX_SEGMENT_SIZE) return NULL;

        /* even though we trust our peer,
         * we sometimes have to double check. */
1268        if (sector + (data_size>>9) > capacity) {
1269                dev_err(DEV, "capacity: %llus < sector: %llus + size: %u\n",
1270                        (unsigned long long)capacity,
1271                        (unsigned long long)sector, data_size);
1272                return NULL;
1273        }
1274
1275        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1276         * "criss-cross" setup, that might cause write-out on some other DRBD,
1277         * which in turn might block on the other node at this very place.  */
1278        e = drbd_alloc_ee(mdev, id, sector, data_size, GFP_NOIO);
1279        if (!e)
1280                return NULL;
1281
1282        ds = data_size;
1283        page = e->pages;
1284        page_chain_for_each(page) {
1285                unsigned len = min_t(int, ds, PAGE_SIZE);
1286                data = kmap(page);
1287                rr = drbd_recv(mdev, data, len);
1288                if (FAULT_ACTIVE(mdev, DRBD_FAULT_RECEIVE)) {
1289                        dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1290                        data[0] = data[0] ^ (unsigned long)-1;
1291                }
1292                kunmap(page);
1293                if (rr != len) {
1294                        drbd_free_ee(mdev, e);
1295                        dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1296                             rr, len);
1297                        return NULL;
1298                }
1299                ds -= rr;
1300        }
1301
1302        if (dgs) {
1303                drbd_csum_ee(mdev, mdev->integrity_r_tfm, e, dig_vv);
1304                if (memcmp(dig_in, dig_vv, dgs)) {
1305                        dev_err(DEV, "Digest integrity check FAILED.\n");
1306                        drbd_bcast_ee(mdev, "digest failed",
1307                                        dgs, dig_in, dig_vv, e);
1308                        drbd_free_ee(mdev, e);
1309                        return NULL;
1310                }
1311        }
1312        mdev->recv_cnt += data_size>>9;
1313        return e;
1314}
1315
1316/* drbd_drain_block() just takes a data block
1317 * out of the socket input buffer, and discards it.
1318 */
1319static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1320{
1321        struct page *page;
1322        int rr, rv = 1;
1323        void *data;
1324
1325        if (!data_size)
1326                return TRUE;
1327
1328        page = drbd_pp_alloc(mdev, 1, 1);
1329
1330        data = kmap(page);
1331        while (data_size) {
1332                rr = drbd_recv(mdev, data, min_t(int, data_size, PAGE_SIZE));
1333                if (rr != min_t(int, data_size, PAGE_SIZE)) {
1334                        rv = 0;
1335                        dev_warn(DEV, "short read receiving data: read %d expected %d\n",
1336                             rr, min_t(int, data_size, PAGE_SIZE));
1337                        break;
1338                }
1339                data_size -= rr;
1340        }
1341        kunmap(page);
1342        drbd_pp_free(mdev, page, 0);
1343        return rv;
1344}
1345
1346static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1347                           sector_t sector, int data_size)
1348{
1349        struct bio_vec *bvec;
1350        struct bio *bio;
1351        int dgs, rr, i, expect;
1352        void *dig_in = mdev->int_dig_in;
1353        void *dig_vv = mdev->int_dig_vv;
1354
1355        dgs = (mdev->agreed_pro_version >= 87 && mdev->integrity_r_tfm) ?
1356                crypto_hash_digestsize(mdev->integrity_r_tfm) : 0;
1357
1358        if (dgs) {
1359                rr = drbd_recv(mdev, dig_in, dgs);
1360                if (rr != dgs) {
1361                        dev_warn(DEV, "short read receiving data reply digest: read %d expected %d\n",
1362                             rr, dgs);
1363                        return 0;
1364                }
1365        }
1366
1367        data_size -= dgs;
1368
1369        /* optimistically update recv_cnt.  if receiving fails below,
1370         * we disconnect anyways, and counters will be reset. */
1371        mdev->recv_cnt += data_size>>9;
1372
1373        bio = req->master_bio;
1374        D_ASSERT(sector == bio->bi_sector);
1375
1376        bio_for_each_segment(bvec, bio, i) {
1377                expect = min_t(int, data_size, bvec->bv_len);
1378                rr = drbd_recv(mdev,
1379                             kmap(bvec->bv_page)+bvec->bv_offset,
1380                             expect);
1381                kunmap(bvec->bv_page);
1382                if (rr != expect) {
1383                        dev_warn(DEV, "short read receiving data reply: "
1384                             "read %d expected %d\n",
1385                             rr, expect);
1386                        return 0;
1387                }
1388                data_size -= rr;
1389        }
1390
1391        if (dgs) {
1392                drbd_csum_bio(mdev, mdev->integrity_r_tfm, bio, dig_vv);
1393                if (memcmp(dig_in, dig_vv, dgs)) {
1394                        dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1395                        return 0;
1396                }
1397        }
1398
1399        D_ASSERT(data_size == 0);
1400        return 1;
1401}
1402
1403/* e_end_resync_block() is called via
1404 * drbd_process_done_ee() by asender only */
1405static int e_end_resync_block(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1406{
1407        struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1408        sector_t sector = e->sector;
1409        int ok;
1410
1411        D_ASSERT(hlist_unhashed(&e->colision));
1412
1413        if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1414                drbd_set_in_sync(mdev, sector, e->size);
1415                ok = drbd_send_ack(mdev, P_RS_WRITE_ACK, e);
1416        } else {
1417                /* Record failure to sync */
1418                drbd_rs_failed_io(mdev, sector, e->size);
1419
1420                ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1421        }
1422        dec_unacked(mdev);
1423
1424        return ok;
1425}
1426
1427static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1428{
1429        struct drbd_epoch_entry *e;
1430
1431        e = read_in_block(mdev, ID_SYNCER, sector, data_size);
1432        if (!e)
1433                goto fail;
1434
1435        dec_rs_pending(mdev);
1436
1437        inc_unacked(mdev);
1438        /* corresponding dec_unacked() in e_end_resync_block()
1439         * respective _drbd_clear_done_ee */
1440
1441        e->w.cb = e_end_resync_block;
1442
1443        spin_lock_irq(&mdev->req_lock);
1444        list_add(&e->w.list, &mdev->sync_ee);
1445        spin_unlock_irq(&mdev->req_lock);
1446
1447        atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1448        if (drbd_submit_ee(mdev, e, WRITE, DRBD_FAULT_RS_WR) == 0)
1449                return TRUE;
1450
1451        /* drbd_submit_ee currently fails for one reason only:
1452         * not being able to allocate enough bios.
1453         * Is dropping the connection going to help? */
1454        spin_lock_irq(&mdev->req_lock);
1455        list_del(&e->w.list);
1456        spin_unlock_irq(&mdev->req_lock);
1457
1458        drbd_free_ee(mdev, e);
1459fail:
1460        put_ldev(mdev);
1461        return FALSE;
1462}
1463
1464static int receive_DataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1465{
1466        struct drbd_request *req;
1467        sector_t sector;
1468        int ok;
1469        struct p_data *p = &mdev->data.rbuf.data;
1470
1471        sector = be64_to_cpu(p->sector);
1472
1473        spin_lock_irq(&mdev->req_lock);
1474        req = _ar_id_to_req(mdev, p->block_id, sector);
1475        spin_unlock_irq(&mdev->req_lock);
1476        if (unlikely(!req)) {
1477                dev_err(DEV, "Got a corrupt block_id/sector pair(1).\n");
1478                return FALSE;
1479        }
1480
1481        /* hlist_del(&req->colision) is done in _req_may_be_done, to avoid
1482         * special casing it there for the various failure cases.
1483         * still no race with drbd_fail_pending_reads */
1484        ok = recv_dless_read(mdev, req, sector, data_size);
1485
1486        if (ok)
1487                req_mod(req, data_received);
1488        /* else: nothing. handled from drbd_disconnect...
1489         * I don't think we may complete this just yet
1490         * in case we are "on-disconnect: freeze" */
1491
1492        return ok;
1493}
1494
1495static int receive_RSDataReply(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1496{
1497        sector_t sector;
1498        int ok;
1499        struct p_data *p = &mdev->data.rbuf.data;
1500
1501        sector = be64_to_cpu(p->sector);
1502        D_ASSERT(p->block_id == ID_SYNCER);
1503
1504        if (get_ldev(mdev)) {
1505                /* data is submitted to disk within recv_resync_read.
1506                 * corresponding put_ldev done below on error,
1507                 * or in drbd_endio_write_sec. */
1508                ok = recv_resync_read(mdev, sector, data_size);
1509        } else {
1510                if (__ratelimit(&drbd_ratelimit_state))
1511                        dev_err(DEV, "Can not write resync data to local disk.\n");
1512
1513                ok = drbd_drain_block(mdev, data_size);
1514
1515                drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1516        }
1517
1518        atomic_add(data_size >> 9, &mdev->rs_sect_in);
1519
1520        return ok;
1521}
1522
1523/* e_end_block() is called via drbd_process_done_ee().
1524 * this means this function only runs in the asender thread
1525 */
1526static int e_end_block(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
1527{
1528        struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1529        sector_t sector = e->sector;
1530        int ok = 1, pcmd;
1531
1532        if (mdev->net_conf->wire_protocol == DRBD_PROT_C) {
1533                if (likely((e->flags & EE_WAS_ERROR) == 0)) {
1534                        pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1535                                mdev->state.conn <= C_PAUSED_SYNC_T &&
1536                                e->flags & EE_MAY_SET_IN_SYNC) ?
1537                                P_RS_WRITE_ACK : P_WRITE_ACK;
1538                        ok &= drbd_send_ack(mdev, pcmd, e);
1539                        if (pcmd == P_RS_WRITE_ACK)
1540                                drbd_set_in_sync(mdev, sector, e->size);
1541                } else {
1542                        ok  = drbd_send_ack(mdev, P_NEG_ACK, e);
1543                        /* we expect it to be marked out of sync anyways...
1544                         * maybe assert this?  */
1545                }
1546                dec_unacked(mdev);
1547        }
1548        /* we delete from the conflict detection hash _after_ we sent out the
1549         * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1550        if (mdev->net_conf->two_primaries) {
1551                spin_lock_irq(&mdev->req_lock);
1552                D_ASSERT(!hlist_unhashed(&e->colision));
1553                hlist_del_init(&e->colision);
1554                spin_unlock_irq(&mdev->req_lock);
1555        } else {
1556                D_ASSERT(hlist_unhashed(&e->colision));
1557        }
1558
1559        drbd_may_finish_epoch(mdev, e->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1560
1561        return ok;
1562}
1563
1564static int e_send_discard_ack(struct drbd_conf *mdev, struct drbd_work *w, int unused)
1565{
1566        struct drbd_epoch_entry *e = (struct drbd_epoch_entry *)w;
1567        int ok = 1;
1568
1569        D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1570        ok = drbd_send_ack(mdev, P_DISCARD_ACK, e);
1571
1572        spin_lock_irq(&mdev->req_lock);
1573        D_ASSERT(!hlist_unhashed(&e->colision));
1574        hlist_del_init(&e->colision);
1575        spin_unlock_irq(&mdev->req_lock);
1576
1577        dec_unacked(mdev);
1578
1579        return ok;
1580}
1581
1582/* Called from receive_Data.
1583 * Synchronize packets on sock with packets on msock.
1584 *
1585 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1586 * packet traveling on msock, they are still processed in the order they have
1587 * been sent.
1588 *
1589 * Note: we don't care for Ack packets overtaking P_DATA packets.
1590 *
1591 * In case packet_seq is larger than mdev->peer_seq, there are
1592 * outstanding packets on the msock. We wait for them to arrive.
1593 * In case we are the logically next packet, we update mdev->peer_seq
1594 * ourselves. Correctly handles 32bit wrap around (see the sketch below).
1595 *
1596 * Assume we have a 10 GBit connection, that is about 1<<30 bytes per second,
1597 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1598 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1599 * 1<<11 == 2048 seconds aka ages for the 32bit wrap around...
1600 *
1601 * returns 0 if we may process the packet, -ETIMEDOUT if the peer's
1602 * sequence number stalls for 30 seconds, -ERESTARTSYS if interrupted. */
1603static int drbd_wait_peer_seq(struct drbd_conf *mdev, const u32 packet_seq)
1604{
1605        DEFINE_WAIT(wait);
1606        unsigned int p_seq;
1607        long timeout;
1608        int ret = 0;
1609        spin_lock(&mdev->peer_seq_lock);
1610        for (;;) {
1611                prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1612                if (seq_le(packet_seq, mdev->peer_seq+1))
1613                        break;
1614                if (signal_pending(current)) {
1615                        ret = -ERESTARTSYS;
1616                        break;
1617                }
1618                p_seq = mdev->peer_seq;
1619                spin_unlock(&mdev->peer_seq_lock);
1620                timeout = schedule_timeout(30*HZ);
1621                spin_lock(&mdev->peer_seq_lock);
1622                if (timeout == 0 && p_seq == mdev->peer_seq) {
1623                        ret = -ETIMEDOUT;
1624                        dev_err(DEV, "ASSERT FAILED waited 30 seconds for sequence update, forcing reconnect\n");
1625                        break;
1626                }
1627        }
1628        finish_wait(&mdev->seq_wait, &wait);
1629        if (mdev->peer_seq+1 == packet_seq)
1630                mdev->peer_seq++;
1631        spin_unlock(&mdev->peer_seq_lock);
1632        return ret;
1633}
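
/* A minimal sketch of the wrap-safe comparison used above.  seq_le() is
 * defined elsewhere in drbd; the name and placement here are illustrative
 * only.  The usual idiom: interpret the u32 difference as signed, which
 * stays correct across the 32bit wrap around as long as the two sequence
 * numbers are less than 1<<31 apart. */
static inline int seq_le_sketch(u32 a, u32 b)
{
	return (s32)(a - b) <= 0;
}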
1634
1635static unsigned long write_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
1636{
1637        if (mdev->agreed_pro_version >= 95)
1638                return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
1639                        (dpf & DP_UNPLUG ? REQ_UNPLUG : 0) |
1640                        (dpf & DP_FUA ? REQ_FUA : 0) |
1641                        (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
1642                        (dpf & DP_DISCARD ? REQ_DISCARD : 0);
1643        else
1644                return dpf & DP_RW_SYNC ? (REQ_SYNC | REQ_UNPLUG) : 0;
1645}
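
/* For reference, the sender side (bio_flags_to_wire() in drbd_main.c)
 * performs the inverse mapping.  A hedged sketch of its shape, shown only
 * to motivate the DP_FLUSH -> REQ_FLUSH mapping above: DP_FLUSH is set from
 * REQ_FLUSH, not from REQ_FUA, on the submitting node. */
static u32 bio_flags_to_wire_sketch(struct drbd_conf *mdev, unsigned long bi_rw)
{
	if (mdev->agreed_pro_version >= 95)
		return  (bi_rw & REQ_SYNC ? DP_RW_SYNC : 0) |
			(bi_rw & REQ_UNPLUG ? DP_UNPLUG : 0) |
			(bi_rw & REQ_FUA ? DP_FUA : 0) |
			(bi_rw & REQ_FLUSH ? DP_FLUSH : 0) |
			(bi_rw & REQ_DISCARD ? DP_DISCARD : 0);
	else
		return bi_rw & (REQ_SYNC | REQ_UNPLUG) ? DP_RW_SYNC : 0;
}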
1646
1647/* mirrored write */
1648static int receive_Data(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
1649{
1650        sector_t sector;
1651        struct drbd_epoch_entry *e;
1652        struct p_data *p = &mdev->data.rbuf.data;
1653        int rw = WRITE;
1654        u32 dp_flags;
1655
1656        if (!get_ldev(mdev)) {
1657                if (__ratelimit(&drbd_ratelimit_state))
1658                        dev_err(DEV, "Can not write mirrored data block "
1659                            "to local disk.\n");
1660                spin_lock(&mdev->peer_seq_lock);
1661                if (mdev->peer_seq+1 == be32_to_cpu(p->seq_num))
1662                        mdev->peer_seq++;
1663                spin_unlock(&mdev->peer_seq_lock);
1664
1665                drbd_send_ack_dp(mdev, P_NEG_ACK, p, data_size);
1666                atomic_inc(&mdev->current_epoch->epoch_size);
1667                return drbd_drain_block(mdev, data_size);
1668        }
1669
1670        /* get_ldev(mdev) successful.
1671         * Corresponding put_ldev done either below (on various errors),
1672         * or in drbd_endio_write_sec, if we successfully submit the data at
1673         * the end of this function. */
1674
1675        sector = be64_to_cpu(p->sector);
1676        e = read_in_block(mdev, p->block_id, sector, data_size);
1677        if (!e) {
1678                put_ldev(mdev);
1679                return FALSE;
1680        }
1681
1682        e->w.cb = e_end_block;
1683
1684        spin_lock(&mdev->epoch_lock);
1685        e->epoch = mdev->current_epoch;
1686        atomic_inc(&e->epoch->epoch_size);
1687        atomic_inc(&e->epoch->active);
1688        spin_unlock(&mdev->epoch_lock);
1689
1690        dp_flags = be32_to_cpu(p->dp_flags);
1691        rw |= write_flags_to_bio(mdev, dp_flags);
1692
1693        if (dp_flags & DP_MAY_SET_IN_SYNC)
1694                e->flags |= EE_MAY_SET_IN_SYNC;
1695
1696        /* I'm the receiver, I do hold a net_cnt reference. */
1697        if (!mdev->net_conf->two_primaries) {
1698                spin_lock_irq(&mdev->req_lock);
1699        } else {
1700                /* don't get the req_lock yet,
1701                 * we may sleep in drbd_wait_peer_seq */
1702                const int size = e->size;
1703                const int discard = test_bit(DISCARD_CONCURRENT, &mdev->flags);
1704                DEFINE_WAIT(wait);
1705                struct drbd_request *i;
1706                struct hlist_node *n;
1707                struct hlist_head *slot;
1708                int first;
1709
1710                D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
1711                BUG_ON(mdev->ee_hash == NULL);
1712                BUG_ON(mdev->tl_hash == NULL);
1713
1714                /* conflict detection and handling:
1715                 * 1. wait on the sequence number,
1716                 *    in case this data packet overtook ACK packets.
1717                 * 2. check our hash tables for conflicting requests.
1718                 *    we only need to walk the tl_hash, since an ee can not
1719                 *    have a conflict with an other ee: on the submitting
1720                 *    node, the corresponding req had already been conflicting,
1721                 *    and a conflicting req is never sent.
1722                 *
1723                 * Note: for two_primaries, we are protocol C,
1724                 * so there cannot be any request that is DONE
1725                 * but still on the transfer log.
1726                 *
1727                 * unconditionally add to the ee_hash.
1728                 *
1729                 * if no conflicting request is found:
1730                 *    submit.
1731                 *
1732                 * if any conflicting request is found
1733                 * that has not yet been acked,
1734                 * AND I have the "discard concurrent writes" flag:
1735                 *       queue (via done_ee) the P_DISCARD_ACK; OUT.
1736                 *
1737                 * if any conflicting request is found:
1738                 *       block the receiver, waiting on misc_wait
1739                 *       until no more conflicting requests are there,
1740                 *       or we get interrupted (disconnect).
1741                 *
1742                 *       we do not just write after local io completion of those
1743                 *       requests, but only after req is done completely, i.e.
1744                 *       we wait for the P_DISCARD_ACK to arrive!
1745                 *
1746                 *       then proceed normally, i.e. submit.
1747                 *       (see the overlaps() sketch after this function) */
1748                if (drbd_wait_peer_seq(mdev, be32_to_cpu(p->seq_num)))
1749                        goto out_interrupted;
1750
1751                spin_lock_irq(&mdev->req_lock);
1752
1753                hlist_add_head(&e->colision, ee_hash_slot(mdev, sector));
1754
1755#define OVERLAPS overlaps(i->sector, i->size, sector, size)
1756                slot = tl_hash_slot(mdev, sector);
1757                first = 1;
1758                for (;;) {
1759                        int have_unacked = 0;
1760                        int have_conflict = 0;
1761                        prepare_to_wait(&mdev->misc_wait, &wait,
1762                                TASK_INTERRUPTIBLE);
1763                        hlist_for_each_entry(i, n, slot, colision) {
1764                                if (OVERLAPS) {
1765                                        /* only ALERT on first iteration,
1766                                         * we may be woken up early... */
1767                                        if (first)
1768                                                dev_alert(DEV, "%s[%u] Concurrent local write detected!"
1769                                                      " new: %llus +%u; pending: %llus +%u\n",
1770                                                      current->comm, current->pid,
1771                                                      (unsigned long long)sector, size,
1772                                                      (unsigned long long)i->sector, i->size);
1773                                        if (i->rq_state & RQ_NET_PENDING)
1774                                                ++have_unacked;
1775                                        ++have_conflict;
1776                                }
1777                        }
1778#undef OVERLAPS
1779                        if (!have_conflict)
1780                                break;
1781
1782                        /* Discard Ack only for the _first_ iteration */
1783                        if (first && discard && have_unacked) {
1784                                dev_alert(DEV, "Concurrent write! [DISCARD BY FLAG] sec=%llus\n",
1785                                     (unsigned long long)sector);
1786                                inc_unacked(mdev);
1787                                e->w.cb = e_send_discard_ack;
1788                                list_add_tail(&e->w.list, &mdev->done_ee);
1789
1790                                spin_unlock_irq(&mdev->req_lock);
1791
1792                                /* we could probably send that P_DISCARD_ACK ourselves,
1793                                 * but I don't like the receiver using the msock */
1794
1795                                put_ldev(mdev);
1796                                wake_asender(mdev);
1797                                finish_wait(&mdev->misc_wait, &wait);
1798                                return TRUE;
1799                        }
1800
1801                        if (signal_pending(current)) {
1802                                hlist_del_init(&e->colision);
1803
1804                                spin_unlock_irq(&mdev->req_lock);
1805
1806                                finish_wait(&mdev->misc_wait, &wait);
1807                                goto out_interrupted;
1808                        }
1809
1810                        spin_unlock_irq(&mdev->req_lock);
1811                        if (first) {
1812                                first = 0;
1813                                dev_alert(DEV, "Concurrent write! [W AFTERWARDS] "
1814                                     "sec=%llus\n", (unsigned long long)sector);
1815                        } else if (discard) {
1816                                /* we had none on the first iteration.
1817                                 * there must be none now. */
1818                                D_ASSERT(have_unacked == 0);
1819                        }
1820                        schedule();
1821                        spin_lock_irq(&mdev->req_lock);
1822                }
1823                finish_wait(&mdev->misc_wait, &wait);
1824        }
1825
1826        list_add(&e->w.list, &mdev->active_ee);
1827        spin_unlock_irq(&mdev->req_lock);
1828
1829        switch (mdev->net_conf->wire_protocol) {
1830        case DRBD_PROT_C:
1831                inc_unacked(mdev);
1832                /* corresponding dec_unacked() in e_end_block(),
1833                 * or in _drbd_clear_done_ee(), respectively */
1834                break;
1835        case DRBD_PROT_B:
1836                /* I really don't like it that the receiver thread
1837                 * sends on the msock, but anyways */
1838                drbd_send_ack(mdev, P_RECV_ACK, e);
1839                break;
1840        case DRBD_PROT_A:
1841                /* nothing to do */
1842                break;
1843        }
1844
1845        if (mdev->state.pdsk < D_INCONSISTENT) {
1846                /* In case we have the only disk of the cluster: mark out of sync and cover by the activity log */
1847                drbd_set_out_of_sync(mdev, e->sector, e->size);
1848                e->flags |= EE_CALL_AL_COMPLETE_IO;
1849                e->flags &= ~EE_MAY_SET_IN_SYNC;
1850                drbd_al_begin_io(mdev, e->sector);
1851        }
1852
1853        if (drbd_submit_ee(mdev, e, rw, DRBD_FAULT_DT_WR) == 0)
1854                return TRUE;
1855
1856        /* drbd_submit_ee currently fails for one reason only:
1857         * not being able to allocate enough bios.
1858         * Is dropping the connection going to help? */
1859        spin_lock_irq(&mdev->req_lock);
1860        list_del(&e->w.list);
1861        hlist_del_init(&e->colision);
1862        spin_unlock_irq(&mdev->req_lock);
1863        if (e->flags & EE_CALL_AL_COMPLETE_IO)
1864                drbd_al_complete_io(mdev, e->sector);
1865
1866out_interrupted:
1867        /* yes, the epoch_size now is imbalanced.
1868         * but we drop the connection anyways, so we don't have a chance to
1869         * receive a barrier... atomic_inc(&mdev->epoch_size); */
1870        put_ldev(mdev);
1871        drbd_free_ee(mdev, e);
1872        return FALSE;
1873}
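
/* The OVERLAPS test in receive_Data() above reduces to an interval-overlap
 * predicate.  overlaps() lives in a header; a minimal sketch, assuming sizes
 * in bytes and 512 byte sectors (name and placement here are illustrative): */
static inline int overlaps_sketch(sector_t s1, int l1, sector_t s2, int l2)
{
	/* half-open intervals [s, s + (l>>9)) overlap iff neither
	 * one ends before the other begins */
	return !(s1 + (l1 >> 9) <= s2 || s2 + (l2 >> 9) <= s1);
}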
1874
1875/* We may throttle resync, if the lower device seems to be busy,
1876 * and current sync rate is above c_min_rate.
1877 *
1878 * To decide whether or not the lower device is busy, we use a scheme similar
1879 * to MD RAID's is_mddev_idle(): if the partition stats reveal "significant"
1880 * activity (more than 64 sectors) that we cannot account for with our own
1881 * resync activity, it obviously is "busy".
1882 *
1883 * The current sync rate used here is computed from the most recent two
1884 * step marks only, a short-time average, so we can react faster (see the
1885 * worked example after this function). */
1886int drbd_rs_should_slow_down(struct drbd_conf *mdev)
1887{
1888        struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
1889        unsigned long db, dt, dbdt;
1890        int curr_events;
1891        int throttle = 0;
1892
1893        /* feature disabled? */
1894        if (mdev->sync_conf.c_min_rate == 0)
1895                return 0;
1896
1897        curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
1898                      (int)part_stat_read(&disk->part0, sectors[1]) -
1899                        atomic_read(&mdev->rs_sect_ev);
1900        if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
1901                unsigned long rs_left;
1902                int i;
1903
1904                mdev->rs_last_events = curr_events;
1905
1906                /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
1907                 * approx. */
1908                i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-2) % DRBD_SYNC_MARKS;
1909                rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
1910
1911                dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
1912                if (!dt)
1913                        dt++;
1914                db = mdev->rs_mark_left[i] - rs_left;
1915                dbdt = Bit2KB(db/dt);
1916
1917                if (dbdt > mdev->sync_conf.c_min_rate)
1918                        throttle = 1;
1919        }
1920        return throttle;
1921}
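
/* Worked example for the arithmetic above, with assumed numbers.  Suppose
 * curr_events exceeds rs_last_events by more than 64 sectors (the device
 * looks busy), the sync mark two steps back recorded rs_mark_left[i] ==
 * 1048576 bits, about 6 seconds have passed, and rs_left is now 1000576:
 *
 *	dt   = 6
 *	db   = 1048576 - 1000576 = 48000 bits
 *	dbdt = Bit2KB(48000 / 6) = 8000 << 2 = 32000 KiB/s  (one bit = 4 KiB)
 *
 * With c_min_rate == 25000 KiB/s, 32000 > 25000, so we throttle. */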
1922
1923
1924static int receive_DataRequest(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int digest_size)
1925{
1926        sector_t sector;
1927        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1928        struct drbd_epoch_entry *e;
1929        struct digest_info *di = NULL;
1930        int size, verb;
1931        unsigned int fault_type;
1932        struct p_block_req *p = &mdev->data.rbuf.block_req;
1933
1934        sector = be64_to_cpu(p->sector);
1935        size   = be32_to_cpu(p->blksize);
1936
1937        if (size <= 0 || (size & 0x1ff) != 0 || size > DRBD_MAX_SEGMENT_SIZE) {
1938                dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1939                                (unsigned long long)sector, size);
1940                return FALSE;
1941        }
1942        if (sector + (size>>9) > capacity) {
1943                dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
1944                                (unsigned long long)sector, size);
1945                return FALSE;
1946        }
1947
1948        if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
1949                verb = 1;
1950                switch (cmd) {
1951                case P_DATA_REQUEST:
1952                        drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
1953                        break;
1954                case P_RS_DATA_REQUEST:
1955                case P_CSUM_RS_REQUEST:
1956                case P_OV_REQUEST:
1957                        drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
1958                        break;
1959                case P_OV_REPLY:
1960                        verb = 0;
1961                        dec_rs_pending(mdev);
1962                        drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
1963                        break;
1964                default:
1965                        dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
1966                                cmdname(cmd));
1967                }
1968                if (verb && __ratelimit(&drbd_ratelimit_state))
1969                        dev_err(DEV, "Can not satisfy peer's read request, "
1970                            "no local data.\n");
1971
1972                /* drain the payload, if any */
1973                return drbd_drain_block(mdev, digest_size);
1974        }
1975
1976        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1977         * "criss-cross" setup, that might cause write-out on some other DRBD,
1978         * which in turn might block on the other node at this very place.  */
1979        e = drbd_alloc_ee(mdev, p->block_id, sector, size, GFP_NOIO);
1980        if (!e) {
1981                put_ldev(mdev);
1982                return FALSE;
1983        }
1984
1985        switch (cmd) {
1986        case P_DATA_REQUEST:
1987                e->w.cb = w_e_end_data_req;
1988                fault_type = DRBD_FAULT_DT_RD;
1989                /* application IO, don't drbd_rs_begin_io */
1990                goto submit;
1991
1992        case P_RS_DATA_REQUEST:
1993                e->w.cb = w_e_end_rsdata_req;
1994                fault_type = DRBD_FAULT_RS_RD;
1995                break;
1996
1997        case P_OV_REPLY:
1998        case P_CSUM_RS_REQUEST:
1999                fault_type = DRBD_FAULT_RS_RD;
2000                di = kmalloc(sizeof(*di) + digest_size, GFP_NOIO);
2001                if (!di)
2002                        goto out_free_e;
2003
2004                di->digest_size = digest_size;
2005                di->digest = (((char *)di)+sizeof(struct digest_info));
2006
2007                e->digest = di;
2008                e->flags |= EE_HAS_DIGEST;
2009
2010                if (drbd_recv(mdev, di->digest, digest_size) != digest_size)
2011                        goto out_free_e;
2012
2013                if (cmd == P_CSUM_RS_REQUEST) {
2014                        D_ASSERT(mdev->agreed_pro_version >= 89);
2015                        e->w.cb = w_e_end_csum_rs_req;
2016                } else if (cmd == P_OV_REPLY) {
2017                        e->w.cb = w_e_end_ov_reply;
2018                        dec_rs_pending(mdev);
2019                        /* drbd_rs_begin_io done when we sent this request,
2020                         * but accounting still needs to be done. */
2021                        goto submit_for_resync;
2022                }
2023                break;
2024
2025        case P_OV_REQUEST:
2026                if (mdev->ov_start_sector == ~(sector_t)0 &&
2027                    mdev->agreed_pro_version >= 90) {
2028                        mdev->ov_start_sector = sector;
2029                        mdev->ov_position = sector;
2030                        mdev->ov_left = mdev->rs_total - BM_SECT_TO_BIT(sector);
2031                        dev_info(DEV, "Online Verify start sector: %llu\n",
2032                                        (unsigned long long)sector);
2033                }
2034                e->w.cb = w_e_end_ov_req;
2035                fault_type = DRBD_FAULT_RS_RD;
2036                break;
2037
2038        default:
2039                dev_err(DEV, "unexpected command (%s) in receive_DataRequest\n",
2040                    cmdname(cmd));
2041                fault_type = DRBD_FAULT_MAX;
2042                goto out_free_e;
2043        }
2044
2045        /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2046         * wrt the receiver, but it is not as straightforward as it may seem.
2047         * Various places in the resync start and stop logic assume resync
2048         * requests are processed in order, requeuing this on the worker thread
2049         * introduces a bunch of new code for synchronization between threads.
2050         *
2051         * Unlimited throttling before drbd_rs_begin_io may stall the resync
2052         * "forever", throttling after drbd_rs_begin_io will lock that extent
2053         * for application writes for the same time.  For now, just throttle
2054         * here, where the rest of the code expects the receiver to sleep for
2055         * a while, anyways.
2056         */
2057
2058        /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2059         * this defers syncer requests for some time, before letting at least
2060         * one request through.  The resync controller on the receiving side
2061         * will adapt to the incoming rate accordingly.
2062         *
2063         * We cannot throttle here if remote is Primary/SyncTarget:
2064         * we would also throttle its application reads.
2065         * In that case, throttling is done on the SyncTarget only.
2066         */
2067        if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev))
2068                msleep(100);
2069        if (drbd_rs_begin_io(mdev, e->sector))
2070                goto out_free_e;
2071
2072submit_for_resync:
2073        atomic_add(size >> 9, &mdev->rs_sect_ev);
2074
2075submit:
2076        inc_unacked(mdev);
2077        spin_lock_irq(&mdev->req_lock);
2078        list_add_tail(&e->w.list, &mdev->read_ee);
2079        spin_unlock_irq(&mdev->req_lock);
2080
2081        if (drbd_submit_ee(mdev, e, READ, fault_type) == 0)
2082                return TRUE;
2083
2084        /* drbd_submit_ee currently fails for one reason only:
2085         * not being able to allocate enough bios.
2086         * Is dropping the connection going to help? */
2087        spin_lock_irq(&mdev->req_lock);
2088        list_del(&e->w.list);
2089        spin_unlock_irq(&mdev->req_lock);
2090        /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2091
2092out_free_e:
2093        put_ldev(mdev);
2094        drbd_free_ee(mdev, e);
2095        return FALSE;
2096}
2097
2098static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2099{
2100        int self, peer, rv = -100;
2101        unsigned long ch_self, ch_peer;
2102
2103        self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2104        peer = mdev->p_uuid[UI_BITMAP] & 1;
2105
2106        ch_peer = mdev->p_uuid[UI_SIZE];
2107        ch_self = mdev->comm_bm_set;
2108
2109        switch (mdev->net_conf->after_sb_0p) {
2110        case ASB_CONSENSUS:
2111        case ASB_DISCARD_SECONDARY:
2112        case ASB_CALL_HELPER:
2113                dev_err(DEV, "Configuration error.\n");
2114                break;
2115        case ASB_DISCONNECT:
2116                break;
2117        case ASB_DISCARD_YOUNGER_PRI:
2118                if (self == 0 && peer == 1) {
2119                        rv = -1;
2120                        break;
2121                }
2122                if (self == 1 && peer == 0) {
2123                        rv =  1;
2124                        break;
2125                }
2126                /* Else fall through to one of the other strategies... */
2127        case ASB_DISCARD_OLDER_PRI:
2128                if (self == 0 && peer == 1) {
2129                        rv = 1;
2130                        break;
2131                }
2132                if (self == 1 && peer == 0) {
2133                        rv = -1;
2134                        break;
2135                }
2136                /* Else fall through to one of the other strategies... */
2137                dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2138                     "Using discard-least-changes instead\n");
2139        case ASB_DISCARD_ZERO_CHG:
2140                if (ch_peer == 0 && ch_self == 0) {
2141                        rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2142                                ? -1 : 1;
2143                        break;
2144                } else {
2145                        if (ch_peer == 0) { rv =  1; break; }
2146                        if (ch_self == 0) { rv = -1; break; }
2147                }
2148                if (mdev->net_conf->after_sb_0p == ASB_DISCARD_ZERO_CHG)
2149                        break;
2150        case ASB_DISCARD_LEAST_CHG:
2151                if      (ch_self < ch_peer)
2152                        rv = -1;
2153                else if (ch_self > ch_peer)
2154                        rv =  1;
2155                else /* ( ch_self == ch_peer ) */
2156                     /* Well, then use something else. */
2157                        rv = test_bit(DISCARD_CONCURRENT, &mdev->flags)
2158                                ? -1 : 1;
2159                break;
2160        case ASB_DISCARD_LOCAL:
2161                rv = -1;
2162                break;
2163        case ASB_DISCARD_REMOTE:
2164                rv =  1;
2165        }
2166
2167        return rv;
2168}
2169
2170static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2171{
2172        int self, peer, hg, rv = -100;
2173
2174        self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2175        peer = mdev->p_uuid[UI_BITMAP] & 1;
2176
2177        switch (mdev->net_conf->after_sb_1p) {
2178        case ASB_DISCARD_YOUNGER_PRI:
2179        case ASB_DISCARD_OLDER_PRI:
2180        case ASB_DISCARD_LEAST_CHG:
2181        case ASB_DISCARD_LOCAL:
2182        case ASB_DISCARD_REMOTE:
2183                dev_err(DEV, "Configuration error.\n");
2184                break;
2185        case ASB_DISCONNECT:
2186                break;
2187        case ASB_CONSENSUS:
2188                hg = drbd_asb_recover_0p(mdev);
2189                if (hg == -1 && mdev->state.role == R_SECONDARY)
2190                        rv = hg;
2191                if (hg == 1  && mdev->state.role == R_PRIMARY)
2192                        rv = hg;
2193                break;
2194        case ASB_VIOLENTLY:
2195                rv = drbd_asb_recover_0p(mdev);
2196                break;
2197        case ASB_DISCARD_SECONDARY:
2198                return mdev->state.role == R_PRIMARY ? 1 : -1;
2199        case ASB_CALL_HELPER:
2200                hg = drbd_asb_recover_0p(mdev);
2201                if (hg == -1 && mdev->state.role == R_PRIMARY) {
2202                        self = drbd_set_role(mdev, R_SECONDARY, 0);
2203                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2204                          * we might be here in C_WF_REPORT_PARAMS which is transient.
2205                          * we do not need to wait for the after state change work either. */
2206                        self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2207                        if (self != SS_SUCCESS) {
2208                                drbd_khelper(mdev, "pri-lost-after-sb");
2209                        } else {
2210                                dev_warn(DEV, "Successfully gave up primary role.\n");
2211                                rv = hg;
2212                        }
2213                } else
2214                        rv = hg;
2215        }
2216
2217        return rv;
2218}
2219
2220static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2221{
2222        int self, peer, hg, rv = -100;
2223
2224        self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2225        peer = mdev->p_uuid[UI_BITMAP] & 1;
2226
2227        switch (mdev->net_conf->after_sb_2p) {
2228        case ASB_DISCARD_YOUNGER_PRI:
2229        case ASB_DISCARD_OLDER_PRI:
2230        case ASB_DISCARD_LEAST_CHG:
2231        case ASB_DISCARD_LOCAL:
2232        case ASB_DISCARD_REMOTE:
2233        case ASB_CONSENSUS:
2234        case ASB_DISCARD_SECONDARY:
2235                dev_err(DEV, "Configuration error.\n");
2236                break;
2237        case ASB_VIOLENTLY:
2238                rv = drbd_asb_recover_0p(mdev);
2239                break;
2240        case ASB_DISCONNECT:
2241                break;
2242        case ASB_CALL_HELPER:
2243                hg = drbd_asb_recover_0p(mdev);
2244                if (hg == -1) {
2245                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2246                          * we might be here in C_WF_REPORT_PARAMS which is transient.
2247                          * we do not need to wait for the after state change work either. */
2248                        self = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2249                        if (self != SS_SUCCESS) {
2250                                drbd_khelper(mdev, "pri-lost-after-sb");
2251                        } else {
2252                                dev_warn(DEV, "Successfully gave up primary role.\n");
2253                                rv = hg;
2254                        }
2255                } else
2256                        rv = hg;
2257        }
2258
2259        return rv;
2260}
2261
2262static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2263                           u64 bits, u64 flags)
2264{
2265        if (!uuid) {
2266                dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2267                return;
2268        }
2269        dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2270             text,
2271             (unsigned long long)uuid[UI_CURRENT],
2272             (unsigned long long)uuid[UI_BITMAP],
2273             (unsigned long long)uuid[UI_HISTORY_START],
2274             (unsigned long long)uuid[UI_HISTORY_END],
2275             (unsigned long long)bits,
2276             (unsigned long long)flags);
2277}
2278
2279/*
2280  100   after split brain try auto recover
2281    2   C_SYNC_SOURCE set BitMap
2282    1   C_SYNC_SOURCE use BitMap
2283    0   no Sync
2284   -1   C_SYNC_TARGET use BitMap
2285   -2   C_SYNC_TARGET set BitMap
2286 -100   after split brain, disconnect
2287-1000   unrelated data
-1001   both sides must support at least protocol version 91
2288 */
2289static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2290{
2291        u64 self, peer;
2292        int i, j;
2293
2294        self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2295        peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2296
2297        *rule_nr = 10;
2298        if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2299                return 0;
2300
2301        *rule_nr = 20;
2302        if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2303             peer != UUID_JUST_CREATED)
2304                return -2;
2305
2306        *rule_nr = 30;
2307        if (self != UUID_JUST_CREATED &&
2308            (peer == UUID_JUST_CREATED || peer == (u64)0))
2309                return 2;
2310
2311        if (self == peer) {
2312                int rct, dc; /* roles at crash time */
2313
2314                if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2315
2316                        if (mdev->agreed_pro_version < 91)
2317                                return -1001;
2318
2319                        if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2320                            (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2321                                dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
2322                                drbd_uuid_set_bm(mdev, 0UL);
2323
2324                                drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2325                                               mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2326                                *rule_nr = 34;
2327                        } else {
2328                                dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2329                                *rule_nr = 36;
2330                        }
2331
2332                        return 1;
2333                }
2334
2335                if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2336
2337                        if (mdev->agreed_pro_version < 91)
2338                                return -1001;
2339
2340                        if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2341                            (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2342                                dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2343
2344                                mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2345                                mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2346                                mdev->p_uuid[UI_BITMAP] = 0UL;
2347
2348                                drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2349                                *rule_nr = 35;
2350                        } else {
2351                                dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2352                                *rule_nr = 37;
2353                        }
2354
2355                        return -1;
2356                }
2357
2358                /* Common power [off|failure] */
2359                rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2360                        (mdev->p_uuid[UI_FLAGS] & 2);
2361                /* lowest bit is set when we were primary,
2362                 * next bit (weight 2) is set when peer was primary (sketch after this function) */
2363                *rule_nr = 40;
2364
2365                switch (rct) {
2366                case 0: /* !self_pri && !peer_pri */ return 0;
2367                case 1: /*  self_pri && !peer_pri */ return 1;
2368                case 2: /* !self_pri &&  peer_pri */ return -1;
2369                case 3: /*  self_pri &&  peer_pri */
2370                        dc = test_bit(DISCARD_CONCURRENT, &mdev->flags);
2371                        return dc ? -1 : 1;
2372                }
2373        }
2374
2375        *rule_nr = 50;
2376        peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2377        if (self == peer)
2378                return -1;
2379
2380        *rule_nr = 51;
2381        peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2382        if (self == peer) {
2383                self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2384                peer = mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1);
2385                if (self == peer) {
2386                        /* The last P_SYNC_UUID did not get through. Undo the peer's
2387                           UUID modifications from its last start of resync as sync source. */
2388
2389                        if (mdev->agreed_pro_version < 91)
2390                                return -1001;
2391
2392                        mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2393                        mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2394                        return -1;
2395                }
2396        }
2397
2398        *rule_nr = 60;
2399        self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2400        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2401                peer = mdev->p_uuid[i] & ~((u64)1);
2402                if (self == peer)
2403                        return -2;
2404        }
2405
2406        *rule_nr = 70;
2407        self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2408        peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2409        if (self == peer)
2410                return 1;
2411
2412        *rule_nr = 71;
2413        self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2414        if (self == peer) {
2415                self = mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1);
2416                peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2417                if (self == peer) {
2418                        /* The last P_SYNC_UUID did not get through. Undo our own
2419                           UUID modifications from our last start of resync as sync source. */
2420
2421                        if (mdev->agreed_pro_version < 91)
2422                                return -1001;
2423
2424                        _drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2425                        _drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2426
2427                        dev_info(DEV, "Undid last start of resync:\n");
2428
2429                        drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2430                                       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2431
2432                        return 1;
2433                }
2434        }
2435
2436
2437        *rule_nr = 80;
2438        peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2439        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2440                self = mdev->ldev->md.uuid[i] & ~((u64)1);
2441                if (self == peer)
2442                        return 2;
2443        }
2444
2445        *rule_nr = 90;
2446        self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2447        peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2448        if (self == peer && self != ((u64)0))
2449                return 100;
2450
2451        *rule_nr = 100;
2452        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2453                self = mdev->ldev->md.uuid[i] & ~((u64)1);
2454                for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2455                        peer = mdev->p_uuid[j] & ~((u64)1);
2456                        if (self == peer)
2457                                return -100;
2458                }
2459        }
2460
2461        return -1000;
2462}
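
/* Illustration of the rule 40 ("common power failure") decision above.  A
 * standalone sketch, not part of the original flow; the flag plumbing
 * (CRASHED_PRIMARY, p_uuid[UI_FLAGS] & 2, DISCARD_CONCURRENT) is abstracted
 * into plain ints: */
static int crash_time_roles_sketch(int self_was_primary, int peer_was_primary,
				   int discard_concurrent)
{
	int rct = (self_was_primary ? 1 : 0) + (peer_was_primary ? 2 : 0);

	switch (rct) {
	case 0: return 0;	/* neither was primary: no sync     */
	case 1: return 1;	/* only we were: become SyncSource  */
	case 2: return -1;	/* only peer was: become SyncTarget */
	default:		/* both were: use the tie-breaker   */
		return discard_concurrent ? -1 : 1;
	}
}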
2463
2464/* drbd_sync_handshake() returns the new conn state on success, or
2465   C_MASK (-1) on failure.
2466 */
2467static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2468                                           enum drbd_disk_state peer_disk) __must_hold(local)
2469{
2470        int hg, rule_nr;
2471        enum drbd_conns rv = C_MASK;
2472        enum drbd_disk_state mydisk;
2473
2474        mydisk = mdev->state.disk;
2475        if (mydisk == D_NEGOTIATING)
2476                mydisk = mdev->new_state_tmp.disk;
2477
2478        dev_info(DEV, "drbd_sync_handshake:\n");
2479        drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2480        drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2481                       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2482
2483        hg = drbd_uuid_compare(mdev, &rule_nr);
2484
2485        dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2486
2487        if (hg == -1000) {
2488                dev_alert(DEV, "Unrelated data, aborting!\n");
2489                return C_MASK;
2490        }
2491        if (hg == -1001) {
2492                dev_alert(DEV, "To resolve this both sides have to support at least protocol\n");
2493                return C_MASK;
2494        }
2495
2496        if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2497            (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2498                int f = (hg == -100) || abs(hg) == 2;
2499                hg = mydisk > D_INCONSISTENT ? 1 : -1;
2500                if (f)
2501                        hg = hg*2;
2502                dev_info(DEV, "Becoming sync %s due to disk states.\n",
2503                     hg > 0 ? "source" : "target");
2504        }
2505
2506        if (abs(hg) == 100)
2507                drbd_khelper(mdev, "initial-split-brain");
2508
2509        if (hg == 100 || (hg == -100 && mdev->net_conf->always_asbp)) {
2510                int pcount = (mdev->state.role == R_PRIMARY)
2511                           + (peer_role == R_PRIMARY);
2512                int forced = (hg == -100);
2513
2514                switch (pcount) {
2515                case 0:
2516                        hg = drbd_asb_recover_0p(mdev);
2517                        break;
2518                case 1:
2519                        hg = drbd_asb_recover_1p(mdev);
2520                        break;
2521                case 2:
2522                        hg = drbd_asb_recover_2p(mdev);
2523                        break;
2524                }
2525                if (abs(hg) < 100) {
2526                        dev_warn(DEV, "Split-Brain detected, %d primaries, "
2527                             "automatically solved. Sync from %s node\n",
2528                             pcount, (hg < 0) ? "peer" : "this");
2529                        if (forced) {
2530                                dev_warn(DEV, "Doing a full sync, since"
2531                                     " UUIDs where ambiguous.\n");
2532                                hg = hg*2;
2533                        }
2534                }
2535        }
2536
2537        if (hg == -100) {
2538                if (mdev->net_conf->want_lose && !(mdev->p_uuid[UI_FLAGS]&1))
2539                        hg = -1;
2540                if (!mdev->net_conf->want_lose && (mdev->p_uuid[UI_FLAGS]&1))
2541                        hg = 1;
2542
2543                if (abs(hg) < 100)
2544                        dev_warn(DEV, "Split-Brain detected, manually solved. "
2545                             "Sync from %s node\n",
2546                             (hg < 0) ? "peer" : "this");
2547        }
2548
2549        if (hg == -100) {
2550                /* FIXME this log message is not correct if we end up here
2551                 * after an attempted attach on a diskless node.
2552                 * We just refuse to attach -- well, we drop the "connection"
2553                 * to that disk, in a way... */
2554                dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
2555                drbd_khelper(mdev, "split-brain");
2556                return C_MASK;
2557        }
2558
2559        if (hg > 0 && mydisk <= D_INCONSISTENT) {
2560                dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
2561                return C_MASK;
2562        }
2563
2564        if (hg < 0 && /* by intention we do not use mydisk here. */
2565            mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
2566                switch (mdev->net_conf->rr_conflict) {
2567                case ASB_CALL_HELPER:
2568                        drbd_khelper(mdev, "pri-lost");
2569                        /* fall through */
2570                case ASB_DISCONNECT:
2571                        dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
2572                        return C_MASK;
2573                case ASB_VIOLENTLY:
2574                        dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
2575                             "assumption\n");
2576                }
2577        }
2578
2579        if (mdev->net_conf->dry_run || test_bit(CONN_DRY_RUN, &mdev->flags)) {
2580                if (hg == 0)
2581                        dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
2582                else
2583                        dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
2584                                 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
2585                                 abs(hg) >= 2 ? "full" : "bit-map based");
2586                return C_MASK;
2587        }
2588
2589        if (abs(hg) >= 2) {
2590                dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
2591                if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake"))
2592                        return C_MASK;
2593        }
2594
2595        if (hg > 0) { /* become sync source. */
2596                rv = C_WF_BITMAP_S;
2597        } else if (hg < 0) { /* become sync target */
2598                rv = C_WF_BITMAP_T;
2599        } else {
2600                rv = C_CONNECTED;
2601                if (drbd_bm_total_weight(mdev)) {
2602                        dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
2603                             drbd_bm_total_weight(mdev));
2604                }
2605        }
2606
2607        return rv;
2608}
2609
2610/* returns 1 if invalid */
2611static int cmp_after_sb(enum drbd_after_sb_p peer, enum drbd_after_sb_p self)
2612{
2613        /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
2614        if ((peer == ASB_DISCARD_REMOTE && self == ASB_DISCARD_LOCAL) ||
2615            (self == ASB_DISCARD_REMOTE && peer == ASB_DISCARD_LOCAL))
2616                return 0;
2617
2618        /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
2619        if (peer == ASB_DISCARD_REMOTE || peer == ASB_DISCARD_LOCAL ||
2620            self == ASB_DISCARD_REMOTE || self == ASB_DISCARD_LOCAL)
2621                return 1;
2622
2623        /* everything else is valid if they are equal on both sides. */
2624        if (peer == self)
2625                return 0;
2626
2627        /* everything else is invalid (see examples after this function). */
2628        return 1;
2629}
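
/* Worked examples for cmp_after_sb(), with assumed inputs:
 *
 *	peer=ASB_DISCARD_REMOTE, self=ASB_DISCARD_LOCAL  -> 0  valid pair:
 *		both sides agree that this node's data gets discarded
 *	peer=ASB_DISCARD_REMOTE, self=ASB_DISCARD_REMOTE -> 1  invalid:
 *		each side would discard the other's data
 *	peer=ASB_DISCONNECT,     self=ASB_DISCONNECT     -> 0  equal values
 *	peer=ASB_DISCONNECT,     self=ASB_CONSENSUS      -> 1  mismatch
 */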
2630
2631static int receive_protocol(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2632{
2633        struct p_protocol *p = &mdev->data.rbuf.protocol;
2634        int p_proto, p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
2635        int p_want_lose, p_two_primaries, cf;
2636        char p_integrity_alg[SHARED_SECRET_MAX] = "";
2637
2638        p_proto         = be32_to_cpu(p->protocol);
2639        p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
2640        p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
2641        p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
2642        p_two_primaries = be32_to_cpu(p->two_primaries);
2643        cf              = be32_to_cpu(p->conn_flags);
2644        p_want_lose = cf & CF_WANT_LOSE;
2645
2646        clear_bit(CONN_DRY_RUN, &mdev->flags);
2647
2648        if (cf & CF_DRY_RUN)
2649                set_bit(CONN_DRY_RUN, &mdev->flags);
2650
2651        if (p_proto != mdev->net_conf->wire_protocol) {
2652                dev_err(DEV, "incompatible communication protocols\n");
2653                goto disconnect;
2654        }
2655
2656        if (cmp_after_sb(p_after_sb_0p, mdev->net_conf->after_sb_0p)) {
2657                dev_err(DEV, "incompatible after-sb-0pri settings\n");
2658                goto disconnect;
2659        }
2660
2661        if (cmp_after_sb(p_after_sb_1p, mdev->net_conf->after_sb_1p)) {
2662                dev_err(DEV, "incompatible after-sb-1pri settings\n");
2663                goto disconnect;
2664        }
2665
2666        if (cmp_after_sb(p_after_sb_2p, mdev->net_conf->after_sb_2p)) {
2667                dev_err(DEV, "incompatible after-sb-2pri settings\n");
2668                goto disconnect;
2669        }
2670
2671        if (p_want_lose && mdev->net_conf->want_lose) {
2672                dev_err(DEV, "both sides have the 'want_lose' flag set\n");
2673                goto disconnect;
2674        }
2675
2676        if (p_two_primaries != mdev->net_conf->two_primaries) {
2677                dev_err(DEV, "incompatible setting of the two-primaries options\n");
2678                goto disconnect;
2679        }
2680
2681        if (mdev->agreed_pro_version >= 87) {
2682                unsigned char *my_alg = mdev->net_conf->integrity_alg;
2683
2684                if (drbd_recv(mdev, p_integrity_alg, data_size) != data_size)
2685                        return FALSE;
2686
2687                p_integrity_alg[SHARED_SECRET_MAX-1] = 0;
2688                if (strcmp(p_integrity_alg, my_alg)) {
2689                        dev_err(DEV, "incompatible setting of the data-integrity-alg\n");
2690                        goto disconnect;
2691                }
2692                dev_info(DEV, "data-integrity-alg: %s\n",
2693                     my_alg[0] ? my_alg : (unsigned char *)"<not-used>");
2694        }
2695
2696        return TRUE;
2697
2698disconnect:
2699        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2700        return FALSE;
2701}
2702
2703/* helper function
2704 * input: alg name, feature name
2705 * return: NULL (alg name was "")
2706 *         ERR_PTR(error) if something goes wrong
2707 *         or the crypto hash ptr, if it worked out ok (usage sketch below). */
2708struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
2709                const char *alg, const char *name)
2710{
2711        struct crypto_hash *tfm;
2712
2713        if (!alg[0])
2714                return NULL;
2715
2716        tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
2717        if (IS_ERR(tfm)) {
2718                dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
2719                        alg, name, PTR_ERR(tfm));
2720                return tfm;
2721        }
2722        if (!drbd_crypto_is_hash(crypto_hash_tfm(tfm))) {
2723                crypto_free_hash(tfm);
2724                dev_err(DEV, "\"%s\" is not a digest (%s)\n", alg, name);
2725                return ERR_PTR(-EINVAL);
2726        }
2727        return tfm;
2728}
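
/* Sketch of the caller-side triage for the helper above; the function name
 * and the "example-alg" label are illustrative, not part of the original
 * file (receive_SyncParam() below does the same for verify-alg/csums-alg): */
static struct crypto_hash *pick_digest_sketch(struct drbd_conf *mdev,
					      const char *alg)
{
	struct crypto_hash *tfm;

	tfm = drbd_crypto_alloc_digest_safe(mdev, alg, "example-alg");
	if (tfm == NULL)
		return NULL;	/* alg was "": feature disabled, not an error */
	if (IS_ERR(tfm))
		return NULL;	/* real callers disconnect here */
	return tfm;		/* valid transform; caller must free it */
}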
2729
2730static int receive_SyncParam(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int packet_size)
2731{
2732        int ok = TRUE;
2733        struct p_rs_param_95 *p = &mdev->data.rbuf.rs_param_95;
2734        unsigned int header_size, data_size, exp_max_sz;
2735        struct crypto_hash *verify_tfm = NULL;
2736        struct crypto_hash *csums_tfm = NULL;
2737        const int apv = mdev->agreed_pro_version;
2738        int *rs_plan_s = NULL;
2739        int fifo_size = 0;
2740
2741        exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
2742                    : apv == 88 ? sizeof(struct p_rs_param)
2743                                        + SHARED_SECRET_MAX
2744                    : apv <= 94 ? sizeof(struct p_rs_param_89)
2745                    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
2746
2747        if (packet_size > exp_max_sz) {
2748                dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
2749                    packet_size, exp_max_sz);
2750                return FALSE;
2751        }
2752
2753        if (apv <= 88) {
2754                header_size = sizeof(struct p_rs_param) - sizeof(struct p_header80);
2755                data_size   = packet_size  - header_size;
2756        } else if (apv <= 94) {
2757                header_size = sizeof(struct p_rs_param_89) - sizeof(struct p_header80);
2758                data_size   = packet_size  - header_size;
2759                D_ASSERT(data_size == 0);
2760        } else {
2761                header_size = sizeof(struct p_rs_param_95) - sizeof(struct p_header80);
2762                data_size   = packet_size  - header_size;
2763                D_ASSERT(data_size == 0);
2764        }
2765
2766        /* initialize verify_alg and csums_alg */
2767        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
2768
2769        if (drbd_recv(mdev, &p->head.payload, header_size) != header_size)
2770                return FALSE;
2771
2772        mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2773
2774        if (apv >= 88) {
2775                if (apv == 88) {
2776                        if (data_size > SHARED_SECRET_MAX) {
2777                                dev_err(DEV, "verify-alg too long, "
2778                                    "peer wants %u, accepting only %u byte\n",
2779                                                data_size, SHARED_SECRET_MAX);
2780                                return FALSE;
2781                        }
2782
2783                        if (drbd_recv(mdev, p->verify_alg, data_size) != data_size)
2784                                return FALSE;
2785
2786                        /* we expect NUL terminated string */
2787                        /* but just in case someone tries to be evil */
2788                        D_ASSERT(p->verify_alg[data_size-1] == 0);
2789                        p->verify_alg[data_size-1] = 0;
2790
2791                } else /* apv >= 89 */ {
2792                        /* we still expect NUL terminated strings */
2793                        /* but just in case someone tries to be evil */
2794                        D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
2795                        D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
2796                        p->verify_alg[SHARED_SECRET_MAX-1] = 0;
2797                        p->csums_alg[SHARED_SECRET_MAX-1] = 0;
2798                }
2799
2800                if (strcmp(mdev->sync_conf.verify_alg, p->verify_alg)) {
2801                        if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2802                                dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
2803                                    mdev->sync_conf.verify_alg, p->verify_alg);
2804                                goto disconnect;
2805                        }
2806                        verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
2807                                        p->verify_alg, "verify-alg");
2808                        if (IS_ERR(verify_tfm)) {
2809                                verify_tfm = NULL;
2810                                goto disconnect;
2811                        }
2812                }
2813
2814                if (apv >= 89 && strcmp(mdev->sync_conf.csums_alg, p->csums_alg)) {
2815                        if (mdev->state.conn == C_WF_REPORT_PARAMS) {
2816                                dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
2817                                    mdev->sync_conf.csums_alg, p->csums_alg);
2818                                goto disconnect;
2819                        }
2820                        csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
2821                                        p->csums_alg, "csums-alg");
2822                        if (IS_ERR(csums_tfm)) {
2823                                csums_tfm = NULL;
2824                                goto disconnect;
2825                        }
2826                }
2827
2828                if (apv > 94) {
2829                        mdev->sync_conf.rate      = be32_to_cpu(p->rate);
2830                        mdev->sync_conf.c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
2831                        mdev->sync_conf.c_delay_target = be32_to_cpu(p->c_delay_target);
2832                        mdev->sync_conf.c_fill_target = be32_to_cpu(p->c_fill_target);
2833                        mdev->sync_conf.c_max_rate = be32_to_cpu(p->c_max_rate);
2834
2835                        fifo_size = (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ;
2836                        if (fifo_size != mdev->rs_plan_s.size && fifo_size > 0) {
2837                                rs_plan_s   = kzalloc(sizeof(int) * fifo_size, GFP_KERNEL);
2838                                if (!rs_plan_s) {
2839                                        dev_err(DEV, "kzalloc of fifo_buffer failed\n");
2840                                        goto disconnect;
2841                                }
2842                        }
2843                }
2844
2845                spin_lock(&mdev->peer_seq_lock);
2846                /* lock against drbd_nl_syncer_conf() */
2847                if (verify_tfm) {
2848                        strcpy(mdev->sync_conf.verify_alg, p->verify_alg);
2849                        mdev->sync_conf.verify_alg_len = strlen(p->verify_alg) + 1;
2850                        crypto_free_hash(mdev->verify_tfm);
2851                        mdev->verify_tfm = verify_tfm;
2852                        dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
2853                }
2854                if (csums_tfm) {
2855                        strcpy(mdev->sync_conf.csums_alg, p->csums_alg);
2856                        mdev->sync_conf.csums_alg_len = strlen(p->csums_alg) + 1;
2857                        crypto_free_hash(mdev->csums_tfm);
2858                        mdev->csums_tfm = csums_tfm;
2859                        dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
2860                }
2861                if (fifo_size != mdev->rs_plan_s.size) {
2862                        kfree(mdev->rs_plan_s.values);
2863                        mdev->rs_plan_s.values = rs_plan_s;
2864                        mdev->rs_plan_s.size   = fifo_size;
2865                        mdev->rs_planed = 0;
2866                }
2867                spin_unlock(&mdev->peer_seq_lock);
2868        }
2869
2870        return ok;
2871disconnect:
2872        /* free whatever we may have allocated on this error path;
2873         * it is also reached if e.g. the fifo buffer allocation
2874         * failed after the tfms had already been set up */
2875        crypto_free_hash(csums_tfm);
2876        crypto_free_hash(verify_tfm);
2877        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2878        return FALSE;
2879}
2880
2881static void drbd_setup_order_type(struct drbd_conf *mdev, int peer)
2882{
2883        /* sorry, we currently have no working implementation
2884         * of distributed TCQ */
2885}
2886
2887/* warn if the arguments differ by more than 12.5% */
2888static void warn_if_differ_considerably(struct drbd_conf *mdev,
2889        const char *s, sector_t a, sector_t b)
2890{
2891        sector_t d;
2892        if (a == 0 || b == 0)
2893                return;
2894        d = (a > b) ? (a - b) : (b - a);
2895        if (d > (a>>3) || d > (b>>3))
2896                dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
2897                     (unsigned long long)a, (unsigned long long)b);
2898}
2899
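/* Handle a P_SIZES packet: record the peer's disk and requested sizes,
   (re)determine our own capacity, agree on the maximum segment size, and
   trigger a resync of the new area if the device grew while connected. */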
2900static int receive_sizes(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
2901{
2902        struct p_sizes *p = &mdev->data.rbuf.sizes;
2903        enum determine_dev_size dd = unchanged;
2904        unsigned int max_seg_s;
2905        sector_t p_size, p_usize, my_usize;
2906        int ldsc = 0; /* local disk size changed */
2907        enum dds_flags ddsf;
2908
2909        p_size = be64_to_cpu(p->d_size);
2910        p_usize = be64_to_cpu(p->u_size);
2911
2912        if (p_size == 0 && mdev->state.disk == D_DISKLESS) {
2913                dev_err(DEV, "some backing storage is needed\n");
2914                drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2915                return FALSE;
2916        }
2917
2918        /* just store the peer's disk size for now.
2919         * we still need to figure out whether we accept that. */
2920        mdev->p_size = p_size;
2921
2922        if (get_ldev(mdev)) {
2923                warn_if_differ_considerably(mdev, "lower level device sizes",
2924                           p_size, drbd_get_max_capacity(mdev->ldev));
2925                warn_if_differ_considerably(mdev, "user requested size",
2926                                            p_usize, mdev->ldev->dc.disk_size);
2927
2928                /* if this is the first connect, or an otherwise expected
2929                 * param exchange, choose the minimum */
2930                if (mdev->state.conn == C_WF_REPORT_PARAMS)
2931                        p_usize = min_not_zero((sector_t)mdev->ldev->dc.disk_size,
2932                                             p_usize);
2933
2934                my_usize = mdev->ldev->dc.disk_size;
2935
2936                if (mdev->ldev->dc.disk_size != p_usize) {
2937                        mdev->ldev->dc.disk_size = p_usize;
2938                        dev_info(DEV, "Peer sets u_size to %lu sectors\n",
2939                             (unsigned long)mdev->ldev->dc.disk_size);
2940                }
2941
2942                /* Never shrink a device with usable data during connect.
2943                   But allow online shrinking if we are connected. */
2944                if (drbd_new_dev_size(mdev, mdev->ldev, 0) <
2945                   drbd_get_capacity(mdev->this_bdev) &&
2946                   mdev->state.disk >= D_OUTDATED &&
2947                   mdev->state.conn < C_CONNECTED) {
2948                        dev_err(DEV, "The peer's disk size is too small!\n");
2949                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
2950                        mdev->ldev->dc.disk_size = my_usize;
2951                        put_ldev(mdev);
2952                        return FALSE;
2953                }
2954                put_ldev(mdev);
2955        }
2957
2958        ddsf = be16_to_cpu(p->dds_flags);
2959        if (get_ldev(mdev)) {
2960                dd = drbd_determin_dev_size(mdev, ddsf);
2961                put_ldev(mdev);
2962                if (dd == dev_size_error)
2963                        return FALSE;
2964                drbd_md_sync(mdev);
2965        } else {
2966                /* I am diskless, need to accept the peer's size. */
2967                drbd_set_my_capacity(mdev, p_size);
2968        }
2969
2970        if (get_ldev(mdev)) {
2971                if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
2972                        mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
2973                        ldsc = 1;
2974                }
2975
2976                if (mdev->agreed_pro_version < 94)
2977                        max_seg_s = be32_to_cpu(p->max_segment_size);
2978                else if (mdev->agreed_pro_version == 94)
2979                        max_seg_s = DRBD_MAX_SIZE_H80_PACKET;
2980                else /* drbd 8.3.8 onwards */
2981                        max_seg_s = DRBD_MAX_SEGMENT_SIZE;
2982
2983                if (max_seg_s != queue_max_segment_size(mdev->rq_queue))
2984                        drbd_setup_queue_param(mdev, max_seg_s);
2985
2986                drbd_setup_order_type(mdev, be16_to_cpu(p->queue_order_type));
2987                put_ldev(mdev);
2988        }
2989
2990        if (mdev->state.conn > C_WF_REPORT_PARAMS) {
2991                if (be64_to_cpu(p->c_size) !=
2992                    drbd_get_capacity(mdev->this_bdev) || ldsc) {
2993                        /* we have different sizes, the peer probably
2994                         * needs to know my new size... */
2995                        drbd_send_sizes(mdev, 0, ddsf);
2996                }
2997                if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
2998                    (dd == grew && mdev->state.conn == C_CONNECTED)) {
2999                        if (mdev->state.pdsk >= D_INCONSISTENT &&
3000                            mdev->state.disk >= D_INCONSISTENT) {
3001                                if (ddsf & DDSF_NO_RESYNC)
3002                                        dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3003                                else
3004                                        resync_after_online_grow(mdev);
3005                        } else
3006                                set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3007                }
3008        }
3009
3010        return TRUE;
3011}
3012
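/* Handle a P_UUIDS packet: remember the peer's UUID set, refuse to connect
   a Primary to data with a different current UUID, and skip the initial
   full sync if both sides indicate a freshly created device. */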
3013static int receive_uuids(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3014{
3015        struct p_uuids *p = &mdev->data.rbuf.uuids;
3016        u64 *p_uuid;
3017        int i;
3018
3019        p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
        if (!p_uuid) {
                dev_err(DEV, "kmalloc of p_uuid failed\n");
                return FALSE;
        }
3020
3021        for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3022                p_uuid[i] = be64_to_cpu(p->uuid[i]);
3023
3024        kfree(mdev->p_uuid);
3025        mdev->p_uuid = p_uuid;
3026
3027        if (mdev->state.conn < C_CONNECTED &&
3028            mdev->state.disk < D_INCONSISTENT &&
3029            mdev->state.role == R_PRIMARY &&
3030            (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3031                dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3032                    (unsigned long long)mdev->ed_uuid);
3033                drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3034                return FALSE;
3035        }
3036
3037        if (get_ldev(mdev)) {
3038                int skip_initial_sync =
3039                        mdev->state.conn == C_CONNECTED &&
3040                        mdev->agreed_pro_version >= 90 &&
3041                        mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3042                        (p_uuid[UI_FLAGS] & 8);
3043                if (skip_initial_sync) {
3044                        dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3045                        drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3046                                        "clear_n_write from receive_uuids");
3047                        _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3048                        _drbd_uuid_set(mdev, UI_BITMAP, 0);
3049                        _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3050                                        CS_VERBOSE, NULL);
3051                        drbd_md_sync(mdev);
3052                }
3053                put_ldev(mdev);
3054        } else if (mdev->state.disk < D_INCONSISTENT &&
3055                   mdev->state.role == R_PRIMARY) {
3056                /* I am a diskless primary, the peer just created a new current UUID
3057                   for me. */
3058                drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3059        }
3060
3061        /* Before we test for the disk state, we should wait until a possibly
3062           ongoing cluster wide state change has finished. That is important if
3063           we are primary and are detaching from our disk: we need to see the
3064           new disk state... */
3065        wait_event(mdev->misc_wait, !test_bit(CLUSTER_ST_CHANGE, &mdev->flags));
3066        if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3067                drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3068
3069        return TRUE;
3070}
3071
3072/**
3073 * convert_state() - Converts the peer's view of the cluster state to our point of view
3074 * @ps:         The state as seen by the peer.
3075 */
3076static union drbd_state convert_state(union drbd_state ps)
3077{
3078        union drbd_state ms;
3079
3080        static enum drbd_conns c_tab[] = {
3081                [C_CONNECTED] = C_CONNECTED,
3082
3083                [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3084                [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3085                [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3086                [C_VERIFY_S]       = C_VERIFY_T,
3087                [C_MASK]   = C_MASK,
3088        };
3089
3090        ms.i = ps.i;
3091
3092        ms.conn = c_tab[ps.conn];
3093        ms.peer = ps.role;
3094        ms.role = ps.peer;
3095        ms.pdsk = ps.disk;
3096        ms.disk = ps.pdsk;
3097        ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3098
3099        return ms;
3100}
3101
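/* The peer requests a cluster-wide state change: translate its view of the
   state into ours, try to apply the change, and send the result back. */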
3102static int receive_req_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3103{
3104        struct p_req_state *p = &mdev->data.rbuf.req_state;
3105        union drbd_state mask, val;
3106        int rv;
3107
3108        mask.i = be32_to_cpu(p->mask);
3109        val.i = be32_to_cpu(p->val);
3110
3111        if (test_bit(DISCARD_CONCURRENT, &mdev->flags) &&
3112            test_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
3113                drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3114                return TRUE;
3115        }
3116
3117        mask = convert_state(mask);
3118        val = convert_state(val);
3119
3120        rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3121
3122        drbd_send_sr_reply(mdev, rv);
3123        drbd_md_sync(mdev);
3124
3125        return TRUE;
3126}
3127
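/* Handle a P_STATE packet: reconcile the peer's view of the cluster state
   with ours, possibly starting a resync handshake, and commit the merged
   state under req_lock (retrying if our own state changed meanwhile). */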
3128static int receive_state(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3129{
3130        struct p_state *p = &mdev->data.rbuf.state;
3131        union drbd_state os, ns, peer_state;
3132        enum drbd_disk_state real_peer_disk;
3133        enum chg_state_flags cs_flags;
3134        int rv;
3135
3136        peer_state.i = be32_to_cpu(p->state);
3137
3138        real_peer_disk = peer_state.disk;
3139        if (peer_state.disk == D_NEGOTIATING) {
3140                real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3141                dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3142        }
3143
3144        spin_lock_irq(&mdev->req_lock);
3145 retry:
3146        os = ns = mdev->state;
3147        spin_unlock_irq(&mdev->req_lock);
3148
3149        /* peer says his disk is uptodate, while we think it is inconsistent,
3150         * and this happens while we think we have a sync going on. */
3151        if (os.pdsk == D_INCONSISTENT && real_peer_disk == D_UP_TO_DATE &&
3152            os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3153                /* If we are (becoming) SyncSource, but peer is still in sync
3154                 * preparation, ignore its uptodate-ness to avoid flapping, it
3155                 * will change to inconsistent once the peer reaches active
3156                 * syncing states.
3157                 * It may have changed syncer-paused flags, however, so we
3158                 * cannot ignore this completely. */
3159                if (peer_state.conn > C_CONNECTED &&
3160                    peer_state.conn < C_SYNC_SOURCE)
3161                        real_peer_disk = D_INCONSISTENT;
3162
3163                /* if peer_state changes to connected at the same time,
3164                 * it explicitly notifies us that it finished resync.
3165                 * Maybe we should finish it up, too? */
3166                else if (os.conn >= C_SYNC_SOURCE &&
3167                         peer_state.conn == C_CONNECTED) {
3168                        if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3169                                drbd_resync_finished(mdev);
3170                        return TRUE;
3171                }
3172        }
3173
3174        /* peer says his disk is inconsistent, while we think it is uptodate,
3175         * and this happens while the peer still thinks we have a sync going on,
3176         * but we think we are already done with the sync.
3177         * We ignore this to avoid flapping pdsk.
3178         * This should not happen, if the peer is a recent version of drbd. */
3179        if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3180            os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3181                real_peer_disk = D_UP_TO_DATE;
3182
3183        if (ns.conn == C_WF_REPORT_PARAMS)
3184                ns.conn = C_CONNECTED;
3185
3186        if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3187            get_ldev_if_state(mdev, D_NEGOTIATING)) {
3188                int cr; /* consider resync */
3189
3190                /* if we established a new connection */
3191                cr  = (os.conn < C_CONNECTED);
3192                /* if we had an established connection
3193                 * and one of the nodes newly attaches a disk */
3194                cr |= (os.conn == C_CONNECTED &&
3195                       (peer_state.disk == D_NEGOTIATING ||
3196                        os.disk == D_NEGOTIATING));
3197                /* if we have both been inconsistent, and the peer has been
3198                 * forced to be UpToDate with --overwrite-data */
3199                cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3200                /* if we had been plain connected, and the admin requested to
3201                 * start a sync by "invalidate" or "invalidate-remote" */
3202                cr |= (os.conn == C_CONNECTED &&
3203                                (peer_state.conn >= C_STARTING_SYNC_S &&
3204                                 peer_state.conn <= C_WF_BITMAP_T));
3205
3206                if (cr)
3207                        ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3208
3209                put_ldev(mdev);
3210                if (ns.conn == C_MASK) {
3211                        ns.conn = C_CONNECTED;
3212                        if (mdev->state.disk == D_NEGOTIATING) {
3213                                drbd_force_state(mdev, NS(disk, D_FAILED));
3214                        } else if (peer_state.disk == D_NEGOTIATING) {
3215                                dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3216                                peer_state.disk = D_DISKLESS;
3217                                real_peer_disk = D_DISKLESS;
3218                        } else {
3219                                if (test_and_clear_bit(CONN_DRY_RUN, &mdev->flags))
3220                                        return FALSE;
3221                                D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3222                                drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3223                                return FALSE;
3224                        }
3225                }
3226        }
3227
3228        spin_lock_irq(&mdev->req_lock);
3229        if (mdev->state.i != os.i)
3230                goto retry;
3231        clear_bit(CONSIDER_RESYNC, &mdev->flags);
3232        ns.peer = peer_state.role;
3233        ns.pdsk = real_peer_disk;
3234        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3235        if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3236                ns.disk = mdev->new_state_tmp.disk;
3237        cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3238        if (ns.pdsk == D_CONSISTENT && is_susp(ns) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3239            test_bit(NEW_CUR_UUID, &mdev->flags)) {
3240                /* Do not allow tl_restart(resend) for a rebooted peer. We can only allow this
3241                   for temporary network outages! */
3242                spin_unlock_irq(&mdev->req_lock);
3243                dev_err(DEV, "Aborting Connect, cannot thaw IO with a peer that is only Consistent\n");
3244                tl_clear(mdev);
3245                drbd_uuid_new_current(mdev);
3246                clear_bit(NEW_CUR_UUID, &mdev->flags);
3247                drbd_force_state(mdev, NS2(conn, C_PROTOCOL_ERROR, susp, 0));
3248                return FALSE;
3249        }
3250        rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3251        ns = mdev->state;
3252        spin_unlock_irq(&mdev->req_lock);
3253
3254        if (rv < SS_SUCCESS) {
3255                drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
3256                return FALSE;
3257        }
3258
3259        if (os.conn > C_WF_REPORT_PARAMS) {
3260                if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3261                    peer_state.disk != D_NEGOTIATING ) {
3262                        /* we want resync, peer has not yet decided to sync... */
3263                        /* Nowadays only used when forcing a node into primary role and
3264                           setting its disk to UpToDate with that */
3265                        drbd_send_uuids(mdev);
3266                        drbd_send_state(mdev);
3267                }
3268        }
3269
3270        mdev->net_conf->want_lose = 0;
3271
3272        drbd_md_sync(mdev); /* update connected indicator, la_size, ... */
3273
3274        return TRUE;
3275}
3276
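/* Wait until we are ready to become SyncTarget, then adopt the sync UUID
   the peer sent (without rotating it into the history) and start the
   resync. */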
3277static int receive_sync_uuid(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3278{
3279        struct p_rs_uuid *p = &mdev->data.rbuf.rs_uuid;
3280
3281        wait_event(mdev->misc_wait,
3282                   mdev->state.conn == C_WF_SYNC_UUID ||
3283                   mdev->state.conn < C_CONNECTED ||
3284                   mdev->state.disk < D_NEGOTIATING);
3285
3286        /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
3287
3288        /* Here the _drbd_uuid_ functions are right, current should
3289           _not_ be rotated into the history */
3290        if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
3291                _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
3292                _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
3293
3294                drbd_start_resync(mdev, C_SYNC_TARGET);
3295
3296                put_ldev(mdev);
3297        } else
3298                dev_err(DEV, "Ignoring SyncUUID packet!\n");
3299
3300        return TRUE;
3301}
3302
3303enum receive_bitmap_ret { OK, DONE, FAILED };
3304
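/* Receive the next chunk of an uncompressed bitmap and merge it into our
   bitmap at the current word offset; returns DONE once nothing is left
   to receive. */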
3305static enum receive_bitmap_ret
3306receive_bitmap_plain(struct drbd_conf *mdev, unsigned int data_size,
3307                     unsigned long *buffer, struct bm_xfer_ctx *c)
3308{
3309        unsigned num_words = min_t(size_t, BM_PACKET_WORDS, c->bm_words - c->word_offset);
3310        unsigned want = num_words * sizeof(long);
3311
3312        if (want != data_size) {
3313                dev_err(DEV, "%s: want (%u) != data_size (%u)\n", __func__, want, data_size);
3314                return FAILED;
3315        }
3316        if (want == 0)
3317                return DONE;
3318        if (drbd_recv(mdev, buffer, want) != want)
3319                return FAILED;
3320
3321        drbd_bm_merge_lel(mdev, c->word_offset, num_words, buffer);
3322
3323        c->word_offset += num_words;
3324        c->bit_offset = c->word_offset * BITS_PER_LONG;
3325        if (c->bit_offset > c->bm_bits)
3326                c->bit_offset = c->bm_bits;
3327
3328        return OK;
3329}
3330
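/* Decode one VLI run-length encoded chunk of the bitmap: the run lengths
   alternate between runs of clear and set bits, the start value coming
   from DCBP_get_start(); returns DONE once the last bit was reached. */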
3331static enum receive_bitmap_ret
3332recv_bm_rle_bits(struct drbd_conf *mdev,
3333                struct p_compressed_bm *p,
3334                struct bm_xfer_ctx *c)
3335{
3336        struct bitstream bs;
3337        u64 look_ahead;
3338        u64 rl;
3339        u64 tmp;
3340        unsigned long s = c->bit_offset;
3341        unsigned long e;
3342        int len = be16_to_cpu(p->head.length) - (sizeof(*p) - sizeof(p->head));
3343        int toggle = DCBP_get_start(p);
3344        int have;
3345        int bits;
3346
3347        bitstream_init(&bs, p->code, len, DCBP_get_pad_bits(p));
3348
3349        bits = bitstream_get_bits(&bs, &look_ahead, 64);
3350        if (bits < 0)
3351                return FAILED;
3352
3353        for (have = bits; have > 0; s += rl, toggle = !toggle) {
3354                bits = vli_decode_bits(&rl, look_ahead);
3355                if (bits <= 0)
3356                        return FAILED;
3357
3358                if (toggle) {
3359                        e = s + rl - 1;
3360                        if (e >= c->bm_bits) {
3361                                dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
3362                                return FAILED;
3363                        }
3364                        _drbd_bm_set_bits(mdev, s, e);
3365                }
3366
3367                if (have < bits) {
3368                        dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
3369                                have, bits, look_ahead,
3370                                (unsigned int)(bs.cur.b - p->code),
3371                                (unsigned int)bs.buf_len);
3372                        return FAILED;
3373                }
3374                look_ahead >>= bits;
3375                have -= bits;
3376
3377                bits = bitstream_get_bits(&bs, &tmp, 64 - have);
3378                if (bits < 0)
3379                        return FAILED;
3380                look_ahead |= tmp << have;
3381                have += bits;
3382        }
3383
3384        c->bit_offset = s;
3385        bm_xfer_ctx_bit_to_word_offset(c);
3386
3387        return (s == c->bm_bits) ? DONE : OK;
3388}
3389
3390static enum receive_bitmap_ret
3391decode_bitmap_c(struct drbd_conf *mdev,
3392                struct p_compressed_bm *p,
3393                struct bm_xfer_ctx *c)
3394{
3395        if (DCBP_get_code(p) == RLE_VLI_Bits)
3396                return recv_bm_rle_bits(mdev, p, c);
3397
3398        /* other variants had been implemented for evaluation,
3399         * but have been dropped as this one turned out to be "best"
3400         * during all our tests. */
3401
3402        dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
3403        drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3404        return FAILED;
3405}
3406
3407void INFO_bm_xfer_stats(struct drbd_conf *mdev,
3408                const char *direction, struct bm_xfer_ctx *c)
3409{
3410        /* what would it take to transfer it "plaintext" */
3411        unsigned plain = sizeof(struct p_header80) *
3412                ((c->bm_words+BM_PACKET_WORDS-1)/BM_PACKET_WORDS+1)
3413                + c->bm_words * sizeof(long);
3414        unsigned total = c->bytes[0] + c->bytes[1];
3415        unsigned r;
3416
3417        /* total cannot be zero, but just in case: */
3418        if (total == 0)
3419                return;
3420
3421        /* don't report if not compressed */
3422        if (total >= plain)
3423                return;
3424
3425        /* total < plain. check for overflow, still */
3426        r = (total > UINT_MAX/1000) ? (total / (plain/1000))
3427                                    : (1000 * total / plain);
3428
3429        if (r > 1000)
3430                r = 1000;
3431
3432        r = 1000 - r;
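        /* e.g. plain = 4096, total = 1024:
         * r = 1000 - 1000*1024/4096 = 750, reported as "compression: 75.0%" */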
3433        dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
3434             "total %u; compression: %u.%u%%\n",
3435                        direction,
3436                        c->bytes[1], c->packets[1],
3437                        c->bytes[0], c->packets[0],
3438                        total, r/10, r % 10);
3439}
3440
3441/* Since we are processing the bitfield from lower addresses to higher,
3442   it does not matter whether we process it in 32 bit or 64 bit chunks,
3443   as long as it is little endian. (Understand it as a byte stream,
3444   beginning with the lowest byte...) If we used big endian,
3445   we would need to process it from the highest address to the lowest
3446   in order to be agnostic to the 32 vs 64 bit issue.
3447
3448   returns 0 on failure, 1 if we successfully received it. */
3449static int receive_bitmap(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3450{
3451        struct bm_xfer_ctx c;
3452        void *buffer;
3453        enum receive_bitmap_ret ret;
3454        int ok = FALSE;
3455        struct p_header80 *h = &mdev->data.rbuf.header.h80;
3456
3457        wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3458
3459        drbd_bm_lock(mdev, "receive bitmap");
3460
3461        /* maybe we should use some per thread scratch page,
3462         * and allocate that during initial device creation? */
3463        buffer   = (unsigned long *) __get_free_page(GFP_NOIO);
3464        if (!buffer) {
3465                dev_err(DEV, "failed to allocate one page buffer in %s\n", __func__);
3466                goto out;
3467        }
3468
3469        c = (struct bm_xfer_ctx) {
3470                .bm_bits = drbd_bm_bits(mdev),
3471                .bm_words = drbd_bm_words(mdev),
3472        };
3473
3474        do {
3475                if (cmd == P_BITMAP) {
3476                        ret = receive_bitmap_plain(mdev, data_size, buffer, &c);
3477                } else if (cmd == P_COMPRESSED_BITMAP) {
3478                        /* MAYBE: sanity check that we speak proto >= 90,
3479                         * and the feature is enabled! */
3480                        struct p_compressed_bm *p;
3481
3482                        if (data_size > BM_PACKET_PAYLOAD_BYTES) {
3483                                dev_err(DEV, "ReportCBitmap packet too large\n");
3484                                goto out;
3485                        }
3486                        /* use the page buffer */
3487                        p = buffer;
3488                        memcpy(p, h, sizeof(*h));
3489                        if (drbd_recv(mdev, p->head.payload, data_size) != data_size)
3490                                goto out;
3491                        if (data_size <= (sizeof(*p) - sizeof(p->head))) {
3492                                dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", data_size);
3493                                goto out;
3494                        }
3495                        ret = decode_bitmap_c(mdev, p, &c);
3496                } else {
3497                        dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", cmd);
3498                        goto out;
3499                }
3500
3501                c.packets[cmd == P_BITMAP]++;
3502                c.bytes[cmd == P_BITMAP] += sizeof(struct p_header80) + data_size;
3503
3504                if (ret != OK)
3505                        break;
3506
3507                if (!drbd_recv_header(mdev, &cmd, &data_size))
3508                        goto out;
3509        } while (ret == OK);
3510        if (ret == FAILED)
3511                goto out;
3512
3513        INFO_bm_xfer_stats(mdev, "receive", &c);
3514
3515        if (mdev->state.conn == C_WF_BITMAP_T) {
3516                ok = !drbd_send_bitmap(mdev);
3517                if (!ok)
3518                        goto out;
3519                /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
3520                ok = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
3521                D_ASSERT(ok == SS_SUCCESS);
3522        } else if (mdev->state.conn != C_WF_BITMAP_S) {
3523                /* admin may have requested C_DISCONNECTING,
3524                 * other threads may have noticed network errors */
3525                dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
3526                    drbd_conn_str(mdev->state.conn));
3527        }
3528
3529        ok = TRUE;
3530 out:
3531        drbd_bm_unlock(mdev);
3532        if (ok && mdev->state.conn == C_WF_BITMAP_S)
3533                drbd_start_resync(mdev, C_SYNC_SOURCE);
3534        free_page((unsigned long) buffer);
3535        return ok;
3536}
3537
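/* Sink the payload of an unknown optional packet into a scratch buffer,
   so the data stream stays in sync with the packet framing. */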
3538static int receive_skip(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3539{
3540        /* TODO zero copy sink :) */
3541        static char sink[128];
3542        int size, want, r;
3543
3544        dev_warn(DEV, "skipping unknown optional packet type %d, l: %d!\n",
3545                 cmd, data_size);
3546
3547        size = data_size;
3548        while (size > 0) {
3549                want = min_t(int, size, sizeof(sink));
3550                r = drbd_recv(mdev, sink, want);
3551                ERR_IF(r <= 0) break;
3552                size -= r;
3553        }
3554        return size == 0;
3555}
3556
3557static int receive_UnplugRemote(struct drbd_conf *mdev, enum drbd_packets cmd, unsigned int data_size)
3558{
3559        if (mdev->state.disk >= D_INCONSISTENT)
3560                drbd_kick_lo(mdev);
3561
3562        /* Make sure we've acked all the TCP data associated
3563         * with the data requests being unplugged */
3564        drbd_tcp_quickack(mdev->data.socket);
3565
3566        return TRUE;
3567}
3568
3569typedef int (*drbd_cmd_handler_f)(struct drbd_conf *, enum drbd_packets cmd, unsigned int to_receive);
3570
3571struct data_cmd {
3572        int expect_payload;
3573        size_t pkt_size;
3574        drbd_cmd_handler_f function;
3575};
3576
3577static struct data_cmd drbd_cmd_handler[] = {
3578        [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
3579        [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
3580        [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
3581        [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
3582        [P_BITMAP]          = { 1, sizeof(struct p_header80), receive_bitmap } ,
3583        [P_COMPRESSED_BITMAP] = { 1, sizeof(struct p_header80), receive_bitmap } ,
3584        [P_UNPLUG_REMOTE]   = { 0, sizeof(struct p_header80), receive_UnplugRemote },
3585        [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
3586        [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
3587        [P_SYNC_PARAM]      = { 1, sizeof(struct p_header80), receive_SyncParam },
3588        [P_SYNC_PARAM89]    = { 1, sizeof(struct p_header80), receive_SyncParam },
3589        [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
3590        [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
3591        [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
3592        [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
3593        [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
3594        [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
3595        [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
3596        [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
3597        [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
3598        [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
3599        /* anything missing from this table is in
3600         * the asender_tbl, see get_asender_cmd */
3601        [P_MAX_CMD]         = { 0, 0, NULL },
3602};
3603
3604/* All handler functions that expect a sub-header get that sub-header in
3605   mdev->data.rbuf.header.head.payload.
3606
3607   Usually the callback can find the usual p_header in
3608   mdev->data.rbuf.header.head, but it may not rely on that,
3609   since there is also p_header95. */
3610
3611static void drbdd(struct drbd_conf *mdev)
3612{
3613        union p_header *header = &mdev->data.rbuf.header;
3614        unsigned int packet_size;
3615        enum drbd_packets cmd;
3616        size_t shs; /* sub header size */
3617        int rv;
3618
3619        while (get_t_state(&mdev->receiver) == Running) {
3620                drbd_thread_current_set_cpu(mdev);
3621                if (!drbd_recv_header(mdev, &cmd, &packet_size))
3622                        goto err_out;
3623
3624                if (unlikely(cmd >= P_MAX_CMD || !drbd_cmd_handler[cmd].function)) {
3625                        dev_err(DEV, "unknown packet type %d, l: %d!\n", cmd, packet_size);
3626                        goto err_out;
3627                }
3628
3629                shs = drbd_cmd_handler[cmd].pkt_size - sizeof(union p_header);
3630                if (packet_size - shs > 0 && !drbd_cmd_handler[cmd].expect_payload) {
3631                        dev_err(DEV, "No payload expected %s l:%d\n", cmdname(cmd), packet_size);
3632                        goto err_out;
3633                }
3634
3635                if (shs) {
3636                        rv = drbd_recv(mdev, &header->h80.payload, shs);
3637                        if (unlikely(rv != shs)) {
3638                                dev_err(DEV, "short read while reading sub header: rv=%d\n", rv);
3639                                goto err_out;
3640                        }
3641                }
3642
3643                rv = drbd_cmd_handler[cmd].function(mdev, cmd, packet_size - shs);
3644
3645                if (unlikely(!rv)) {
3646                        dev_err(DEV, "error receiving %s, l: %d!\n",
3647                            cmdname(cmd), packet_size);
3648                        goto err_out;
3649                }
3650        }
3651
3652        if (0) {
3653        err_out:
3654                drbd_force_state(mdev, NS(conn, C_PROTOCOL_ERROR));
3655        }
3656        /* If we leave here, we probably want to update at least the
3657         * "Connected" indicator on stable storage. Do so explicitly here. */
3658        drbd_md_sync(mdev);
3659}
3660
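/* Queue a barrier work item and wait for its completion, so that all work
   queued before this call has been processed by the worker. */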
3661void drbd_flush_workqueue(struct drbd_conf *mdev)
3662{
3663        struct drbd_wq_barrier barr;
3664
3665        barr.w.cb = w_prev_work_done;
3666        init_completion(&barr.done);
3667        drbd_queue_work(&mdev->data.work, &barr.w);
3668        wait_for_completion(&barr.done);
3669}
3670
3671void drbd_free_tl_hash(struct drbd_conf *mdev)
3672{
3673        struct hlist_head *h;
3674
3675        spin_lock_irq(&mdev->req_lock);
3676
3677        if (!mdev->tl_hash || mdev->state.conn != C_STANDALONE) {
3678                spin_unlock_irq(&mdev->req_lock);
3679                return;
3680        }
3681        /* paranoia code */
3682        for (h = mdev->ee_hash; h < mdev->ee_hash + mdev->ee_hash_s; h++)
3683                if (h->first)
3684                        dev_err(DEV, "ASSERT FAILED ee_hash[%u].first == %p, expected NULL\n",
3685                                (int)(h - mdev->ee_hash), h->first);
3686        kfree(mdev->ee_hash);
3687        mdev->ee_hash = NULL;
3688        mdev->ee_hash_s = 0;
3689
3690        /* paranoia code */
3691        for (h = mdev->tl_hash; h < mdev->tl_hash + mdev->tl_hash_s; h++)
3692                if (h->first)
3693                        dev_err(DEV, "ASSERT FAILED tl_hash[%u] == %p, expected NULL\n",
3694                                (int)(h - mdev->tl_hash), h->first);
3695        kfree(mdev->tl_hash);
3696        mdev->tl_hash = NULL;
3697        mdev->tl_hash_s = 0;
3698        spin_unlock_irq(&mdev->req_lock);
3699}
3700
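/* Tear down a lost or closed connection: stop the asender, wait for and
   clean up pending epoch entries, cancel resync bookkeeping, and move the
   connection state towards C_UNCONNECTED (or C_STANDALONE if that was
   requested). */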
3701static void drbd_disconnect(struct drbd_conf *mdev)
3702{
3703        enum drbd_fencing_p fp;
3704        union drbd_state os, ns;
3705        int rv = SS_UNKNOWN_ERROR;
3706        unsigned int i;
3707
3708        if (mdev->state.conn == C_STANDALONE)
3709                return;
3710        if (mdev->state.conn >= C_WF_CONNECTION)
3711                dev_err(DEV, "ASSERT FAILED cstate = %s, expected < WFConnection\n",
3712                                drbd_conn_str(mdev->state.conn));
3713
3714        /* asender does not clean up anything. it must not interfere, either */
3715        drbd_thread_stop(&mdev->asender);
3716        drbd_free_sock(mdev);
3717
3718        /* wait for current activity to cease. */
3719        spin_lock_irq(&mdev->req_lock);
3720        _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
3721        _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
3722        _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
3723        spin_unlock_irq(&mdev->req_lock);
3724
3725        /* We do not have data structures that would allow us to
3726         * get the rs_pending_cnt down to 0 again.
3727         *  * On C_SYNC_TARGET we do not have any data structures describing
3728         *    the pending RSDataRequest's we have sent.
3729         *  * On C_SYNC_SOURCE there is no data structure that tracks
3730         *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
3731         *  And no, it is not the sum of the reference counts in the
3732         *  resync_LRU. The resync_LRU tracks the whole operation including
3733         *  the disk-IO, while the rs_pending_cnt only tracks the blocks
3734         *  on the fly. */
3735        drbd_rs_cancel_all(mdev);
3736        mdev->rs_total = 0;
3737        mdev->rs_failed = 0;
3738        atomic_set(&mdev->rs_pending_cnt, 0);
3739        wake_up(&mdev->misc_wait);
3740
3741        /* make sure syncer is stopped and w_resume_next_sg queued */
3742        del_timer_sync(&mdev->resync_timer);
3743        resync_timer_fn((unsigned long)mdev);
3744
3745        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
3746         * w_make_resync_request etc. which may still be on the worker queue
3747         * to be "canceled" */
3748        drbd_flush_workqueue(mdev);
3749
3750        /* This also does reclaim_net_ee().  If we do this too early, we might
3751         * miss some resync ee and pages.*/
3752        drbd_process_done_ee(mdev);
3753
3754        kfree(mdev->p_uuid);
3755        mdev->p_uuid = NULL;
3756
3757        if (!is_susp(mdev->state))
3758                tl_clear(mdev);
3759
3760        dev_info(DEV, "Connection closed\n");
3761
3762        drbd_md_sync(mdev);
3763
3764        fp = FP_DONT_CARE;
3765        if (get_ldev(mdev)) {
3766                fp = mdev->ldev->dc.fencing;
3767                put_ldev(mdev);
3768        }
3769
3770        if (mdev->state.role == R_PRIMARY && fp >= FP_RESOURCE && mdev->state.pdsk >= D_UNKNOWN)
3771                drbd_try_outdate_peer_async(mdev);
3772
3773        spin_lock_irq(&mdev->req_lock);
3774        os = mdev->state;
3775        if (os.conn >= C_UNCONNECTED) {
3776                /* Do not restart in case we are C_DISCONNECTING */
3777                ns = os;
3778                ns.conn = C_UNCONNECTED;
3779                rv = _drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
3780        }
3781        spin_unlock_irq(&mdev->req_lock);
3782
3783        if (os.conn == C_DISCONNECTING) {
3784                wait_event(mdev->net_cnt_wait, atomic_read(&mdev->net_cnt) == 0);
3785
3786                if (!is_susp(mdev->state)) {
3787                        /* we must not free the tl_hash
3788                         * while application io is still on the fly */
3789                        wait_event(mdev->misc_wait, !atomic_read(&mdev->ap_bio_cnt));
3790                        drbd_free_tl_hash(mdev);
3791                }
3792
3793                crypto_free_hash(mdev->cram_hmac_tfm);
3794                mdev->cram_hmac_tfm = NULL;
3795
3796                kfree(mdev->net_conf);
3797                mdev->net_conf = NULL;
3798                drbd_request_state(mdev, NS(conn, C_STANDALONE));
3799        }
3800
3801        /* tcp_close and release of sendpage pages can be deferred.  I don't
3802         * want to use SO_LINGER, because apparently it can be deferred for
3803         * more than 20 seconds (longest time I checked).
3804         *
3805         * Actually we don't care for exactly when the network stack does its
3806         * put_page(), but release our reference on these pages right here.
3807         */
3808        i = drbd_release_ee(mdev, &mdev->net_ee);
3809        if (i)
3810                dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
3811        i = atomic_read(&mdev->pp_in_use_by_net);
3812        if (i)
3813                dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
3814        i = atomic_read(&mdev->pp_in_use);
3815        if (i)
3816                dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
3817
3818        D_ASSERT(list_empty(&mdev->read_ee));
3819        D_ASSERT(list_empty(&mdev->active_ee));
3820        D_ASSERT(list_empty(&mdev->sync_ee));
3821        D_ASSERT(list_empty(&mdev->done_ee));
3822
3823        /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
3824        atomic_set(&mdev->current_epoch->epoch_size, 0);
3825        D_ASSERT(list_empty(&mdev->current_epoch->list));
3826}
3827
3828/*
3829 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
3830 * we can agree on is stored in agreed_pro_version.
3831 *
3832 * feature flags and the reserved array should be enough room for future
3833 * enhancements of the handshake protocol, and possible plugins...
3834 *
3835 * for now, they are expected to be zero, but ignored.
3836 */
3837static int drbd_send_handshake(struct drbd_conf *mdev)
3838{
3839        /* ASSERT current == mdev->receiver ... */
3840        struct p_handshake *p = &mdev->data.sbuf.handshake;
3841        int ok;
3842
3843        if (mutex_lock_interruptible(&mdev->data.mutex)) {
3844                dev_err(DEV, "interrupted during initial handshake\n");
3845                return 0; /* interrupted. not ok. */
3846        }
3847
3848        if (mdev->data.socket == NULL) {
3849                mutex_unlock(&mdev->data.mutex);
3850                return 0;
3851        }
3852
3853        memset(p, 0, sizeof(*p));
3854        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
3855        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
3856        ok = _drbd_send_cmd(mdev, mdev->data.socket, P_HAND_SHAKE,
3857                            (struct p_header80 *)p, sizeof(*p), 0);
3858        mutex_unlock(&mdev->data.mutex);
3859        return ok;
3860}
3861
3862/*
3863 * return values:
3864 *   1 yes, we have a valid connection
3865 *   0 oops, did not work out, please try again
3866 *  -1 peer talks different language,
3867 *     no point in trying again, please go standalone.
3868 */
3869static int drbd_do_handshake(struct drbd_conf *mdev)
3870{
3871        /* ASSERT current == mdev->receiver ... */
3872        struct p_handshake *p = &mdev->data.rbuf.handshake;
3873        const int expect = sizeof(struct p_handshake) - sizeof(struct p_header80);
3874        unsigned int length;
3875        enum drbd_packets cmd;
3876        int rv;
3877
3878        rv = drbd_send_handshake(mdev);
3879        if (!rv)
3880                return 0;
3881
3882        rv = drbd_recv_header(mdev, &cmd, &length);
3883        if (!rv)
3884                return 0;
3885
3886        if (cmd != P_HAND_SHAKE) {
3887                dev_err(DEV, "expected HandShake packet, received: %s (0x%04x)\n",
3888                     cmdname(cmd), cmd);
3889                return -1;
3890        }
3891
3892        if (length != expect) {
3893                dev_err(DEV, "expected HandShake length: %u, received: %u\n",
3894                     expect, length);
3895                return -1;
3896        }
3897
3898        rv = drbd_recv(mdev, &p->head.payload, expect);
3899
3900        if (rv != expect) {
3901                dev_err(DEV, "short read receiving handshake packet: l=%u\n", rv);
3902                return 0;
3903        }
3904
3905        p->protocol_min = be32_to_cpu(p->protocol_min);
3906        p->protocol_max = be32_to_cpu(p->protocol_max);
3907        if (p->protocol_max == 0)
3908                p->protocol_max = p->protocol_min;
3909
3910        if (PRO_VERSION_MAX < p->protocol_min ||
3911            PRO_VERSION_MIN > p->protocol_max)
3912                goto incompat;
3913
3914        mdev->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
3915
3916        dev_info(DEV, "Handshake successful: "
3917             "Agreed network protocol version %d\n", mdev->agreed_pro_version);
3918
3919        return 1;
3920
3921 incompat:
3922        dev_err(DEV, "incompatible DRBD dialects: "
3923            "I support %d-%d, peer supports %d-%d\n",
3924            PRO_VERSION_MIN, PRO_VERSION_MAX,
3925            p->protocol_min, p->protocol_max);
3926        return -1;
3927}
3928
3929#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
3930static int drbd_do_auth(struct drbd_conf *mdev)
3931{
3932        dev_err(DEV, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
3933        dev_err(DEV, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
3934        return -1;
3935}
3936#else
3937#define CHALLENGE_LEN 64
3938
3939/* Return value:
3940        1 - auth succeeded,
3941        0 - failed, try again (network error),
3942        -1 - auth failed, don't try again.
3943*/
3944
3945static int drbd_do_auth(struct drbd_conf *mdev)
3946{
3947        char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
3948        struct scatterlist sg;
3949        char *response = NULL;
3950        char *right_response = NULL;
3951        char *peers_ch = NULL;
3952        unsigned int key_len = strlen(mdev->net_conf->shared_secret);
3953        unsigned int resp_size;
3954        struct hash_desc desc;
3955        enum drbd_packets cmd;
3956        unsigned int length;
3957        int rv;
3958
3959        desc.tfm = mdev->cram_hmac_tfm;
3960        desc.flags = 0;
3961
3962        rv = crypto_hash_setkey(mdev->cram_hmac_tfm,
3963                                (u8 *)mdev->net_conf->shared_secret, key_len);
3964        if (rv) {
3965                dev_err(DEV, "crypto_hash_setkey() failed with %d\n", rv);
3966                rv = -1;
3967                goto fail;
3968        }
3969
3970        get_random_bytes(my_challenge, CHALLENGE_LEN);
3971
3972        rv = drbd_send_cmd2(mdev, P_AUTH_CHALLENGE, my_challenge, CHALLENGE_LEN);
3973        if (!rv)
3974                goto fail;
3975
3976        rv = drbd_recv_header(mdev, &cmd, &length);
3977        if (!rv)
3978                goto fail;
3979
3980        if (cmd != P_AUTH_CHALLENGE) {
3981                dev_err(DEV, "expected AuthChallenge packet, received: %s (0x%04x)\n",
3982                    cmdname(cmd), cmd);
3983                rv = 0;
3984                goto fail;
3985        }
3986
3987        if (length > CHALLENGE_LEN * 2) {
3988                dev_err(DEV, "AuthChallenge payload too big.\n");
3989                rv = -1;
3990                goto fail;
3991        }
3992
3993        peers_ch = kmalloc(length, GFP_NOIO);
3994        if (peers_ch == NULL) {
3995                dev_err(DEV, "kmalloc of peers_ch failed\n");
3996                rv = -1;
3997                goto fail;
3998        }
3999
4000        rv = drbd_recv(mdev, peers_ch, length);
4001
4002        if (rv != length) {
4003                dev_err(DEV, "short read AuthChallenge: l=%u\n", rv);
4004                rv = 0;
4005                goto fail;
4006        }
4007
4008        resp_size = crypto_hash_digestsize(mdev->cram_hmac_tfm);
4009        response = kmalloc(resp_size, GFP_NOIO);
4010        if (response == NULL) {
4011                dev_err(DEV, "kmalloc of response failed\n");
4012                rv = -1;
4013                goto fail;
4014        }
4015
4016        sg_init_table(&sg, 1);
4017        sg_set_buf(&sg, peers_ch, length);
4018
4019        rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4020        if (rv) {
4021                dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4022                rv = -1;
4023                goto fail;
4024        }
4025
4026        rv = drbd_send_cmd2(mdev, P_AUTH_RESPONSE, response, resp_size);
4027        if (!rv)
4028                goto fail;
4029
4030        rv = drbd_recv_header(mdev, &cmd, &length);
4031        if (!rv)
4032                goto fail;
4033
4034        if (cmd != P_AUTH_RESPONSE) {
4035                dev_err(DEV, "expected AuthResponse packet, received: %s (0x%04x)\n",
4036                        cmdname(cmd), cmd);
4037                rv = 0;
4038                goto fail;
4039        }
4040
4041        if (length != resp_size) {
4042                dev_err(DEV, "AuthResponse payload of wrong size\n");
4043                rv = 0;
4044                goto fail;
4045        }
4046
4047        rv = drbd_recv(mdev, response, resp_size);
4048
4049        if (rv != resp_size) {
4050                dev_err(DEV, "short read receiving AuthResponse: l=%u\n", rv);
4051                rv = 0;
4052                goto fail;
4053        }
4054
4055        right_response = kmalloc(resp_size, GFP_NOIO);
4056        if (right_response == NULL) {
4057                dev_err(DEV, "kmalloc of right_response failed\n");
4058                rv = -1;
4059                goto fail;
4060        }
4061
4062        sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4063
4064        rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4065        if (rv) {
4066                dev_err(DEV, "crypto_hash_digest() failed with %d\n", rv);
4067                rv = -1;
4068                goto fail;
4069        }
4070
4071        rv = !memcmp(response, right_response, resp_size);
4072
4073        if (rv)
4074                dev_info(DEV, "Peer authenticated using %d bytes of '%s' HMAC\n",
4075                     resp_size, mdev->net_conf->cram_hmac_alg);
4076        else
4077                rv = -1;
4078
4079 fail:
4080        kfree(peers_ch);
4081        kfree(response);
4082        kfree(right_response);
4083
4084        return rv;
4085}
4086#endif
4087
4088int drbdd_init(struct drbd_thread *thi)
4089{
4090        struct drbd_conf *mdev = thi->mdev;
4091        unsigned int minor = mdev_to_minor(mdev);
4092        int h;
4093
4094        sprintf(current->comm, "drbd%d_receiver", minor);
4095
4096        dev_info(DEV, "receiver (re)started\n");
4097
4098        do {
4099                h = drbd_connect(mdev);
4100                if (h == 0) {
4101                        drbd_disconnect(mdev);
4102                        __set_current_state(TASK_INTERRUPTIBLE);
4103                        schedule_timeout(HZ);
4104                }
4105                if (h == -1) {
4106                        dev_warn(DEV, "Discarding network configuration.\n");
4107                        drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4108                }
4109        } while (h == 0);
4110
4111        if (h > 0) {
4112                if (get_net_conf(mdev)) {
4113                        drbdd(mdev);
4114                        put_net_conf(mdev);
4115                }
4116        }
4117
4118        drbd_disconnect(mdev);
4119
4120        dev_info(DEV, "receiver terminated\n");
4121        return 0;
4122}
4123
4124/* ********* acknowledge sender ******** */
4125
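/* The got_*() handlers below run in the context of the asender thread
 * and process the small control packets received on the meta-data
 * socket.  They return TRUE on success; returning FALSE makes
 * drbd_asender() drop and re-establish the connection. */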
4126static int got_RqSReply(struct drbd_conf *mdev, struct p_header80 *h)
4127{
4128        struct p_req_state_reply *p = (struct p_req_state_reply *)h;
4129
4130        int retcode = be32_to_cpu(p->retcode);
4131
4132        if (retcode >= SS_SUCCESS) {
4133                set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4134        } else {
4135                set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4136                dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4137                    drbd_set_st_err_str(retcode), retcode);
4138        }
4139        wake_up(&mdev->state_wait);
4140
4141        return TRUE;
4142}
4143
4144static int got_Ping(struct drbd_conf *mdev, struct p_header80 *h)
4145{
4146        return drbd_send_ping_ack(mdev);
4148}
4149
4150static int got_PingAck(struct drbd_conf *mdev, struct p_header80 *h)
4151{
4152        /* restore idle timeout */
4153        mdev->meta.socket->sk->sk_rcvtimeo = mdev->net_conf->ping_int*HZ;
4154        if (!test_and_set_bit(GOT_PING_ACK, &mdev->flags))
4155                wake_up(&mdev->misc_wait);
4156
4157        return TRUE;
4158}
4159
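/* P_RS_IS_IN_SYNC: the peer compared checksums for a resync request
 * (csums-alg, protocol >= 89) and found the block identical, so we can
 * mark it in sync without transferring any data. */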
4160static int got_IsInSync(struct drbd_conf *mdev, struct p_header80 *h)
4161{
4162        struct p_block_ack *p = (struct p_block_ack *)h;
4163        sector_t sector = be64_to_cpu(p->sector);
4164        int blksize = be32_to_cpu(p->blksize);
4165
4166        D_ASSERT(mdev->agreed_pro_version >= 89);
4167
4168        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4169
4170        if (get_ldev(mdev)) {
4171                drbd_rs_complete_io(mdev, sector);
4172                drbd_set_in_sync(mdev, sector, blksize);
4173                /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4174                mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4175                put_ldev(mdev);
4176        }
4177        dec_rs_pending(mdev);
4178        atomic_add(blksize >> 9, &mdev->rs_sect_in);
4179
4180        return TRUE;
4181}
4182
4183/* when we receive the ACK for a write request,
4184 * verify that we actually know about it */
4185static struct drbd_request *_ack_id_to_req(struct drbd_conf *mdev,
4186        u64 id, sector_t sector)
4187{
4188        struct hlist_head *slot = tl_hash_slot(mdev, sector);
4189        struct hlist_node *n;
4190        struct drbd_request *req;
4191
4192        hlist_for_each_entry(req, n, slot, colision) {
4193                if ((unsigned long)req == (unsigned long)id) {
4194                        if (req->sector != sector) {
4195                                dev_err(DEV, "_ack_id_to_req: found req %p but it has "
4196                                    "wrong sector (%llus versus %llus)\n", req,
4197                                    (unsigned long long)req->sector,
4198                                    (unsigned long long)sector);
4199                                break;
4200                        }
4201                        return req;
4202                }
4203        }
4204        dev_err(DEV, "_ack_id_to_req: failed to find req %p, sector %llus in list\n",
4205                (void *)(unsigned long)id, (unsigned long long)sector);
4206        return NULL;
4207}
4208
4209typedef struct drbd_request *(req_validator_fn)
4210        (struct drbd_conf *mdev, u64 id, sector_t sector);
4211
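/* Common path for ACK-like packets that refer to one of our own
 * requests: look the request up under req_lock via the given validator,
 * feed the event into the request state machine with __req_mod(), and
 * complete the master bio outside the lock if one fell out of it. */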
4212static int validate_req_change_req_state(struct drbd_conf *mdev,
4213        u64 id, sector_t sector, req_validator_fn validator,
4214        const char *func, enum drbd_req_event what)
4215{
4216        struct drbd_request *req;
4217        struct bio_and_error m;
4218
4219        spin_lock_irq(&mdev->req_lock);
4220        req = validator(mdev, id, sector);
4221        if (unlikely(!req)) {
4222                spin_unlock_irq(&mdev->req_lock);
4223                dev_err(DEV, "%s: got a corrupt block_id/sector pair\n", func);
4224                return FALSE;
4225        }
4226        __req_mod(req, what, &m);
4227        spin_unlock_irq(&mdev->req_lock);
4228
4229        if (m.bio)
4230                complete_master_bio(mdev, &m);
4231        return TRUE;
4232}
4233
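/* Positive acknowledgment for a data block.  Resync ("syncer") blocks
 * carry a magic block_id and only update bitmap and counters; for
 * application writes the wire command selects the event: P_RECV_ACK for
 * protocol B, P_WRITE_ACK/P_RS_WRITE_ACK/P_DISCARD_ACK for protocol C. */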
4234static int got_BlockAck(struct drbd_conf *mdev, struct p_header80 *h)
4235{
4236        struct p_block_ack *p = (struct p_block_ack *)h;
4237        sector_t sector = be64_to_cpu(p->sector);
4238        int blksize = be32_to_cpu(p->blksize);
4239        enum drbd_req_event what;
4240
4241        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4242
4243        if (is_syncer_block_id(p->block_id)) {
4244                drbd_set_in_sync(mdev, sector, blksize);
4245                dec_rs_pending(mdev);
4246                return TRUE;
4247        }
4248        switch (be16_to_cpu(h->command)) {
4249        case P_RS_WRITE_ACK:
4250                D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4251                what = write_acked_by_peer_and_sis;
4252                break;
4253        case P_WRITE_ACK:
4254                D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4255                what = write_acked_by_peer;
4256                break;
4257        case P_RECV_ACK:
4258                D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_B);
4259                what = recv_acked_by_peer;
4260                break;
4261        case P_DISCARD_ACK:
4262                D_ASSERT(mdev->net_conf->wire_protocol == DRBD_PROT_C);
4263                what = conflict_discarded_by_peer;
4264                break;
4265        default:
4266                D_ASSERT(0);
4267                return FALSE;
4268        }
4269
4270        return validate_req_change_req_state(mdev, p->block_id, sector,
4271                _ack_id_to_req, __func__, what);
4272}
4273
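/* Negative acknowledgment: the peer could not write the block.  For
 * resync requests the area is accounted as failed resync I/O; for
 * application writes the request sees a neg_acked event. */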
4274static int got_NegAck(struct drbd_conf *mdev, struct p_header80 *h)
4275{
4276        struct p_block_ack *p = (struct p_block_ack *)h;
4277        sector_t sector = be64_to_cpu(p->sector);
4278
4279        if (__ratelimit(&drbd_ratelimit_state))
4280                dev_warn(DEV, "Got NegAck packet. Peer is in troubles?\n");
4281
4282        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4283
4284        if (is_syncer_block_id(p->block_id)) {
4285                int size = be32_to_cpu(p->blksize);
4286                dec_rs_pending(mdev);
4287                drbd_rs_failed_io(mdev, sector, size);
4288                return TRUE;
4289        }
4290        return validate_req_change_req_state(mdev, p->block_id, sector,
4291                _ack_id_to_req, __func__, neg_acked);
4292}
4293
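/* Negative reply to one of our (application) read requests:
 * the peer could not read the block, so fail the original request. */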
4294static int got_NegDReply(struct drbd_conf *mdev, struct p_header80 *h)
4295{
4296        struct p_block_ack *p = (struct p_block_ack *)h;
4297        sector_t sector = be64_to_cpu(p->sector);
4298
4299        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4300        dev_err(DEV, "Got NegDReply; Sector %llus, len %u; Fail original request.\n",
4301            (unsigned long long)sector, be32_to_cpu(p->blksize));
4302
4303        return validate_req_change_req_state(mdev, p->block_id, sector,
4304                _ar_id_to_req, __func__, neg_acked);
4305}
4306
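/* Negative reply to a resync read request: the peer could not read the
 * block; account the area as failed resync I/O. */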
4307static int got_NegRSDReply(struct drbd_conf *mdev, struct p_header80 *h)
4308{
4309        sector_t sector;
4310        int size;
4311        struct p_block_ack *p = (struct p_block_ack *)h;
4312
4313        sector = be64_to_cpu(p->sector);
4314        size = be32_to_cpu(p->blksize);
4315
4316        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4317
4318        dec_rs_pending(mdev);
4319
4320        if (get_ldev_if_state(mdev, D_FAILED)) {
4321                drbd_rs_complete_io(mdev, sector);
4322                drbd_rs_failed_io(mdev, sector, size);
4323                put_ldev(mdev);
4324        }
4325
4326        return TRUE;
4327}
4328
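/* The peer has processed a complete write barrier (epoch); release the
 * corresponding section of our transfer log. */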
4329static int got_BarrierAck(struct drbd_conf *mdev, struct p_header80 *h)
4330{
4331        struct p_barrier_ack *p = (struct p_barrier_ack *)h;
4332
4333        tl_release(mdev, p->barrier, be32_to_cpu(p->set_size));
4334
4335        return TRUE;
4336}
4337
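/* Result of one online-verify request: either the block differs on the
 * peer (ID_OUT_OF_SYNC) and is recorded, or an in-sync result flushes
 * the pending out-of-sync report.  Once the last outstanding verify
 * request is answered, finishing of the verify run is scheduled. */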
4338static int got_OVResult(struct drbd_conf *mdev, struct p_header80 *h)
4339{
4340        struct p_block_ack *p = (struct p_block_ack *)h;
4341        struct drbd_work *w;
4342        sector_t sector;
4343        int size;
4344
4345        sector = be64_to_cpu(p->sector);
4346        size = be32_to_cpu(p->blksize);
4347
4348        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4349
4350        if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
4351                drbd_ov_oos_found(mdev, sector, size);
4352        else
4353                ov_oos_print(mdev);
4354
4355        if (!get_ldev(mdev))
4356                return TRUE;
4357
4358        drbd_rs_complete_io(mdev, sector);
4359        dec_rs_pending(mdev);
4360
4361        if (--mdev->ov_left == 0) {
4362                w = kmalloc(sizeof(*w), GFP_NOIO);
4363                if (w) {
4364                        w->cb = w_ov_finished;
4365                        drbd_queue_work_front(&mdev->data.work, w);
4366                } else {
4367                        dev_err(DEV, "kmalloc(w) failed.");
4368                        ov_oos_print(mdev);
4369                        drbd_resync_finished(mdev);
4370                }
4371        }
4372        put_ldev(mdev);
4373        return TRUE;
4374}
4375
4376static int got_skip(struct drbd_conf *mdev, struct p_header80 *h)
4377{
4378        return TRUE;
4379}
4380
4381struct asender_cmd {
4382        size_t pkt_size;
4383        int (*process)(struct drbd_conf *mdev, struct p_header80 *h);
4384};
4385
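/* Dispatch table for the meta-data socket: maps each command to its
 * expected packet size and handler function. */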
4386static struct asender_cmd *get_asender_cmd(int cmd)
4387{
4388        static struct asender_cmd asender_tbl[] = {
4389                /* anything missing from this table is in
4390                 * the drbd_cmd_handler (drbd_default_handler) table,
4391                 * see the beginning of drbdd() */
4392        [P_PING]            = { sizeof(struct p_header80), got_Ping },
4393        [P_PING_ACK]        = { sizeof(struct p_header80), got_PingAck },
4394        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
4395        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
4396        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
4397        [P_DISCARD_ACK]     = { sizeof(struct p_block_ack), got_BlockAck },
4398        [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
4399        [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
4400        [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply},
4401        [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
4402        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
4403        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
4404        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
4405        [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
4406        [P_MAX_CMD]         = { 0, NULL },
4407        };
4408        if (cmd > P_MAX_CMD || asender_tbl[cmd].process == NULL)
4409                return NULL;
4410        return &asender_tbl[cmd];
4411}
4412
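/* Main loop of the acknowledge-sender thread ("drbdN_asender").  It
 * multiplexes two duties on the meta-data socket: sending pings and the
 * ACKs for completed epoch entries (done_ee), and receiving the small
 * control packets dispatched through get_asender_cmd() above. */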
4413int drbd_asender(struct drbd_thread *thi)
4414{
4415        struct drbd_conf *mdev = thi->mdev;
4416        struct p_header80 *h = &mdev->meta.rbuf.header.h80;
4417        struct asender_cmd *cmd = NULL;
4418
4419        int rv, len;
4420        void *buf    = h;
4421        int received = 0;
4422        int expect   = sizeof(struct p_header80);
4423        int empty;
4424
4425        sprintf(current->comm, "drbd%d_asender", mdev_to_minor(mdev));
4426
4427        current->policy = SCHED_RR;  /* Make this a realtime task! */
4428        current->rt_priority = 2;    /* more important than all other tasks */
4429
4430        while (get_t_state(thi) == Running) {
4431                drbd_thread_current_set_cpu(mdev);
4432                if (test_and_clear_bit(SEND_PING, &mdev->flags)) {
4433                        ERR_IF(!drbd_send_ping(mdev)) goto reconnect;
4434                        mdev->meta.socket->sk->sk_rcvtimeo =
4435                                mdev->net_conf->ping_timeo*HZ/10;
4436                }
4437
4438                /* conditionally cork;
4439                 * it may hurt latency if we cork without much to send */
4440                if (!mdev->net_conf->no_cork &&
4441                        3 < atomic_read(&mdev->unacked_cnt))
4442                        drbd_tcp_cork(mdev->meta.socket);
4443                while (1) {
4444                        clear_bit(SIGNAL_ASENDER, &mdev->flags);
4445                        flush_signals(current);
4446                        if (!drbd_process_done_ee(mdev))
4447                                goto reconnect;
4448                        /* to avoid race with newly queued ACKs */
4449                        set_bit(SIGNAL_ASENDER, &mdev->flags);
4450                        spin_lock_irq(&mdev->req_lock);
4451                        empty = list_empty(&mdev->done_ee);
4452                        spin_unlock_irq(&mdev->req_lock);
4453                        /* new ack may have been queued right here,
4454                         * but then there is also a signal pending,
4455                         * and we start over... */
4456                        if (empty)
4457                                break;
4458                }
4459                /* but unconditionally uncork unless disabled */
4460                if (!mdev->net_conf->no_cork)
4461                        drbd_tcp_uncork(mdev->meta.socket);
4462
4463                /* short circuit, recv_msg would return EINTR anyways. */
4464                if (signal_pending(current))
4465                        continue;
4466
4467                rv = drbd_recv_short(mdev, mdev->meta.socket,
4468                                     buf, expect-received, 0);
4469                clear_bit(SIGNAL_ASENDER, &mdev->flags);
4470
4471                flush_signals(current);
4472
4473                /* Note:
4474                 * -EINTR        (on meta) we got a signal
4475                 * -EAGAIN       (on meta) rcvtimeo expired
4476                 * -ECONNRESET   other side closed the connection
4477                 * -ERESTARTSYS  (on data) we got a signal
4478                 * rv <  0       other than above: unexpected error!
4479                 * rv == expected: full header or command
4480                 * rv <  expected: "woken" by signal during receive
4481                 * rv == 0       : "connection shut down by peer"
4482                 */
4483                if (likely(rv > 0)) {
4484                        received += rv;
4485                        buf      += rv;
4486                } else if (rv == 0) {
4487                        dev_err(DEV, "meta connection shut down by peer.\n");
4488                        goto reconnect;
4489                } else if (rv == -EAGAIN) {
4490                        if (mdev->meta.socket->sk->sk_rcvtimeo ==
4491                            mdev->net_conf->ping_timeo*HZ/10) {
4492                                dev_err(DEV, "PingAck did not arrive in time.\n");
4493                                goto reconnect;
4494                        }
4495                        set_bit(SEND_PING, &mdev->flags);
4496                        continue;
4497                } else if (rv == -EINTR) {
4498                        continue;
4499                } else {
4500                        dev_err(DEV, "sock_recvmsg returned %d\n", rv);
4501                        goto reconnect;
4502                }
4503
4504                if (received == expect && cmd == NULL) {
4505                        if (unlikely(h->magic != BE_DRBD_MAGIC)) {
4506                                dev_err(DEV, "magic?? on meta m: 0x%08x c: %d l: %d\n",
4507                                    be32_to_cpu(h->magic),
4508                                    be16_to_cpu(h->command),
4509                                    be16_to_cpu(h->length));
4510                                goto reconnect;
4511                        }
4512                        cmd = get_asender_cmd(be16_to_cpu(h->command));
4513                        len = be16_to_cpu(h->length);
4514                        if (unlikely(cmd == NULL)) {
4515                                dev_err(DEV, "unknown command?? on meta m: 0x%08x c: %d l: %d\n",
4516                                    be32_to_cpu(h->magic),
4517                                    be16_to_cpu(h->command),
4518                                    be16_to_cpu(h->length));
4519                                goto disconnect;
4520                        }
4521                        expect = cmd->pkt_size;
4522                        ERR_IF(len != expect-sizeof(struct p_header80))
4523                                goto reconnect;
4524                }
4525                if (received == expect) {
4526                        D_ASSERT(cmd != NULL);
4527                        if (!cmd->process(mdev, h))
4528                                goto reconnect;
4529
4530                        buf      = h;
4531                        received = 0;
4532                        expect   = sizeof(struct p_header80);
4533                        cmd      = NULL;
4534                }
4535        }
4536
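        /* The if (0) blocks below are reached via goto only: "reconnect"
         * signals a network failure so that the receiver re-establishes
         * the connection, while "disconnect" gives up on this network
         * configuration entirely. */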
4537        if (0) {
4538reconnect:
4539                drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
4540                drbd_md_sync(mdev);
4541        }
4542        if (0) {
4543disconnect:
4544                drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
4545                drbd_md_sync(mdev);
4546        }
4547        clear_bit(SIGNAL_ASENDER, &mdev->flags);
4548
4549        D_ASSERT(mdev->state.conn < C_CONNECTED);
4550        dev_info(DEV, "asender terminated\n");
4551
4552        return 0;
4553}
4554