linux/drivers/block/drbd/drbd_receiver.c
<<
>>
Prefs
   1/*
   2   drbd_receiver.c
   3
   4   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
   5
   6   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   7   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   8   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.
   9
  10   drbd is free software; you can redistribute it and/or modify
  11   it under the terms of the GNU General Public License as published by
  12   the Free Software Foundation; either version 2, or (at your option)
  13   any later version.
  14
  15   drbd is distributed in the hope that it will be useful,
  16   but WITHOUT ANY WARRANTY; without even the implied warranty of
  17   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  18   GNU General Public License for more details.
  19
  20   You should have received a copy of the GNU General Public License
  21   along with drbd; see the file COPYING.  If not, write to
  22   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
  23 */
  24
  25
  26#include <linux/module.h>
  27
  28#include <asm/uaccess.h>
  29#include <net/sock.h>
  30
  31#include <linux/drbd.h>
  32#include <linux/fs.h>
  33#include <linux/file.h>
  34#include <linux/in.h>
  35#include <linux/mm.h>
  36#include <linux/memcontrol.h>
  37#include <linux/mm_inline.h>
  38#include <linux/slab.h>
  39#include <linux/pkt_sched.h>
  40#define __KERNEL_SYSCALLS__
  41#include <linux/unistd.h>
  42#include <linux/vmalloc.h>
  43#include <linux/random.h>
  44#include <linux/string.h>
  45#include <linux/scatterlist.h>
  46#include "drbd_int.h"
  47#include "drbd_req.h"
  48
  49#include "drbd_vli.h"
  50
     /* Decoded packet header; receive_first_packet() has decode_header()
      * fill one in and then reports its cmd field. */
   51struct packet_info {
   52        enum drbd_packet cmd;      /* packet type */
   53        unsigned int size;         /* presumably the payload size -- TODO confirm */
   54        unsigned int vnr;          /* presumably the volume number -- TODO confirm */
   55        void *data;                /* NOTE(review): looks like a pointer to the payload -- confirm */
   56};
  57
     /* Result codes of drbd_may_finish_epoch() (declared below).
      * NOTE(review): the names suggest what happened to the epoch that
      * was passed in -- confirm against the implementation. */
   58enum finish_epoch {
   59        FE_STILL_LIVE,
   60        FE_DESTROYED,
   61        FE_RECYCLED,
   62};
  63
  64static int drbd_do_features(struct drbd_tconn *tconn);
  65static int drbd_do_auth(struct drbd_tconn *tconn);
  66static int drbd_disconnected(struct drbd_conf *mdev);
  67
  68static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *, struct drbd_epoch *, enum epoch_event);
  69static int e_end_block(struct drbd_work *, int);
  70
  71
     /* Allocation flags used for alloc_page() in __drbd_alloc_pages();
      * see the comment there for why nothing stronger is used. */
   72#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN)
  73
  74/*
   75 * Some helper functions to deal with singly linked page lists,
  76 * page->private being our "next" pointer.
  77 */
  78
  79/* If at least n pages are linked at head, get n pages off.
  80 * Otherwise, don't modify head, and return NULL.
  81 * Locking is the responsibility of the caller.
  82 */
  83static struct page *page_chain_del(struct page **head, int n)
  84{
  85        struct page *page;
  86        struct page *tmp;
  87
  88        BUG_ON(!n);
  89        BUG_ON(!head);
  90
  91        page = *head;
  92
  93        if (!page)
  94                return NULL;
  95
  96        while (page) {
  97                tmp = page_chain_next(page);
  98                if (--n == 0)
  99                        break; /* found sufficient pages */
 100                if (tmp == NULL)
 101                        /* insufficient pages, don't use any of them. */
 102                        return NULL;
 103                page = tmp;
 104        }
 105
 106        /* add end of list marker for the returned list */
 107        set_page_private(page, 0);
 108        /* actual return value, and adjustment of head */
 109        page = *head;
 110        *head = tmp;
 111        return page;
 112}
 113
 114/* may be used outside of locks to find the tail of a (usually short)
 115 * "private" page chain, before adding it back to a global chain head
 116 * with page_chain_add() under a spinlock. */
 117static struct page *page_chain_tail(struct page *page, int *len)
 118{
 119        struct page *tmp;
 120        int i = 1;
 121        while ((tmp = page_chain_next(page)))
 122                ++i, page = tmp;
 123        if (len)
 124                *len = i;
 125        return page;
 126}
 127
 128static int page_chain_free(struct page *page)
 129{
 130        struct page *tmp;
 131        int i = 0;
 132        page_chain_for_each_safe(page, tmp) {
 133                put_page(page);
 134                ++i;
 135        }
 136        return i;
 137}
 138
 139static void page_chain_add(struct page **head,
 140                struct page *chain_first, struct page *chain_last)
 141{
 142#if 1
 143        struct page *tmp;
 144        tmp = page_chain_tail(chain_first, NULL);
 145        BUG_ON(tmp != chain_last);
 146#endif
 147
 148        /* add chain to head */
 149        set_page_private(chain_last, (unsigned long)*head);
 150        *head = chain_first;
 151}
 152
 153static struct page *__drbd_alloc_pages(struct drbd_conf *mdev,
 154                                       unsigned int number)
 155{
 156        struct page *page = NULL;
 157        struct page *tmp = NULL;
 158        unsigned int i = 0;
 159
 160        /* Yes, testing drbd_pp_vacant outside the lock is racy.
 161         * So what. It saves a spin_lock. */
 162        if (drbd_pp_vacant >= number) {
 163                spin_lock(&drbd_pp_lock);
 164                page = page_chain_del(&drbd_pp_pool, number);
 165                if (page)
 166                        drbd_pp_vacant -= number;
 167                spin_unlock(&drbd_pp_lock);
 168                if (page)
 169                        return page;
 170        }
 171
 172        /* GFP_TRY, because we must not cause arbitrary write-out: in a DRBD
 173         * "criss-cross" setup, that might cause write-out on some other DRBD,
 174         * which in turn might block on the other node at this very place.  */
 175        for (i = 0; i < number; i++) {
 176                tmp = alloc_page(GFP_TRY);
 177                if (!tmp)
 178                        break;
 179                set_page_private(tmp, (unsigned long)page);
 180                page = tmp;
 181        }
 182
 183        if (i == number)
 184                return page;
 185
 186        /* Not enough pages immediately available this time.
 187         * No need to jump around here, drbd_alloc_pages will retry this
 188         * function "soon". */
 189        if (page) {
 190                tmp = page_chain_tail(page, NULL);
 191                spin_lock(&drbd_pp_lock);
 192                page_chain_add(&drbd_pp_pool, page, tmp);
 193                drbd_pp_vacant += i;
 194                spin_unlock(&drbd_pp_lock);
 195        }
 196        return NULL;
 197}
 198
 199static void reclaim_finished_net_peer_reqs(struct drbd_conf *mdev,
 200                                           struct list_head *to_be_freed)
 201{
 202        struct drbd_peer_request *peer_req;
 203        struct list_head *le, *tle;
 204
 205        /* The EEs are always appended to the end of the list. Since
 206           they are sent in order over the wire, they have to finish
 207           in order. As soon as we see the first not finished we can
 208           stop to examine the list... */
 209
 210        list_for_each_safe(le, tle, &mdev->net_ee) {
 211                peer_req = list_entry(le, struct drbd_peer_request, w.list);
 212                if (drbd_peer_req_has_active_page(peer_req))
 213                        break;
 214                list_move(le, to_be_freed);
 215        }
 216}
 217
 218static void drbd_kick_lo_and_reclaim_net(struct drbd_conf *mdev)
 219{
 220        LIST_HEAD(reclaimed);
 221        struct drbd_peer_request *peer_req, *t;
 222
 223        spin_lock_irq(&mdev->tconn->req_lock);
 224        reclaim_finished_net_peer_reqs(mdev, &reclaimed);
 225        spin_unlock_irq(&mdev->tconn->req_lock);
 226
 227        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
 228                drbd_free_net_peer_req(mdev, peer_req);
 229}
 230
 231/**
 232 * drbd_alloc_pages() - Returns @number pages, retries forever (or until signalled)
 233 * @mdev:       DRBD device.
 234 * @number:     number of pages requested
 235 * @retry:      whether to retry, if not enough pages are available right now
 236 *
 237 * Tries to allocate number pages, first from our own page pool, then from
 238 * the kernel, unless this allocation would exceed the max_buffers setting.
 239 * Possibly retry until DRBD frees sufficient pages somewhere else.
 240 *
 241 * Returns a page chain linked via page->private.
 242 */
 243struct page *drbd_alloc_pages(struct drbd_conf *mdev, unsigned int number,
 244                              bool retry)
 245{
 246        struct page *page = NULL;
 247        struct net_conf *nc;
 248        DEFINE_WAIT(wait);
 249        int mxb;
 250
 251        /* Yes, we may run up to @number over max_buffers. If we
 252         * follow it strictly, the admin will get it wrong anyways. */
 253        rcu_read_lock();
 254        nc = rcu_dereference(mdev->tconn->net_conf);
 255        mxb = nc ? nc->max_buffers : 1000000;
 256        rcu_read_unlock();
 257
 258        if (atomic_read(&mdev->pp_in_use) < mxb)
 259                page = __drbd_alloc_pages(mdev, number);
 260
 261        while (page == NULL) {
 262                prepare_to_wait(&drbd_pp_wait, &wait, TASK_INTERRUPTIBLE);
 263
 264                drbd_kick_lo_and_reclaim_net(mdev);
 265
 266                if (atomic_read(&mdev->pp_in_use) < mxb) {
 267                        page = __drbd_alloc_pages(mdev, number);
 268                        if (page)
 269                                break;
 270                }
 271
 272                if (!retry)
 273                        break;
 274
 275                if (signal_pending(current)) {
 276                        dev_warn(DEV, "drbd_alloc_pages interrupted!\n");
 277                        break;
 278                }
 279
 280                schedule();
 281        }
 282        finish_wait(&drbd_pp_wait, &wait);
 283
 284        if (page)
 285                atomic_add(number, &mdev->pp_in_use);
 286        return page;
 287}
 288
 289/* Must not be used from irq, as that may deadlock: see drbd_alloc_pages.
 290 * Is also used from inside an other spin_lock_irq(&mdev->tconn->req_lock);
 291 * Either links the page chain back to the global pool,
 292 * or returns all pages to the system. */
 293static void drbd_free_pages(struct drbd_conf *mdev, struct page *page, int is_net)
 294{
 295        atomic_t *a = is_net ? &mdev->pp_in_use_by_net : &mdev->pp_in_use;
 296        int i;
 297
 298        if (page == NULL)
 299                return;
 300
 301        if (drbd_pp_vacant > (DRBD_MAX_BIO_SIZE/PAGE_SIZE) * minor_count)
 302                i = page_chain_free(page);
 303        else {
 304                struct page *tmp;
 305                tmp = page_chain_tail(page, &i);
 306                spin_lock(&drbd_pp_lock);
 307                page_chain_add(&drbd_pp_pool, page, tmp);
 308                drbd_pp_vacant += i;
 309                spin_unlock(&drbd_pp_lock);
 310        }
 311        i = atomic_sub_return(i, a);
 312        if (i < 0)
 313                dev_warn(DEV, "ASSERTION FAILED: %s: %d < 0\n",
 314                        is_net ? "pp_in_use_by_net" : "pp_in_use", i);
 315        wake_up(&drbd_pp_wait);
 316}
 317
 318/*
 319You need to hold the req_lock:
 320 _drbd_wait_ee_list_empty()
 321
 322You must not have the req_lock:
 323 drbd_free_peer_req()
 324 drbd_alloc_peer_req()
 325 drbd_free_peer_reqs()
 326 drbd_ee_fix_bhs()
 327 drbd_finish_peer_reqs()
 328 drbd_clear_done_ee()
 329 drbd_wait_ee_list_empty()
 330*/
 331
 332struct drbd_peer_request *
 333drbd_alloc_peer_req(struct drbd_conf *mdev, u64 id, sector_t sector,
 334                    unsigned int data_size, gfp_t gfp_mask) __must_hold(local)
 335{
 336        struct drbd_peer_request *peer_req;
 337        struct page *page = NULL;
 338        unsigned nr_pages = (data_size + PAGE_SIZE -1) >> PAGE_SHIFT;
 339
 340        if (drbd_insert_fault(mdev, DRBD_FAULT_AL_EE))
 341                return NULL;
 342
 343        peer_req = mempool_alloc(drbd_ee_mempool, gfp_mask & ~__GFP_HIGHMEM);
 344        if (!peer_req) {
 345                if (!(gfp_mask & __GFP_NOWARN))
 346                        dev_err(DEV, "%s: allocation failed\n", __func__);
 347                return NULL;
 348        }
 349
 350        if (data_size) {
 351                page = drbd_alloc_pages(mdev, nr_pages, (gfp_mask & __GFP_WAIT));
 352                if (!page)
 353                        goto fail;
 354        }
 355
 356        drbd_clear_interval(&peer_req->i);
 357        peer_req->i.size = data_size;
 358        peer_req->i.sector = sector;
 359        peer_req->i.local = false;
 360        peer_req->i.waiting = false;
 361
 362        peer_req->epoch = NULL;
 363        peer_req->w.mdev = mdev;
 364        peer_req->pages = page;
 365        atomic_set(&peer_req->pending_bios, 0);
 366        peer_req->flags = 0;
 367        /*
 368         * The block_id is opaque to the receiver.  It is not endianness
 369         * converted, and sent back to the sender unchanged.
 370         */
 371        peer_req->block_id = id;
 372
 373        return peer_req;
 374
 375 fail:
 376        mempool_free(peer_req, drbd_ee_mempool);
 377        return NULL;
 378}
 379
 380void __drbd_free_peer_req(struct drbd_conf *mdev, struct drbd_peer_request *peer_req,
 381                       int is_net)
 382{
 383        if (peer_req->flags & EE_HAS_DIGEST)
 384                kfree(peer_req->digest);
 385        drbd_free_pages(mdev, peer_req->pages, is_net);
 386        D_ASSERT(atomic_read(&peer_req->pending_bios) == 0);
 387        D_ASSERT(drbd_interval_empty(&peer_req->i));
 388        mempool_free(peer_req, drbd_ee_mempool);
 389}
 390
 391int drbd_free_peer_reqs(struct drbd_conf *mdev, struct list_head *list)
 392{
 393        LIST_HEAD(work_list);
 394        struct drbd_peer_request *peer_req, *t;
 395        int count = 0;
 396        int is_net = list == &mdev->net_ee;
 397
 398        spin_lock_irq(&mdev->tconn->req_lock);
 399        list_splice_init(list, &work_list);
 400        spin_unlock_irq(&mdev->tconn->req_lock);
 401
 402        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
 403                __drbd_free_peer_req(mdev, peer_req, is_net);
 404                count++;
 405        }
 406        return count;
 407}
 408
 409/*
 410 * See also comments in _req_mod(,BARRIER_ACKED) and receive_Barrier.
 411 */
 412static int drbd_finish_peer_reqs(struct drbd_conf *mdev)
 413{
 414        LIST_HEAD(work_list);
 415        LIST_HEAD(reclaimed);
 416        struct drbd_peer_request *peer_req, *t;
 417        int err = 0;
 418
 419        spin_lock_irq(&mdev->tconn->req_lock);
 420        reclaim_finished_net_peer_reqs(mdev, &reclaimed);
 421        list_splice_init(&mdev->done_ee, &work_list);
 422        spin_unlock_irq(&mdev->tconn->req_lock);
 423
 424        list_for_each_entry_safe(peer_req, t, &reclaimed, w.list)
 425                drbd_free_net_peer_req(mdev, peer_req);
 426
 427        /* possible callbacks here:
 428         * e_end_block, and e_end_resync_block, e_send_superseded.
 429         * all ignore the last argument.
 430         */
 431        list_for_each_entry_safe(peer_req, t, &work_list, w.list) {
 432                int err2;
 433
 434                /* list_del not necessary, next/prev members not touched */
 435                err2 = peer_req->w.cb(&peer_req->w, !!err);
 436                if (!err)
 437                        err = err2;
 438                drbd_free_peer_req(mdev, peer_req);
 439        }
 440        wake_up(&mdev->ee_wait);
 441
 442        return err;
 443}
 444
 445static void _drbd_wait_ee_list_empty(struct drbd_conf *mdev,
 446                                     struct list_head *head)
 447{
 448        DEFINE_WAIT(wait);
 449
 450        /* avoids spin_lock/unlock
 451         * and calling prepare_to_wait in the fast path */
 452        while (!list_empty(head)) {
 453                prepare_to_wait(&mdev->ee_wait, &wait, TASK_UNINTERRUPTIBLE);
 454                spin_unlock_irq(&mdev->tconn->req_lock);
 455                io_schedule();
 456                finish_wait(&mdev->ee_wait, &wait);
 457                spin_lock_irq(&mdev->tconn->req_lock);
 458        }
 459}
 460
 461static void drbd_wait_ee_list_empty(struct drbd_conf *mdev,
 462                                    struct list_head *head)
 463{
 464        spin_lock_irq(&mdev->tconn->req_lock);
 465        _drbd_wait_ee_list_empty(mdev, head);
 466        spin_unlock_irq(&mdev->tconn->req_lock);
 467}
 468
 469static int drbd_recv_short(struct socket *sock, void *buf, size_t size, int flags)
 470{
 471        mm_segment_t oldfs;
 472        struct kvec iov = {
 473                .iov_base = buf,
 474                .iov_len = size,
 475        };
 476        struct msghdr msg = {
 477                .msg_iovlen = 1,
 478                .msg_iov = (struct iovec *)&iov,
 479                .msg_flags = (flags ? flags : MSG_WAITALL | MSG_NOSIGNAL)
 480        };
 481        int rv;
 482
 483        oldfs = get_fs();
 484        set_fs(KERNEL_DS);
 485        rv = sock_recvmsg(sock, &msg, size, msg.msg_flags);
 486        set_fs(oldfs);
 487
 488        return rv;
 489}
 490
 491static int drbd_recv(struct drbd_tconn *tconn, void *buf, size_t size)
 492{
 493        int rv;
 494
 495        rv = drbd_recv_short(tconn->data.socket, buf, size, 0);
 496
 497        if (rv < 0) {
 498                if (rv == -ECONNRESET)
 499                        conn_info(tconn, "sock was reset by peer\n");
 500                else if (rv != -ERESTARTSYS)
 501                        conn_err(tconn, "sock_recvmsg returned %d\n", rv);
 502        } else if (rv == 0) {
 503                if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
 504                        long t;
 505                        rcu_read_lock();
 506                        t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
 507                        rcu_read_unlock();
 508
 509                        t = wait_event_timeout(tconn->ping_wait, tconn->cstate < C_WF_REPORT_PARAMS, t);
 510
 511                        if (t)
 512                                goto out;
 513                }
 514                conn_info(tconn, "sock was shut down by peer\n");
 515        }
 516
 517        if (rv != size)
 518                conn_request_state(tconn, NS(conn, C_BROKEN_PIPE), CS_HARD);
 519
 520out:
 521        return rv;
 522}
 523
 524static int drbd_recv_all(struct drbd_tconn *tconn, void *buf, size_t size)
 525{
 526        int err;
 527
 528        err = drbd_recv(tconn, buf, size);
 529        if (err != size) {
 530                if (err >= 0)
 531                        err = -EIO;
 532        } else
 533                err = 0;
 534        return err;
 535}
 536
 537static int drbd_recv_all_warn(struct drbd_tconn *tconn, void *buf, size_t size)
 538{
 539        int err;
 540
 541        err = drbd_recv_all(tconn, buf, size);
 542        if (err && !signal_pending(current))
 543                conn_warn(tconn, "short read (expected size %d)\n", (int)size);
 544        return err;
 545}
 546
 547/* quoting tcp(7):
 548 *   On individual connections, the socket buffer size must be set prior to the
 549 *   listen(2) or connect(2) calls in order to have it take effect.
 550 * This is our wrapper to do so.
 551 */
 552static void drbd_setbufsize(struct socket *sock, unsigned int snd,
 553                unsigned int rcv)
 554{
 555        /* open coded SO_SNDBUF, SO_RCVBUF */
 556        if (snd) {
 557                sock->sk->sk_sndbuf = snd;
 558                sock->sk->sk_userlocks |= SOCK_SNDBUF_LOCK;
 559        }
 560        if (rcv) {
 561                sock->sk->sk_rcvbuf = rcv;
 562                sock->sk->sk_userlocks |= SOCK_RCVBUF_LOCK;
 563        }
 564}
 565
 566static struct socket *drbd_try_connect(struct drbd_tconn *tconn)
 567{
 568        const char *what;
 569        struct socket *sock;
 570        struct sockaddr_in6 src_in6;
 571        struct sockaddr_in6 peer_in6;
 572        struct net_conf *nc;
 573        int err, peer_addr_len, my_addr_len;
 574        int sndbuf_size, rcvbuf_size, connect_int;
 575        int disconnect_on_error = 1;
 576
 577        rcu_read_lock();
 578        nc = rcu_dereference(tconn->net_conf);
 579        if (!nc) {
 580                rcu_read_unlock();
 581                return NULL;
 582        }
 583        sndbuf_size = nc->sndbuf_size;
 584        rcvbuf_size = nc->rcvbuf_size;
 585        connect_int = nc->connect_int;
 586        rcu_read_unlock();
 587
 588        my_addr_len = min_t(int, tconn->my_addr_len, sizeof(src_in6));
 589        memcpy(&src_in6, &tconn->my_addr, my_addr_len);
 590
 591        if (((struct sockaddr *)&tconn->my_addr)->sa_family == AF_INET6)
 592                src_in6.sin6_port = 0;
 593        else
 594                ((struct sockaddr_in *)&src_in6)->sin_port = 0; /* AF_INET & AF_SCI */
 595
 596        peer_addr_len = min_t(int, tconn->peer_addr_len, sizeof(src_in6));
 597        memcpy(&peer_in6, &tconn->peer_addr, peer_addr_len);
 598
 599        what = "sock_create_kern";
 600        err = sock_create_kern(((struct sockaddr *)&src_in6)->sa_family,
 601                               SOCK_STREAM, IPPROTO_TCP, &sock);
 602        if (err < 0) {
 603                sock = NULL;
 604                goto out;
 605        }
 606
 607        sock->sk->sk_rcvtimeo =
 608        sock->sk->sk_sndtimeo = connect_int * HZ;
 609        drbd_setbufsize(sock, sndbuf_size, rcvbuf_size);
 610
 611       /* explicitly bind to the configured IP as source IP
 612        *  for the outgoing connections.
 613        *  This is needed for multihomed hosts and to be
 614        *  able to use lo: interfaces for drbd.
 615        * Make sure to use 0 as port number, so linux selects
 616        *  a free one dynamically.
 617        */
 618        what = "bind before connect";
 619        err = sock->ops->bind(sock, (struct sockaddr *) &src_in6, my_addr_len);
 620        if (err < 0)
 621                goto out;
 622
 623        /* connect may fail, peer not yet available.
 624         * stay C_WF_CONNECTION, don't go Disconnecting! */
 625        disconnect_on_error = 0;
 626        what = "connect";
 627        err = sock->ops->connect(sock, (struct sockaddr *) &peer_in6, peer_addr_len, 0);
 628
 629out:
 630        if (err < 0) {
 631                if (sock) {
 632                        sock_release(sock);
 633                        sock = NULL;
 634                }
 635                switch (-err) {
 636                        /* timeout, busy, signal pending */
 637                case ETIMEDOUT: case EAGAIN: case EINPROGRESS:
 638                case EINTR: case ERESTARTSYS:
 639                        /* peer not (yet) available, network problem */
 640                case ECONNREFUSED: case ENETUNREACH:
 641                case EHOSTDOWN:    case EHOSTUNREACH:
 642                        disconnect_on_error = 0;
 643                        break;
 644                default:
 645                        conn_err(tconn, "%s failed, err = %d\n", what, err);
 646                }
 647                if (disconnect_on_error)
 648                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
 649        }
 650
 651        return sock;
 652}
 653
     /* Shared between the connecting thread and the state-change callback
      * installed on the listen socket by prepare_listen_socket(). */
   654struct accept_wait_data {
   655        struct drbd_tconn *tconn;
   656        struct socket *s_listen;          /* the listen socket, set by prepare_listen_socket() */
   657        struct completion door_bell;      /* completed by drbd_incoming_connection() */
   658        void (*original_sk_state_change)(struct sock *sk); /* callback we replaced */
   659
   660};
 661
 662static void drbd_incoming_connection(struct sock *sk)
 663{
 664        struct accept_wait_data *ad = sk->sk_user_data;
 665        void (*state_change)(struct sock *sk);
 666
 667        state_change = ad->original_sk_state_change;
 668        if (sk->sk_state == TCP_ESTABLISHED)
 669                complete(&ad->door_bell);
 670        state_change(sk);
 671}
 672
 673static int prepare_listen_socket(struct drbd_tconn *tconn, struct accept_wait_data *ad)
 674{
 675        int err, sndbuf_size, rcvbuf_size, my_addr_len;
 676        struct sockaddr_in6 my_addr;
 677        struct socket *s_listen;
 678        struct net_conf *nc;
 679        const char *what;
 680
 681        rcu_read_lock();
 682        nc = rcu_dereference(tconn->net_conf);
 683        if (!nc) {
 684                rcu_read_unlock();
 685                return -EIO;
 686        }
 687        sndbuf_size = nc->sndbuf_size;
 688        rcvbuf_size = nc->rcvbuf_size;
 689        rcu_read_unlock();
 690
 691        my_addr_len = min_t(int, tconn->my_addr_len, sizeof(struct sockaddr_in6));
 692        memcpy(&my_addr, &tconn->my_addr, my_addr_len);
 693
 694        what = "sock_create_kern";
 695        err = sock_create_kern(((struct sockaddr *)&my_addr)->sa_family,
 696                               SOCK_STREAM, IPPROTO_TCP, &s_listen);
 697        if (err) {
 698                s_listen = NULL;
 699                goto out;
 700        }
 701
 702        s_listen->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
 703        drbd_setbufsize(s_listen, sndbuf_size, rcvbuf_size);
 704
 705        what = "bind before listen";
 706        err = s_listen->ops->bind(s_listen, (struct sockaddr *)&my_addr, my_addr_len);
 707        if (err < 0)
 708                goto out;
 709
 710        ad->s_listen = s_listen;
 711        write_lock_bh(&s_listen->sk->sk_callback_lock);
 712        ad->original_sk_state_change = s_listen->sk->sk_state_change;
 713        s_listen->sk->sk_state_change = drbd_incoming_connection;
 714        s_listen->sk->sk_user_data = ad;
 715        write_unlock_bh(&s_listen->sk->sk_callback_lock);
 716
 717        what = "listen";
 718        err = s_listen->ops->listen(s_listen, 5);
 719        if (err < 0)
 720                goto out;
 721
 722        return 0;
 723out:
 724        if (s_listen)
 725                sock_release(s_listen);
 726        if (err < 0) {
 727                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 728                        conn_err(tconn, "%s failed, err = %d\n", what, err);
 729                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
 730                }
 731        }
 732
 733        return -EIO;
 734}
 735
 736static void unregister_state_change(struct sock *sk, struct accept_wait_data *ad)
 737{
 738        write_lock_bh(&sk->sk_callback_lock);
 739        sk->sk_state_change = ad->original_sk_state_change;
 740        sk->sk_user_data = NULL;
 741        write_unlock_bh(&sk->sk_callback_lock);
 742}
 743
 744static struct socket *drbd_wait_for_connect(struct drbd_tconn *tconn, struct accept_wait_data *ad)
 745{
 746        int timeo, connect_int, err = 0;
 747        struct socket *s_estab = NULL;
 748        struct net_conf *nc;
 749
 750        rcu_read_lock();
 751        nc = rcu_dereference(tconn->net_conf);
 752        if (!nc) {
 753                rcu_read_unlock();
 754                return NULL;
 755        }
 756        connect_int = nc->connect_int;
 757        rcu_read_unlock();
 758
 759        timeo = connect_int * HZ;
 760        /* 28.5% random jitter */
 761        timeo += (prandom_u32() & 1) ? timeo / 7 : -timeo / 7;
 762
 763        err = wait_for_completion_interruptible_timeout(&ad->door_bell, timeo);
 764        if (err <= 0)
 765                return NULL;
 766
 767        err = kernel_accept(ad->s_listen, &s_estab, 0);
 768        if (err < 0) {
 769                if (err != -EAGAIN && err != -EINTR && err != -ERESTARTSYS) {
 770                        conn_err(tconn, "accept failed, err = %d\n", err);
 771                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
 772                }
 773        }
 774
 775        if (s_estab)
 776                unregister_state_change(s_estab->sk, ad);
 777
 778        return s_estab;
 779}
 780
 781static int decode_header(struct drbd_tconn *, void *, struct packet_info *);
 782
 783static int send_first_packet(struct drbd_tconn *tconn, struct drbd_socket *sock,
 784                             enum drbd_packet cmd)
 785{
 786        if (!conn_prepare_command(tconn, sock))
 787                return -EIO;
 788        return conn_send_command(tconn, sock, cmd, 0, NULL, 0);
 789}
 790
 791static int receive_first_packet(struct drbd_tconn *tconn, struct socket *sock)
 792{
 793        unsigned int header_size = drbd_header_size(tconn);
 794        struct packet_info pi;
 795        int err;
 796
 797        err = drbd_recv_short(sock, tconn->data.rbuf, header_size, 0);
 798        if (err != header_size) {
 799                if (err >= 0)
 800                        err = -EIO;
 801                return err;
 802        }
 803        err = decode_header(tconn, tconn->data.rbuf, &pi);
 804        if (err)
 805                return err;
 806        return pi.cmd;
 807}
 808
 809/**
 810 * drbd_socket_okay() - Free the socket if its connection is not okay
 811 * @sock:       pointer to the pointer to the socket.
 812 */
 813static int drbd_socket_okay(struct socket **sock)
 814{
 815        int rr;
 816        char tb[4];
 817
 818        if (!*sock)
 819                return false;
 820
 821        rr = drbd_recv_short(*sock, tb, 4, MSG_DONTWAIT | MSG_PEEK);
 822
 823        if (rr > 0 || rr == -EAGAIN) {
 824                return true;
 825        } else {
 826                sock_release(*sock);
 827                *sock = NULL;
 828                return false;
 829        }
 830}
 831/* Gets called if a connection is established, or if a new minor gets created
 832   in a connection */
 833int drbd_connected(struct drbd_conf *mdev)
 834{
 835        int err;
 836
 837        atomic_set(&mdev->packet_seq, 0);
 838        mdev->peer_seq = 0;
 839
 840        mdev->state_mutex = mdev->tconn->agreed_pro_version < 100 ?
 841                &mdev->tconn->cstate_mutex :
 842                &mdev->own_state_mutex;
 843
 844        err = drbd_send_sync_param(mdev);
 845        if (!err)
 846                err = drbd_send_sizes(mdev, 0, 0);
 847        if (!err)
 848                err = drbd_send_uuids(mdev);
 849        if (!err)
 850                err = drbd_send_current_state(mdev);
 851        clear_bit(USE_DEGR_WFC_T, &mdev->flags);
 852        clear_bit(RESIZE_PENDING, &mdev->flags);
 853        atomic_set(&mdev->ap_in_flight, 0);
 854        mod_timer(&mdev->request_timer, jiffies + HZ); /* just start it here. */
 855        return err;
 856}
 857
 858/*
     * conn_connect() - set up the data and meta sockets of @tconn and run the
     * initial handshake: drbd_do_features(), optionally drbd_do_auth(),
     * drbd_send_protocol(), and drbd_connected() for every volume.
     *
 859 * return values:
 860 *   1 yes, we have a valid connection
 861 *   0 oops, did not work out, please try again
 862 *  -1 peer talks different language,
 863 *     no point in trying again, please go standalone.
 864 *  -2 We do not have a network config...
     *
     * NOTE(review): on success the value handed back is the positive result of
     * drbd_do_features(); in this function -2 is produced when the request to
     * enter C_WF_CONNECTION is refused, and the out_release_sockets exit
     * reports -1.
 865 */
 866static int conn_connect(struct drbd_tconn *tconn)
 867{
 868        struct drbd_socket sock, msock;
 869        struct drbd_conf *mdev;
 870        struct net_conf *nc;
 871        int vnr, timeout, h, ok;
 872        bool discard_my_data;
 873        enum drbd_state_rv rv;
 874        struct accept_wait_data ad = {
 875                .tconn = tconn,
 876                .door_bell = COMPLETION_INITIALIZER_ONSTACK(ad.door_bell),
 877        };
 878
 879        clear_bit(DISCONNECT_SENT, &tconn->flags);
 880        if (conn_request_state(tconn, NS(conn, C_WF_CONNECTION), CS_VERBOSE) < SS_SUCCESS)
 881                return -2;
 882
            /* Local staging sockets.  They borrow the connection's send and
             * receive buffers and are only published in tconn->data /
             * tconn->meta once both of them are established (see below). */
 883        mutex_init(&sock.mutex);
 884        sock.sbuf = tconn->data.sbuf;
 885        sock.rbuf = tconn->data.rbuf;
 886        sock.socket = NULL;
 887        mutex_init(&msock.mutex);
 888        msock.sbuf = tconn->meta.sbuf;
 889        msock.rbuf = tconn->meta.rbuf;
 890        msock.socket = NULL;
 891
 892        /* Assume that the peer only understands protocol 80 until we know better.  */
 893        tconn->agreed_pro_version = 80;
 894
 895        if (prepare_listen_socket(tconn, &ad))
 896                return 0;
 897
            /* Alternate between an outgoing connection attempt and waiting for
             * an incoming one, until both sockets exist and both pass
             * drbd_socket_okay(). */
 898        do {
 899                struct socket *s;
 900
 901                s = drbd_try_connect(tconn);
 902                if (s) {
                            /* First outgoing socket becomes the data socket,
                             * the second one the meta socket. */
 903                        if (!sock.socket) {
 904                                sock.socket = s;
 905                                send_first_packet(tconn, &sock, P_INITIAL_DATA);
 906                        } else if (!msock.socket) {
 907                                clear_bit(RESOLVE_CONFLICTS, &tconn->flags);
 908                                msock.socket = s;
 909                                send_first_packet(tconn, &msock, P_INITIAL_META);
 910                        } else {
 911                                conn_err(tconn, "Logic error in conn_connect()\n");
 912                                goto out_release_sockets;
 913                        }
 914                }
 915
                    /* Both sockets present: sleep for ping_timeo * HZ / 10
                     * jiffies, then leave the loop if both are still okay. */
 916                if (sock.socket && msock.socket) {
 917                        rcu_read_lock();
 918                        nc = rcu_dereference(tconn->net_conf);
 919                        timeout = nc->ping_timeo * HZ / 10;
 920                        rcu_read_unlock();
 921                        schedule_timeout_interruptible(timeout);
 922                        ok = drbd_socket_okay(&sock.socket);
 923                        ok = drbd_socket_okay(&msock.socket) && ok;
 924                        if (ok)
 925                                break;
 926                }
 927
 928retry:
 929                s = drbd_wait_for_connect(tconn, &ad);
 930                if (s) {
                            /* The first packet received on an accepted socket
                             * decides which role that socket takes. */
 931                        int fp = receive_first_packet(tconn, s);
 932                        drbd_socket_okay(&sock.socket);
 933                        drbd_socket_okay(&msock.socket);
 934                        switch (fp) {
 935                        case P_INITIAL_DATA:
 936                                if (sock.socket) {
 937                                        conn_warn(tconn, "initial packet S crossed\n");
 938                                        sock_release(sock.socket);
 939                                        sock.socket = s;
 940                                        goto randomize;
 941                                }
 942                                sock.socket = s;
 943                                break;
 944                        case P_INITIAL_META:
 945                                set_bit(RESOLVE_CONFLICTS, &tconn->flags);
 946                                if (msock.socket) {
 947                                        conn_warn(tconn, "initial packet M crossed\n");
 948                                        sock_release(msock.socket);
 949                                        msock.socket = s;
 950                                        goto randomize;
 951                                }
 952                                msock.socket = s;
 953                                break;
 954                        default:
 955                                conn_warn(tconn, "Error receiving initial packet\n");
 956                                sock_release(s);
                                    /* Also entered by goto from the "crossed"
                                     * branches above: depending on one random
                                     * bit, go straight back to waiting for an
                                     * incoming connection. */
 957randomize:
 958                                if (prandom_u32() & 1)
 959                                        goto retry;
 960                        }
 961                }
 962
 963                if (tconn->cstate <= C_DISCONNECTING)
 964                        goto out_release_sockets;
 965                if (signal_pending(current)) {
 966                        flush_signals(current);
 967                        smp_rmb();
 968                        if (get_t_state(&tconn->receiver) == EXITING)
 969                                goto out_release_sockets;
 970                }
 971
 972                ok = drbd_socket_okay(&sock.socket);
 973                ok = drbd_socket_okay(&msock.socket) && ok;
 974        } while (!ok);
 975
            /* Both sockets are up; the listen socket is released here. */
 976        if (ad.s_listen)
 977                sock_release(ad.s_listen);
 978
 979        sock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
 980        msock.socket->sk->sk_reuse = SK_CAN_REUSE; /* SO_REUSEADDR */
 981
 982        sock.socket->sk->sk_allocation = GFP_NOIO;
 983        msock.socket->sk->sk_allocation = GFP_NOIO;
 984
 985        sock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE_BULK;
 986        msock.socket->sk->sk_priority = TC_PRIO_INTERACTIVE;
 987
 988        /* NOT YET ...
 989         * sock.socket->sk->sk_sndtimeo = tconn->net_conf->timeout*HZ/10;
 990         * sock.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
 991         * first set it to the P_CONNECTION_FEATURES timeout,
 992         * which we set to 4x the configured ping_timeout. */
 993        rcu_read_lock();
 994        nc = rcu_dereference(tconn->net_conf);
 995
 996        sock.socket->sk->sk_sndtimeo =
 997        sock.socket->sk->sk_rcvtimeo = nc->ping_timeo*4*HZ/10;
 998
 999        msock.socket->sk->sk_rcvtimeo = nc->ping_int*HZ;
1000        timeout = nc->timeout * HZ / 10;
1001        discard_my_data = nc->discard_my_data;
1002        rcu_read_unlock();
1003
1004        msock.socket->sk->sk_sndtimeo = timeout;
1005
1006        /* we don't want delays.
1007         * we use TCP_CORK where appropriate, though */
1008        drbd_tcp_nodelay(sock.socket);
1009        drbd_tcp_nodelay(msock.socket);
1010
            /* Publish the sockets in the connection object.  The early
             * returns further down do not release them in this function. */
1011        tconn->data.socket = sock.socket;
1012        tconn->meta.socket = msock.socket;
1013        tconn->last_received = jiffies;
1014
1015        h = drbd_do_features(tconn);
1016        if (h <= 0)
1017                return h;
1018
1019        if (tconn->cram_hmac_tfm) {
1020                /* drbd_request_state(mdev, NS(conn, WFAuth)); */
1021                switch (drbd_do_auth(tconn)) {
1022                case -1:
1023                        conn_err(tconn, "Authentication of peer failed\n");
1024                        return -1;
1025                case 0:
1026                        conn_err(tconn, "Authentication of peer failed, trying again.\n");
1027                        return 0;
1028                }
1029        }
1030
            /* Handshake timeouts are over: switch the data socket to the
             * timeout read from net_conf above and an unbounded receive. */
1031        tconn->data.socket->sk->sk_sndtimeo = timeout;
1032        tconn->data.socket->sk->sk_rcvtimeo = MAX_SCHEDULE_TIMEOUT;
1033
1034        if (drbd_send_protocol(tconn) == -EOPNOTSUPP)
1035                return -1;
1036
1037        set_bit(STATE_SENT, &tconn->flags);
1038
            /* Walk all volumes.  A kref is held on each volume while the RCU
             * read lock is dropped for the calls made on it. */
1039        rcu_read_lock();
1040        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1041                kref_get(&mdev->kref);
1042                rcu_read_unlock();
1043
1044                /* Prevent a race between resync-handshake and
1045                 * being promoted to Primary.
1046                 *
1047                 * Grab and release the state mutex, so we know that any current
1048                 * drbd_set_role() is finished, and any incoming drbd_set_role
1049                 * will see the STATE_SENT flag, and wait for it to be cleared.
1050                 */
1051                mutex_lock(mdev->state_mutex);
1052                mutex_unlock(mdev->state_mutex);
1053
1054                if (discard_my_data)
1055                        set_bit(DISCARD_MY_DATA, &mdev->flags);
1056                else
1057                        clear_bit(DISCARD_MY_DATA, &mdev->flags);
1058
                    /* NOTE(review): the result of drbd_connected() is not
                     * evaluated here. */
1059                drbd_connected(mdev);
1060                kref_put(&mdev->kref, &drbd_minor_destroy);
1061                rcu_read_lock();
1062        }
1063        rcu_read_unlock();
1064
1065        rv = conn_request_state(tconn, NS(conn, C_WF_REPORT_PARAMS), CS_VERBOSE);
1066        if (rv < SS_SUCCESS || tconn->cstate != C_WF_REPORT_PARAMS) {
1067                clear_bit(STATE_SENT, &tconn->flags);
1068                return 0;
1069        }
1070
1071        drbd_thread_start(&tconn->asender);
1072
1073        mutex_lock(&tconn->conf_update);
1074        /* The discard_my_data flag is a single-shot modifier to the next
1075         * connection attempt, the handshake of which is now well underway.
1076         * No need for rcu style copying of the whole struct
1077         * just to clear a single value. */
1078        tconn->net_conf->discard_my_data = 0;
1079        mutex_unlock(&tconn->conf_update);
1080
1081        return h;
1082
        /* Error exit used while the sockets are still local to this function:
         * release whatever has been obtained so far. */
1083out_release_sockets:
1084        if (ad.s_listen)
1085                sock_release(ad.s_listen);
1086        if (sock.socket)
1087                sock_release(sock.socket);
1088        if (msock.socket)
1089                sock_release(msock.socket);
1090        return -1;
1091}
1092
1093static int decode_header(struct drbd_tconn *tconn, void *header, struct packet_info *pi)
1094{
1095        unsigned int header_size = drbd_header_size(tconn);
1096
1097        if (header_size == sizeof(struct p_header100) &&
1098            *(__be32 *)header == cpu_to_be32(DRBD_MAGIC_100)) {
1099                struct p_header100 *h = header;
1100                if (h->pad != 0) {
1101                        conn_err(tconn, "Header padding is not zero\n");
1102                        return -EINVAL;
1103                }
1104                pi->vnr = be16_to_cpu(h->volume);
1105                pi->cmd = be16_to_cpu(h->command);
1106                pi->size = be32_to_cpu(h->length);
1107        } else if (header_size == sizeof(struct p_header95) &&
1108                   *(__be16 *)header == cpu_to_be16(DRBD_MAGIC_BIG)) {
1109                struct p_header95 *h = header;
1110                pi->cmd = be16_to_cpu(h->command);
1111                pi->size = be32_to_cpu(h->length);
1112                pi->vnr = 0;
1113        } else if (header_size == sizeof(struct p_header80) &&
1114                   *(__be32 *)header == cpu_to_be32(DRBD_MAGIC)) {
1115                struct p_header80 *h = header;
1116                pi->cmd = be16_to_cpu(h->command);
1117                pi->size = be16_to_cpu(h->length);
1118                pi->vnr = 0;
1119        } else {
1120                conn_err(tconn, "Wrong magic value 0x%08x in protocol version %d\n",
1121                         be32_to_cpu(*(__be32 *)header),
1122                         tconn->agreed_pro_version);
1123                return -EINVAL;
1124        }
1125        pi->data = header + header_size;
1126        return 0;
1127}
1128
1129static int drbd_recv_header(struct drbd_tconn *tconn, struct packet_info *pi)
1130{
1131        void *buffer = tconn->data.rbuf;
1132        int err;
1133
1134        err = drbd_recv_all_warn(tconn, buffer, drbd_header_size(tconn));
1135        if (err)
1136                return err;
1137
1138        err = decode_header(tconn, buffer, pi);
1139        tconn->last_received = jiffies;
1140
1141        return err;
1142}
1143
1144static void drbd_flush(struct drbd_tconn *tconn)
1145{
1146        int rv;
1147        struct drbd_conf *mdev;
1148        int vnr;
1149
1150        if (tconn->write_ordering >= WO_bdev_flush) {
1151                rcu_read_lock();
1152                idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1153                        if (!get_ldev(mdev))
1154                                continue;
1155                        kref_get(&mdev->kref);
1156                        rcu_read_unlock();
1157
1158                        rv = blkdev_issue_flush(mdev->ldev->backing_bdev,
1159                                        GFP_NOIO, NULL);
1160                        if (rv) {
1161                                dev_info(DEV, "local disk flush failed with status %d\n", rv);
1162                                /* would rather check on EOPNOTSUPP, but that is not reliable.
1163                                 * don't try again for ANY return value != 0
1164                                 * if (rv == -EOPNOTSUPP) */
1165                                drbd_bump_write_ordering(tconn, WO_drain_io);
1166                        }
1167                        put_ldev(mdev);
1168                        kref_put(&mdev->kref, &drbd_minor_destroy);
1169
1170                        rcu_read_lock();
1171                        if (rv)
1172                                break;
1173                }
1174                rcu_read_unlock();
1175        }
1176}
1177
1178/**
1179 * drbd_may_finish_epoch() - Applies an epoch_event to the epoch's state, eventually finishes it.
1180 * @tconn:      DRBD connection.
1181 * @epoch:      Epoch object.
1182 * @ev:         Epoch event.
     *
     * Takes tconn->epoch_lock for the duration of the call; the lock is
     * dropped temporarily while a barrier ack is sent.
     *
     * Returns FE_STILL_LIVE if no epoch was finished.  Otherwise the result
     * describes the first epoch that was finished: FE_DESTROYED if it was
     * unlinked and freed, FE_RECYCLED if it is the current epoch and was
     * reset in place.
1183 */
1184static enum finish_epoch drbd_may_finish_epoch(struct drbd_tconn *tconn,
1185                                               struct drbd_epoch *epoch,
1186                                               enum epoch_event ev)
1187{
1188        int epoch_size;
1189        struct drbd_epoch *next_epoch;
1190        enum finish_epoch rv = FE_STILL_LIVE;
1191
1192        spin_lock(&tconn->epoch_lock);
1193        do {
1194                next_epoch = NULL;
1195
1196                epoch_size = atomic_read(&epoch->epoch_size);
1197
                    /* EV_CLEANUP is a modifier bit; dispatch on the event
                     * with that bit masked out. */
1198                switch (ev & ~EV_CLEANUP) {
1199                case EV_PUT:
1200                        atomic_dec(&epoch->active);
1201                        break;
1202                case EV_GOT_BARRIER_NR:
1203                        set_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags);
1204                        break;
1205                case EV_BECAME_LAST:
1206                        /* nothing to do*/
1207                        break;
1208                }
1209
                    /* The epoch is finished once its size is non-zero, nothing
                     * in it is active any more, and either its barrier number
                     * is known or this is a cleanup run. */
1210                if (epoch_size != 0 &&
1211                    atomic_read(&epoch->active) == 0 &&
1212                    (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags) || ev & EV_CLEANUP)) {
1213                        if (!(ev & EV_CLEANUP)) {
                                    /* epoch_lock is not held while the
                                     * barrier ack is being sent. */
1214                                spin_unlock(&tconn->epoch_lock);
1215                                drbd_send_b_ack(epoch->tconn, epoch->barrier_nr, epoch_size);
1216                                spin_lock(&tconn->epoch_lock);
1217                        }
1218#if 0
1219                        /* FIXME: dec unacked on connection, once we have
1220                         * something to count pending connection packets in. */
1221                        if (test_bit(DE_HAVE_BARRIER_NUMBER, &epoch->flags))
1222                                dec_unacked(epoch->tconn);
1223#endif
1224
1225                        if (tconn->current_epoch != epoch) {
                                    /* Not the current epoch: unlink and free
                                     * it, then have the loop look at its list
                                     * successor with EV_BECAME_LAST (keeping
                                     * the EV_CLEANUP bit, if set). */
1226                                next_epoch = list_entry(epoch->list.next, struct drbd_epoch, list);
1227                                list_del(&epoch->list);
1228                                ev = EV_BECAME_LAST | (ev & EV_CLEANUP);
1229                                tconn->epochs--;
1230                                kfree(epoch);
1231
1232                                if (rv == FE_STILL_LIVE)
1233                                        rv = FE_DESTROYED;
1234                        } else {
                                    /* The current epoch is reset in place
                                     * rather than freed. */
1235                                epoch->flags = 0;
1236                                atomic_set(&epoch->epoch_size, 0);
1237                                /* atomic_set(&epoch->active, 0); is already zero */
1238                                if (rv == FE_STILL_LIVE)
1239                                        rv = FE_RECYCLED;
1240                        }
1241                }
1242
1243                if (!next_epoch)
1244                        break;
1245
1246                epoch = next_epoch;
1247        } while (1);
1248
1249        spin_unlock(&tconn->epoch_lock);
1250
1251        return rv;
1252}
1253
1254/**
1255 * drbd_bump_write_ordering() - Fall back to an other write ordering method
1256 * @tconn:      DRBD connection.
1257 * @wo:         Write ordering method to try.
1258 */
1259void drbd_bump_write_ordering(struct drbd_tconn *tconn, enum write_ordering_e wo)
1260{
1261        struct disk_conf *dc;
1262        struct drbd_conf *mdev;
1263        enum write_ordering_e pwo;
1264        int vnr;
1265        static char *write_ordering_str[] = {
1266                [WO_none] = "none",
1267                [WO_drain_io] = "drain",
1268                [WO_bdev_flush] = "flush",
1269        };
1270
1271        pwo = tconn->write_ordering;
1272        wo = min(pwo, wo);
1273        rcu_read_lock();
1274        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1275                if (!get_ldev_if_state(mdev, D_ATTACHING))
1276                        continue;
1277                dc = rcu_dereference(mdev->ldev->disk_conf);
1278
1279                if (wo == WO_bdev_flush && !dc->disk_flushes)
1280                        wo = WO_drain_io;
1281                if (wo == WO_drain_io && !dc->disk_drain)
1282                        wo = WO_none;
1283                put_ldev(mdev);
1284        }
1285        rcu_read_unlock();
1286        tconn->write_ordering = wo;
1287        if (pwo != tconn->write_ordering || wo == WO_bdev_flush)
1288                conn_info(tconn, "Method to ensure write ordering: %s\n", write_ordering_str[tconn->write_ordering]);
1289}
1290
1291/**
1292 * drbd_submit_peer_request()
1293 * @mdev:       DRBD device.
1294 * @peer_req:   peer request
1295 * @rw:         flag field, see bio->bi_rw
     * @fault_type: handed on unchanged to drbd_generic_make_request()
1296 *
1297 * May spread the pages to multiple bios,
1298 * depending on bio_add_page restrictions.
1299 *
1300 * Returns 0 if all bios have been submitted,
1301 * -ENOMEM if we could not allocate enough bios,
1302 * -ENOSPC (any better suggestion?) if we have not been able to bio_add_page a
1303 *  single page to an empty bio (which should never happen and likely indicates
1304 *  that the lower level IO stack is in some way broken). This has been observed
1305 *  on certain Xen deployments.
     *
     * On failure nothing has been submitted, and every bio allocated up to
     * that point has been dropped with bio_put().
1306 */
1307/* TODO allocate from our own bio_set. */
1308int drbd_submit_peer_request(struct drbd_conf *mdev,
1309                             struct drbd_peer_request *peer_req,
1310                             const unsigned rw, const int fault_type)
1311{
1312        struct bio *bios = NULL;
1313        struct bio *bio;
1314        struct page *page = peer_req->pages;
1315        sector_t sector = peer_req->i.sector;
1316        unsigned ds = peer_req->i.size;
1317        unsigned n_bios = 0;
1318        unsigned nr_pages = (ds + PAGE_SIZE -1) >> PAGE_SHIFT;
1319        int err = -ENOMEM;
1320
1321        /* In most cases, we will only need one bio.  But in case the lower
1322         * level restrictions happen to be different at this offset on this
1323         * side than those of the sending peer, we may need to submit the
1324         * request in more than one bio.
1325         *
1326         * Plain bio_alloc is good enough here, this is no DRBD internally
1327         * generated bio, but a bio allocated on behalf of the peer.
1328         */
1329next_bio:
1330        bio = bio_alloc(GFP_NOIO, nr_pages);
1331        if (!bio) {
1332                dev_err(DEV, "submit_ee: Allocation of a bio failed\n");
1333                goto fail;
1334        }
1335        /* > peer_req->i.sector, unless this is the first bio */
1336        bio->bi_sector = sector;
1337        bio->bi_bdev = mdev->ldev->backing_bdev;
1338        bio->bi_rw = rw;
1339        bio->bi_private = peer_req;
1340        bio->bi_end_io = drbd_peer_request_endio;
1341
            /* Chain the new bio in front of the ones built so far, using
             * bi_next as the link, and count it. */
1342        bio->bi_next = bios;
1343        bios = bio;
1344        ++n_bios;
1345
1346        page_chain_for_each(page) {
1347                unsigned len = min_t(unsigned, ds, PAGE_SIZE);
1348                if (!bio_add_page(bio, page, len, 0)) {
1349                        /* A single page must always be possible!
1350                         * But in case it fails anyways,
1351                         * we deal with it, and complain (below). */
1352                        if (bio->bi_vcnt == 0) {
1353                                dev_err(DEV,
1354                                        "bio_add_page failed for len=%u, "
1355                                        "bi_vcnt=0 (bi_sector=%llu)\n",
1356                                        len, (unsigned long long)bio->bi_sector);
1357                                err = -ENOSPC;
1358                                goto fail;
1359                        }
                            /* This bio took at least one page but not this
                             * one: start another bio.  page, ds, sector and
                             * nr_pages already describe what is left. */
1360                        goto next_bio;
1361                }
1362                ds -= len;
1363                sector += len >> 9;
1364                --nr_pages;
1365        }
1366        D_ASSERT(page == NULL);
1367        D_ASSERT(ds == 0);
1368
            /* Record the number of bios before the first one is submitted. */
1369        atomic_set(&peer_req->pending_bios, n_bios);
1370        do {
1371                bio = bios;
1372                bios = bios->bi_next;
1373                bio->bi_next = NULL;
1374
1375                drbd_generic_make_request(mdev, fault_type, bio);
1376        } while (bios);
1377        return 0;
1378
1379fail:
            /* Drop every bio built so far; none has been submitted. */
1380        while (bios) {
1381                bio = bios;
1382                bios = bios->bi_next;
1383                bio_put(bio);
1384        }
1385        return err;
1386}
1387
1388static void drbd_remove_epoch_entry_interval(struct drbd_conf *mdev,
1389                                             struct drbd_peer_request *peer_req)
1390{
1391        struct drbd_interval *i = &peer_req->i;
1392
1393        drbd_remove_interval(&mdev->write_requests, i);
1394        drbd_clear_interval(i);
1395
1396        /* Wake up any processes waiting for this peer request to complete.  */
1397        if (i->waiting)
1398                wake_up(&mdev->misc_wait);
1399}
1400
1401void conn_wait_active_ee_empty(struct drbd_tconn *tconn)
1402{
1403        struct drbd_conf *mdev;
1404        int vnr;
1405
1406        rcu_read_lock();
1407        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
1408                kref_get(&mdev->kref);
1409                rcu_read_unlock();
1410                drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
1411                kref_put(&mdev->kref, &drbd_minor_destroy);
1412                rcu_read_lock();
1413        }
1414        rcu_read_unlock();
1415}
1416
1417static int receive_Barrier(struct drbd_tconn *tconn, struct packet_info *pi)
1418{
1419        int rv;
1420        struct p_barrier *p = pi->data;
1421        struct drbd_epoch *epoch;
1422
1423        /* FIXME these are unacked on connection,
1424         * not a specific (peer)device.
1425         */
1426        tconn->current_epoch->barrier_nr = p->barrier;
1427        tconn->current_epoch->tconn = tconn;
1428        rv = drbd_may_finish_epoch(tconn, tconn->current_epoch, EV_GOT_BARRIER_NR);
1429
1430        /* P_BARRIER_ACK may imply that the corresponding extent is dropped from
1431         * the activity log, which means it would not be resynced in case the
1432         * R_PRIMARY crashes now.
1433         * Therefore we must send the barrier_ack after the barrier request was
1434         * completed. */
1435        switch (tconn->write_ordering) {
1436        case WO_none:
1437                if (rv == FE_RECYCLED)
1438                        return 0;
1439
1440                /* receiver context, in the writeout path of the other node.
1441                 * avoid potential distributed deadlock */
1442                epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1443                if (epoch)
1444                        break;
1445                else
1446                        conn_warn(tconn, "Allocation of an epoch failed, slowing down\n");
1447                        /* Fall through */
1448
1449        case WO_bdev_flush:
1450        case WO_drain_io:
1451                conn_wait_active_ee_empty(tconn);
1452                drbd_flush(tconn);
1453
1454                if (atomic_read(&tconn->current_epoch->epoch_size)) {
1455                        epoch = kmalloc(sizeof(struct drbd_epoch), GFP_NOIO);
1456                        if (epoch)
1457                                break;
1458                }
1459
1460                return 0;
1461        default:
1462                conn_err(tconn, "Strangeness in tconn->write_ordering %d\n", tconn->write_ordering);
1463                return -EIO;
1464        }
1465
1466        epoch->flags = 0;
1467        atomic_set(&epoch->epoch_size, 0);
1468        atomic_set(&epoch->active, 0);
1469
1470        spin_lock(&tconn->epoch_lock);
1471        if (atomic_read(&tconn->current_epoch->epoch_size)) {
1472                list_add(&epoch->list, &tconn->current_epoch->list);
1473                tconn->current_epoch = epoch;
1474                tconn->epochs++;
1475        } else {
1476                /* The current_epoch got recycled while we allocated this one... */
1477                kfree(epoch);
1478        }
1479        spin_unlock(&tconn->epoch_lock);
1480
1481        return 0;
1482}
1483
1484/* used from receive_RSDataReply (recv_resync_read)
1485 * and from receive_Data */
1486static struct drbd_peer_request *
1487read_in_block(struct drbd_conf *mdev, u64 id, sector_t sector,
1488              int data_size) __must_hold(local)
1489{
1490        const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
1491        struct drbd_peer_request *peer_req;
1492        struct page *page;
1493        int dgs, ds, err;
1494        void *dig_in = mdev->tconn->int_dig_in;
1495        void *dig_vv = mdev->tconn->int_dig_vv;
1496        unsigned long *data;
1497
1498        dgs = 0;
1499        if (mdev->tconn->peer_integrity_tfm) {
1500                dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1501                /*
1502                 * FIXME: Receive the incoming digest into the receive buffer
1503                 *        here, together with its struct p_data?
1504                 */
1505                err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1506                if (err)
1507                        return NULL;
1508                data_size -= dgs;
1509        }
1510
1511        if (!expect(IS_ALIGNED(data_size, 512)))
1512                return NULL;
1513        if (!expect(data_size <= DRBD_MAX_BIO_SIZE))
1514                return NULL;
1515
1516        /* even though we trust out peer,
1517         * we sometimes have to double check. */
1518        if (sector + (data_size>>9) > capacity) {
1519                dev_err(DEV, "request from peer beyond end of local disk: "
1520                        "capacity: %llus < sector: %llus + size: %u\n",
1521                        (unsigned long long)capacity,
1522                        (unsigned long long)sector, data_size);
1523                return NULL;
1524        }
1525
1526        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
1527         * "criss-cross" setup, that might cause write-out on some other DRBD,
1528         * which in turn might block on the other node at this very place.  */
1529        peer_req = drbd_alloc_peer_req(mdev, id, sector, data_size, GFP_NOIO);
1530        if (!peer_req)
1531                return NULL;
1532
1533        if (!data_size)
1534                return peer_req;
1535
1536        ds = data_size;
1537        page = peer_req->pages;
1538        page_chain_for_each(page) {
1539                unsigned len = min_t(int, ds, PAGE_SIZE);
1540                data = kmap(page);
1541                err = drbd_recv_all_warn(mdev->tconn, data, len);
1542                if (drbd_insert_fault(mdev, DRBD_FAULT_RECEIVE)) {
1543                        dev_err(DEV, "Fault injection: Corrupting data on receive\n");
1544                        data[0] = data[0] ^ (unsigned long)-1;
1545                }
1546                kunmap(page);
1547                if (err) {
1548                        drbd_free_peer_req(mdev, peer_req);
1549                        return NULL;
1550                }
1551                ds -= len;
1552        }
1553
1554        if (dgs) {
1555                drbd_csum_ee(mdev, mdev->tconn->peer_integrity_tfm, peer_req, dig_vv);
1556                if (memcmp(dig_in, dig_vv, dgs)) {
1557                        dev_err(DEV, "Digest integrity check FAILED: %llus +%u\n",
1558                                (unsigned long long)sector, data_size);
1559                        drbd_free_peer_req(mdev, peer_req);
1560                        return NULL;
1561                }
1562        }
1563        mdev->recv_cnt += data_size>>9;
1564        return peer_req;
1565}
1566
1567/* drbd_drain_block() just takes a data block
1568 * out of the socket input buffer, and discards it.
1569 */
1570static int drbd_drain_block(struct drbd_conf *mdev, int data_size)
1571{
1572        struct page *page;
1573        int err = 0;
1574        void *data;
1575
1576        if (!data_size)
1577                return 0;
1578
1579        page = drbd_alloc_pages(mdev, 1, 1);
1580
1581        data = kmap(page);
1582        while (data_size) {
1583                unsigned int len = min_t(int, data_size, PAGE_SIZE);
1584
1585                err = drbd_recv_all_warn(mdev->tconn, data, len);
1586                if (err)
1587                        break;
1588                data_size -= len;
1589        }
1590        kunmap(page);
1591        drbd_free_pages(mdev, page, 0);
1592        return err;
1593}
1594
1595static int recv_dless_read(struct drbd_conf *mdev, struct drbd_request *req,
1596                           sector_t sector, int data_size)
1597{
1598        struct bio_vec *bvec;
1599        struct bio *bio;
1600        int dgs, err, i, expect;
1601        void *dig_in = mdev->tconn->int_dig_in;
1602        void *dig_vv = mdev->tconn->int_dig_vv;
1603
1604        dgs = 0;
1605        if (mdev->tconn->peer_integrity_tfm) {
1606                dgs = crypto_hash_digestsize(mdev->tconn->peer_integrity_tfm);
1607                err = drbd_recv_all_warn(mdev->tconn, dig_in, dgs);
1608                if (err)
1609                        return err;
1610                data_size -= dgs;
1611        }
1612
1613        /* optimistically update recv_cnt.  if receiving fails below,
1614         * we disconnect anyways, and counters will be reset. */
1615        mdev->recv_cnt += data_size>>9;
1616
1617        bio = req->master_bio;
1618        D_ASSERT(sector == bio->bi_sector);
1619
1620        bio_for_each_segment(bvec, bio, i) {
1621                void *mapped = kmap(bvec->bv_page) + bvec->bv_offset;
1622                expect = min_t(int, data_size, bvec->bv_len);
1623                err = drbd_recv_all_warn(mdev->tconn, mapped, expect);
1624                kunmap(bvec->bv_page);
1625                if (err)
1626                        return err;
1627                data_size -= expect;
1628        }
1629
1630        if (dgs) {
1631                drbd_csum_bio(mdev, mdev->tconn->peer_integrity_tfm, bio, dig_vv);
1632                if (memcmp(dig_in, dig_vv, dgs)) {
1633                        dev_err(DEV, "Digest integrity check FAILED. Broken NICs?\n");
1634                        return -EINVAL;
1635                }
1636        }
1637
1638        D_ASSERT(data_size == 0);
1639        return 0;
1640}
1641
1642/*
1643 * e_end_resync_block() is called in asender context via
1644 * drbd_finish_peer_reqs().
1645 */
1646static int e_end_resync_block(struct drbd_work *w, int unused)
1647{
1648        struct drbd_peer_request *peer_req =
1649                container_of(w, struct drbd_peer_request, w);
1650        struct drbd_conf *mdev = w->mdev;
1651        sector_t sector = peer_req->i.sector;
1652        int err;
1653
1654        D_ASSERT(drbd_interval_empty(&peer_req->i));
1655
1656        if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1657                drbd_set_in_sync(mdev, sector, peer_req->i.size);
1658                err = drbd_send_ack(mdev, P_RS_WRITE_ACK, peer_req);
1659        } else {
1660                /* Record failure to sync */
1661                drbd_rs_failed_io(mdev, sector, peer_req->i.size);
1662
1663                err  = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1664        }
1665        dec_unacked(mdev);
1666
1667        return err;
1668}
1669
1670static int recv_resync_read(struct drbd_conf *mdev, sector_t sector, int data_size) __releases(local)
1671{
1672        struct drbd_peer_request *peer_req;
1673
1674        peer_req = read_in_block(mdev, ID_SYNCER, sector, data_size);
1675        if (!peer_req)
1676                goto fail;
1677
1678        dec_rs_pending(mdev);
1679
1680        inc_unacked(mdev);
1681        /* corresponding dec_unacked() in e_end_resync_block()
1682         * respective _drbd_clear_done_ee */
1683
1684        peer_req->w.cb = e_end_resync_block;
1685
1686        spin_lock_irq(&mdev->tconn->req_lock);
1687        list_add(&peer_req->w.list, &mdev->sync_ee);
1688        spin_unlock_irq(&mdev->tconn->req_lock);
1689
1690        atomic_add(data_size >> 9, &mdev->rs_sect_ev);
1691        if (drbd_submit_peer_request(mdev, peer_req, WRITE, DRBD_FAULT_RS_WR) == 0)
1692                return 0;
1693
1694        /* don't care for the reason here */
1695        dev_err(DEV, "submit failed, triggering re-connect\n");
1696        spin_lock_irq(&mdev->tconn->req_lock);
1697        list_del(&peer_req->w.list);
1698        spin_unlock_irq(&mdev->tconn->req_lock);
1699
1700        drbd_free_peer_req(mdev, peer_req);
1701fail:
1702        put_ldev(mdev);
1703        return -EIO;
1704}
1705
1706static struct drbd_request *
1707find_request(struct drbd_conf *mdev, struct rb_root *root, u64 id,
1708             sector_t sector, bool missing_ok, const char *func)
1709{
1710        struct drbd_request *req;
1711
1712        /* Request object according to our peer */
1713        req = (struct drbd_request *)(unsigned long)id;
1714        if (drbd_contains_interval(root, sector, &req->i) && req->i.local)
1715                return req;
1716        if (!missing_ok) {
1717                dev_err(DEV, "%s: failed to find request 0x%lx, sector %llus\n", func,
1718                        (unsigned long)id, (unsigned long long)sector);
1719        }
1720        return NULL;
1721}
1722
1723static int receive_DataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1724{
1725        struct drbd_conf *mdev;
1726        struct drbd_request *req;
1727        sector_t sector;
1728        int err;
1729        struct p_data *p = pi->data;
1730
1731        mdev = vnr_to_mdev(tconn, pi->vnr);
1732        if (!mdev)
1733                return -EIO;
1734
1735        sector = be64_to_cpu(p->sector);
1736
1737        spin_lock_irq(&mdev->tconn->req_lock);
1738        req = find_request(mdev, &mdev->read_requests, p->block_id, sector, false, __func__);
1739        spin_unlock_irq(&mdev->tconn->req_lock);
1740        if (unlikely(!req))
1741                return -EIO;
1742
1743        /* hlist_del(&req->collision) is done in _req_may_be_done, to avoid
1744         * special casing it there for the various failure cases.
1745         * still no race with drbd_fail_pending_reads */
1746        err = recv_dless_read(mdev, req, sector, pi->size);
1747        if (!err)
1748                req_mod(req, DATA_RECEIVED);
1749        /* else: nothing. handled from drbd_disconnect...
1750         * I don't think we may complete this just yet
1751         * in case we are "on-disconnect: freeze" */
1752
1753        return err;
1754}
1755
1756static int receive_RSDataReply(struct drbd_tconn *tconn, struct packet_info *pi)
1757{
1758        struct drbd_conf *mdev;
1759        sector_t sector;
1760        int err;
1761        struct p_data *p = pi->data;
1762
1763        mdev = vnr_to_mdev(tconn, pi->vnr);
1764        if (!mdev)
1765                return -EIO;
1766
1767        sector = be64_to_cpu(p->sector);
1768        D_ASSERT(p->block_id == ID_SYNCER);
1769
1770        if (get_ldev(mdev)) {
1771                /* data is submitted to disk within recv_resync_read.
1772                 * corresponding put_ldev done below on error,
1773                 * or in drbd_peer_request_endio. */
1774                err = recv_resync_read(mdev, sector, pi->size);
1775        } else {
1776                if (__ratelimit(&drbd_ratelimit_state))
1777                        dev_err(DEV, "Can not write resync data to local disk.\n");
1778
1779                err = drbd_drain_block(mdev, pi->size);
1780
1781                drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
1782        }
1783
1784        atomic_add(pi->size >> 9, &mdev->rs_sect_in);
1785
1786        return err;
1787}
1788
1789static void restart_conflicting_writes(struct drbd_conf *mdev,
1790                                       sector_t sector, int size)
1791{
1792        struct drbd_interval *i;
1793        struct drbd_request *req;
1794
1795        drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
1796                if (!i->local)
1797                        continue;
1798                req = container_of(i, struct drbd_request, i);
1799                if (req->rq_state & RQ_LOCAL_PENDING ||
1800                    !(req->rq_state & RQ_POSTPONED))
1801                        continue;
1802                /* as it is RQ_POSTPONED, this will cause it to
1803                 * be queued on the retry workqueue. */
1804                __req_mod(req, CONFLICT_RESOLVED, NULL);
1805        }
1806}
1807
1808/*
1809 * e_end_block() is called in asender context via drbd_finish_peer_reqs().
1810 */
1811static int e_end_block(struct drbd_work *w, int cancel)
1812{
1813        struct drbd_peer_request *peer_req =
1814                container_of(w, struct drbd_peer_request, w);
1815        struct drbd_conf *mdev = w->mdev;
1816        sector_t sector = peer_req->i.sector;
1817        int err = 0, pcmd;
1818
1819        if (peer_req->flags & EE_SEND_WRITE_ACK) {
1820                if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
1821                        pcmd = (mdev->state.conn >= C_SYNC_SOURCE &&
1822                                mdev->state.conn <= C_PAUSED_SYNC_T &&
1823                                peer_req->flags & EE_MAY_SET_IN_SYNC) ?
1824                                P_RS_WRITE_ACK : P_WRITE_ACK;
1825                        err = drbd_send_ack(mdev, pcmd, peer_req);
1826                        if (pcmd == P_RS_WRITE_ACK)
1827                                drbd_set_in_sync(mdev, sector, peer_req->i.size);
1828                } else {
1829                        err = drbd_send_ack(mdev, P_NEG_ACK, peer_req);
1830                        /* we expect it to be marked out of sync anyways...
1831                         * maybe assert this?  */
1832                }
1833                dec_unacked(mdev);
1834        }
1835        /* we delete from the conflict detection hash _after_ we sent out the
1836         * P_WRITE_ACK / P_NEG_ACK, to get the sequence number right.  */
1837        if (peer_req->flags & EE_IN_INTERVAL_TREE) {
1838                spin_lock_irq(&mdev->tconn->req_lock);
1839                D_ASSERT(!drbd_interval_empty(&peer_req->i));
1840                drbd_remove_epoch_entry_interval(mdev, peer_req);
1841                if (peer_req->flags & EE_RESTART_REQUESTS)
1842                        restart_conflicting_writes(mdev, sector, peer_req->i.size);
1843                spin_unlock_irq(&mdev->tconn->req_lock);
1844        } else
1845                D_ASSERT(drbd_interval_empty(&peer_req->i));
1846
1847        drbd_may_finish_epoch(mdev->tconn, peer_req->epoch, EV_PUT + (cancel ? EV_CLEANUP : 0));
1848
1849        return err;
1850}
1851
1852static int e_send_ack(struct drbd_work *w, enum drbd_packet ack)
1853{
1854        struct drbd_conf *mdev = w->mdev;
1855        struct drbd_peer_request *peer_req =
1856                container_of(w, struct drbd_peer_request, w);
1857        int err;
1858
1859        err = drbd_send_ack(mdev, ack, peer_req);
1860        dec_unacked(mdev);
1861
1862        return err;
1863}
1864
1865static int e_send_superseded(struct drbd_work *w, int unused)
1866{
1867        return e_send_ack(w, P_SUPERSEDED);
1868}
1869
1870static int e_send_retry_write(struct drbd_work *w, int unused)
1871{
1872        struct drbd_tconn *tconn = w->mdev->tconn;
1873
1874        return e_send_ack(w, tconn->agreed_pro_version >= 100 ?
1875                             P_RETRY_WRITE : P_SUPERSEDED);
1876}
1877
/*
 * Wrap-around aware "a is newer than b" for 32-bit sequence numbers.
 *
 * Returns true if @a is ahead of @b by 1 .. 2^31 - 1 steps (modulo 2^32),
 * false if they are equal or @a is behind.
 */
static bool seq_greater(u32 a, u32 b)
{
        /*
         * We assume 32-bit wrap-around here.
         * For 24-bit wrap-around, we would have to shift:
         *  a <<= 8; b <<= 8;
         *
         * Subtract as unsigned, where wrap-around is well defined, and only
         * then look at the sign of the difference.  Subtracting two s32
         * values directly overflows for inputs on opposite sides of the
         * signed range, which is undefined behaviour in C.
         */
        return (s32)(a - b) > 0;
}
1887
1888static u32 seq_max(u32 a, u32 b)
1889{
1890        return seq_greater(a, b) ? a : b;
1891}
1892
1893static bool need_peer_seq(struct drbd_conf *mdev)
1894{
1895        struct drbd_tconn *tconn = mdev->tconn;
1896        int tp;
1897
1898        /*
1899         * We only need to keep track of the last packet_seq number of our peer
1900         * if we are in dual-primary mode and we have the resolve-conflicts flag set; see
1901         * handle_write_conflicts().
1902         */
1903
1904        rcu_read_lock();
1905        tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
1906        rcu_read_unlock();
1907
1908        return tp && test_bit(RESOLVE_CONFLICTS, &tconn->flags);
1909}
1910
1911static void update_peer_seq(struct drbd_conf *mdev, unsigned int peer_seq)
1912{
1913        unsigned int newest_peer_seq;
1914
1915        if (need_peer_seq(mdev)) {
1916                spin_lock(&mdev->peer_seq_lock);
1917                newest_peer_seq = seq_max(mdev->peer_seq, peer_seq);
1918                mdev->peer_seq = newest_peer_seq;
1919                spin_unlock(&mdev->peer_seq_lock);
1920                /* wake up only if we actually changed mdev->peer_seq */
1921                if (peer_seq == newest_peer_seq)
1922                        wake_up(&mdev->seq_wait);
1923        }
1924}
1925
/*
 * Do the two extents overlap?
 *
 * @s1, @s2: start sectors
 * @l1, @l2: lengths in bytes (converted to 512 byte sectors by >> 9)
 *
 * Returns 1 if [s1, s1 + l1/512) and [s2, s2 + l2/512) share at least one
 * sector, 0 otherwise.
 */
static inline int overlaps(sector_t s1, int l1, sector_t s2, int l2)
{
        /* each extent has to end after the other one starts */
        return s1 + (l1 >> 9) > s2 && s1 < s2 + (l2 >> 9);
}
1930
1931/* maybe change sync_ee into interval trees as well? */
1932static bool overlapping_resync_write(struct drbd_conf *mdev, struct drbd_peer_request *peer_req)
1933{
1934        struct drbd_peer_request *rs_req;
1935        bool rv = 0;
1936
1937        spin_lock_irq(&mdev->tconn->req_lock);
1938        list_for_each_entry(rs_req, &mdev->sync_ee, w.list) {
1939                if (overlaps(peer_req->i.sector, peer_req->i.size,
1940                             rs_req->i.sector, rs_req->i.size)) {
1941                        rv = 1;
1942                        break;
1943                }
1944        }
1945        spin_unlock_irq(&mdev->tconn->req_lock);
1946
1947        return rv;
1948}
1949
1950/* Called from receive_Data.
1951 * Synchronize packets on sock with packets on msock.
1952 *
1953 * This is here so even when a P_DATA packet traveling via sock overtook an Ack
1954 * packet traveling on msock, they are still processed in the order they have
1955 * been sent.
1956 *
1957 * Note: we don't care for Ack packets overtaking P_DATA packets.
1958 *
1959 * In case packet_seq is larger than mdev->peer_seq number, there are
1960 * outstanding packets on the msock. We wait for them to arrive.
1961 * In case we are the logically next packet, we update mdev->peer_seq
1962 * ourselves. Correctly handles 32bit wrap around.
1963 *
1964 * Assume we have a 10 GBit connection, that is about 1<<30 byte per second,
1965 * about 1<<21 sectors per second. So "worst" case, we have 1<<3 == 8 seconds
1966 * for the 24bit wrap (historical atomic_t guarantee on some archs), and we have
1967 * 1<<9 == 512 seconds aka ages for the 32bit wrap around...
1968 *
1969 * returns 0 if we may process the packet,
1970 * -ERESTARTSYS if we were interrupted (by disconnect signal). */
1971static int wait_for_and_update_peer_seq(struct drbd_conf *mdev, const u32 peer_seq)
1972{
1973        DEFINE_WAIT(wait);
1974        long timeout;
1975        int ret;
1976
1977        if (!need_peer_seq(mdev))
1978                return 0;
1979
1980        spin_lock(&mdev->peer_seq_lock);
1981        for (;;) {
1982                if (!seq_greater(peer_seq - 1, mdev->peer_seq)) {
1983                        mdev->peer_seq = seq_max(mdev->peer_seq, peer_seq);
1984                        ret = 0;
1985                        break;
1986                }
1987                if (signal_pending(current)) {
1988                        ret = -ERESTARTSYS;
1989                        break;
1990                }
1991                prepare_to_wait(&mdev->seq_wait, &wait, TASK_INTERRUPTIBLE);
1992                spin_unlock(&mdev->peer_seq_lock);
1993                rcu_read_lock();
1994                timeout = rcu_dereference(mdev->tconn->net_conf)->ping_timeo*HZ/10;
1995                rcu_read_unlock();
1996                timeout = schedule_timeout(timeout);
1997                spin_lock(&mdev->peer_seq_lock);
1998                if (!timeout) {
1999                        ret = -ETIMEDOUT;
2000                        dev_err(DEV, "Timed out waiting for missing ack packets; disconnecting\n");
2001                        break;
2002                }
2003        }
2004        spin_unlock(&mdev->peer_seq_lock);
2005        finish_wait(&mdev->seq_wait, &wait);
2006        return ret;
2007}
2008
2009/* see also bio_flags_to_wire()
2010 * DRBD_REQ_*, because we need to semantically map the flags to data packet
2011 * flags and back. We may replicate to other kernel versions. */
2012static unsigned long wire_flags_to_bio(struct drbd_conf *mdev, u32 dpf)
2013{
2014        return  (dpf & DP_RW_SYNC ? REQ_SYNC : 0) |
2015                (dpf & DP_FUA ? REQ_FUA : 0) |
2016                (dpf & DP_FLUSH ? REQ_FLUSH : 0) |
2017                (dpf & DP_DISCARD ? REQ_DISCARD : 0);
2018}
2019
2020static void fail_postponed_requests(struct drbd_conf *mdev, sector_t sector,
2021                                    unsigned int size)
2022{
2023        struct drbd_interval *i;
2024
2025    repeat:
2026        drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2027                struct drbd_request *req;
2028                struct bio_and_error m;
2029
2030                if (!i->local)
2031                        continue;
2032                req = container_of(i, struct drbd_request, i);
2033                if (!(req->rq_state & RQ_POSTPONED))
2034                        continue;
2035                req->rq_state &= ~RQ_POSTPONED;
2036                __req_mod(req, NEG_ACKED, &m);
2037                spin_unlock_irq(&mdev->tconn->req_lock);
2038                if (m.bio)
2039                        complete_master_bio(mdev, &m);
2040                spin_lock_irq(&mdev->tconn->req_lock);
2041                goto repeat;
2042        }
2043}
2044
2045static int handle_write_conflicts(struct drbd_conf *mdev,
2046                                  struct drbd_peer_request *peer_req)
2047{
2048        struct drbd_tconn *tconn = mdev->tconn;
2049        bool resolve_conflicts = test_bit(RESOLVE_CONFLICTS, &tconn->flags);
2050        sector_t sector = peer_req->i.sector;
2051        const unsigned int size = peer_req->i.size;
2052        struct drbd_interval *i;
2053        bool equal;
2054        int err;
2055
2056        /*
2057         * Inserting the peer request into the write_requests tree will prevent
2058         * new conflicting local requests from being added.
2059         */
2060        drbd_insert_interval(&mdev->write_requests, &peer_req->i);
2061
2062    repeat:
2063        drbd_for_each_overlap(i, &mdev->write_requests, sector, size) {
2064                if (i == &peer_req->i)
2065                        continue;
2066
2067                if (!i->local) {
2068                        /*
2069                         * Our peer has sent a conflicting remote request; this
2070                         * should not happen in a two-node setup.  Wait for the
2071                         * earlier peer request to complete.
2072                         */
2073                        err = drbd_wait_misc(mdev, i);
2074                        if (err)
2075                                goto out;
2076                        goto repeat;
2077                }
2078
2079                equal = i->sector == sector && i->size == size;
2080                if (resolve_conflicts) {
2081                        /*
2082                         * If the peer request is fully contained within the
2083                         * overlapping request, it can be considered overwritten
2084                         * and thus superseded; otherwise, it will be retried
2085                         * once all overlapping requests have completed.
2086                         */
2087                        bool superseded = i->sector <= sector && i->sector +
2088                                       (i->size >> 9) >= sector + (size >> 9);
2089
2090                        if (!equal)
2091                                dev_alert(DEV, "Concurrent writes detected: "
2092                                               "local=%llus +%u, remote=%llus +%u, "
2093                                               "assuming %s came first\n",
2094                                          (unsigned long long)i->sector, i->size,
2095                                          (unsigned long long)sector, size,
2096                                          superseded ? "local" : "remote");
2097
2098                        inc_unacked(mdev);
2099                        peer_req->w.cb = superseded ? e_send_superseded :
2100                                                   e_send_retry_write;
2101                        list_add_tail(&peer_req->w.list, &mdev->done_ee);
2102                        wake_asender(mdev->tconn);
2103
2104                        err = -ENOENT;
2105                        goto out;
2106                } else {
2107                        struct drbd_request *req =
2108                                container_of(i, struct drbd_request, i);
2109
2110                        if (!equal)
2111                                dev_alert(DEV, "Concurrent writes detected: "
2112                                               "local=%llus +%u, remote=%llus +%u\n",
2113                                          (unsigned long long)i->sector, i->size,
2114                                          (unsigned long long)sector, size);
2115
2116                        if (req->rq_state & RQ_LOCAL_PENDING ||
2117                            !(req->rq_state & RQ_POSTPONED)) {
2118                                /*
2119                                 * Wait for the node with the discard flag to
2120                                 * decide if this request has been superseded
2121                                 * or needs to be retried.
2122                                 * Requests that have been superseded will
2123                                 * disappear from the write_requests tree.
2124                                 *
2125                                 * In addition, wait for the conflicting
2126                                 * request to finish locally before submitting
2127                                 * the conflicting peer request.
2128                                 */
2129                                err = drbd_wait_misc(mdev, &req->i);
2130                                if (err) {
2131                                        _conn_request_state(mdev->tconn,
2132                                                            NS(conn, C_TIMEOUT),
2133                                                            CS_HARD);
2134                                        fail_postponed_requests(mdev, sector, size);
2135                                        goto out;
2136                                }
2137                                goto repeat;
2138                        }
2139                        /*
2140                         * Remember to restart the conflicting requests after
2141                         * the new peer request has completed.
2142                         */
2143                        peer_req->flags |= EE_RESTART_REQUESTS;
2144                }
2145        }
2146        err = 0;
2147
2148    out:
2149        if (err)
2150                drbd_remove_epoch_entry_interval(mdev, peer_req);
2151        return err;
2152}
2153
2154/* mirrored write */
2155static int receive_Data(struct drbd_tconn *tconn, struct packet_info *pi)
2156{
2157        struct drbd_conf *mdev;
2158        sector_t sector;
2159        struct drbd_peer_request *peer_req;
2160        struct p_data *p = pi->data;
2161        u32 peer_seq = be32_to_cpu(p->seq_num);
2162        int rw = WRITE;
2163        u32 dp_flags;
2164        int err, tp;
2165
2166        mdev = vnr_to_mdev(tconn, pi->vnr);
2167        if (!mdev)
2168                return -EIO;
2169
2170        if (!get_ldev(mdev)) {
2171                int err2;
2172
2173                err = wait_for_and_update_peer_seq(mdev, peer_seq);
2174                drbd_send_ack_dp(mdev, P_NEG_ACK, p, pi->size);
2175                atomic_inc(&tconn->current_epoch->epoch_size);
2176                err2 = drbd_drain_block(mdev, pi->size);
2177                if (!err)
2178                        err = err2;
2179                return err;
2180        }
2181
2182        /*
2183         * Corresponding put_ldev done either below (on various errors), or in
2184         * drbd_peer_request_endio, if we successfully submit the data at the
2185         * end of this function.
2186         */
2187
2188        sector = be64_to_cpu(p->sector);
2189        peer_req = read_in_block(mdev, p->block_id, sector, pi->size);
2190        if (!peer_req) {
2191                put_ldev(mdev);
2192                return -EIO;
2193        }
2194
2195        peer_req->w.cb = e_end_block;
2196
2197        dp_flags = be32_to_cpu(p->dp_flags);
2198        rw |= wire_flags_to_bio(mdev, dp_flags);
2199        if (peer_req->pages == NULL) {
2200                D_ASSERT(peer_req->i.size == 0);
2201                D_ASSERT(dp_flags & DP_FLUSH);
2202        }
2203
2204        if (dp_flags & DP_MAY_SET_IN_SYNC)
2205                peer_req->flags |= EE_MAY_SET_IN_SYNC;
2206
2207        spin_lock(&tconn->epoch_lock);
2208        peer_req->epoch = tconn->current_epoch;
2209        atomic_inc(&peer_req->epoch->epoch_size);
2210        atomic_inc(&peer_req->epoch->active);
2211        spin_unlock(&tconn->epoch_lock);
2212
2213        rcu_read_lock();
2214        tp = rcu_dereference(mdev->tconn->net_conf)->two_primaries;
2215        rcu_read_unlock();
2216        if (tp) {
2217                peer_req->flags |= EE_IN_INTERVAL_TREE;
2218                err = wait_for_and_update_peer_seq(mdev, peer_seq);
2219                if (err)
2220                        goto out_interrupted;
2221                spin_lock_irq(&mdev->tconn->req_lock);
2222                err = handle_write_conflicts(mdev, peer_req);
2223                if (err) {
2224                        spin_unlock_irq(&mdev->tconn->req_lock);
2225                        if (err == -ENOENT) {
2226                                put_ldev(mdev);
2227                                return 0;
2228                        }
2229                        goto out_interrupted;
2230                }
2231        } else
2232                spin_lock_irq(&mdev->tconn->req_lock);
2233        list_add(&peer_req->w.list, &mdev->active_ee);
2234        spin_unlock_irq(&mdev->tconn->req_lock);
2235
2236        if (mdev->state.conn == C_SYNC_TARGET)
2237                wait_event(mdev->ee_wait, !overlapping_resync_write(mdev, peer_req));
2238
2239        if (mdev->tconn->agreed_pro_version < 100) {
2240                rcu_read_lock();
2241                switch (rcu_dereference(mdev->tconn->net_conf)->wire_protocol) {
2242                case DRBD_PROT_C:
2243                        dp_flags |= DP_SEND_WRITE_ACK;
2244                        break;
2245                case DRBD_PROT_B:
2246                        dp_flags |= DP_SEND_RECEIVE_ACK;
2247                        break;
2248                }
2249                rcu_read_unlock();
2250        }
2251
2252        if (dp_flags & DP_SEND_WRITE_ACK) {
2253                peer_req->flags |= EE_SEND_WRITE_ACK;
2254                inc_unacked(mdev);
2255                /* corresponding dec_unacked() in e_end_block()
2256                 * respective _drbd_clear_done_ee */
2257        }
2258
2259        if (dp_flags & DP_SEND_RECEIVE_ACK) {
2260                /* I really don't like it that the receiver thread
2261                 * sends on the msock, but anyways */
2262                drbd_send_ack(mdev, P_RECV_ACK, peer_req);
2263        }
2264
2265        if (mdev->state.pdsk < D_INCONSISTENT) {
2266                /* In case we have the only disk of the cluster, */
2267                drbd_set_out_of_sync(mdev, peer_req->i.sector, peer_req->i.size);
2268                peer_req->flags |= EE_CALL_AL_COMPLETE_IO;
2269                peer_req->flags &= ~EE_MAY_SET_IN_SYNC;
2270                drbd_al_begin_io(mdev, &peer_req->i, true);
2271        }
2272
2273        err = drbd_submit_peer_request(mdev, peer_req, rw, DRBD_FAULT_DT_WR);
2274        if (!err)
2275                return 0;
2276
2277        /* don't care for the reason here */
2278        dev_err(DEV, "submit failed, triggering re-connect\n");
2279        spin_lock_irq(&mdev->tconn->req_lock);
2280        list_del(&peer_req->w.list);
2281        drbd_remove_epoch_entry_interval(mdev, peer_req);
2282        spin_unlock_irq(&mdev->tconn->req_lock);
2283        if (peer_req->flags & EE_CALL_AL_COMPLETE_IO)
2284                drbd_al_complete_io(mdev, &peer_req->i);
2285
2286out_interrupted:
2287        drbd_may_finish_epoch(tconn, peer_req->epoch, EV_PUT + EV_CLEANUP);
2288        put_ldev(mdev);
2289        drbd_free_peer_req(mdev, peer_req);
2290        return err;
2291}
2292
2293/* We may throttle resync, if the lower device seems to be busy,
2294 * and current sync rate is above c_min_rate.
2295 *
2296 * To decide whether or not the lower device is busy, we use a scheme similar
2297 * to MD RAID is_mddev_idle(): if the partition stats reveal "significant"
2298 * (more than 64 sectors) of activity we cannot account for with our own resync
2299 * activity, it obviously is "busy".
2300 *
2301 * The current sync rate used here uses only the most recent two step marks,
2302 * to have a short time average so we can react faster.
2303 */
2304int drbd_rs_should_slow_down(struct drbd_conf *mdev, sector_t sector)
2305{
2306        struct gendisk *disk = mdev->ldev->backing_bdev->bd_contains->bd_disk;
2307        unsigned long db, dt, dbdt;
2308        struct lc_element *tmp;
2309        int curr_events;
2310        int throttle = 0;
2311        unsigned int c_min_rate;
2312
2313        rcu_read_lock();
2314        c_min_rate = rcu_dereference(mdev->ldev->disk_conf)->c_min_rate;
2315        rcu_read_unlock();
2316
2317        /* feature disabled? */
2318        if (c_min_rate == 0)
2319                return 0;
2320
2321        spin_lock_irq(&mdev->al_lock);
2322        tmp = lc_find(mdev->resync, BM_SECT_TO_EXT(sector));
2323        if (tmp) {
2324                struct bm_extent *bm_ext = lc_entry(tmp, struct bm_extent, lce);
2325                if (test_bit(BME_PRIORITY, &bm_ext->flags)) {
2326                        spin_unlock_irq(&mdev->al_lock);
2327                        return 0;
2328                }
2329                /* Do not slow down if app IO is already waiting for this extent */
2330        }
2331        spin_unlock_irq(&mdev->al_lock);
2332
2333        curr_events = (int)part_stat_read(&disk->part0, sectors[0]) +
2334                      (int)part_stat_read(&disk->part0, sectors[1]) -
2335                        atomic_read(&mdev->rs_sect_ev);
2336
2337        if (!mdev->rs_last_events || curr_events - mdev->rs_last_events > 64) {
2338                unsigned long rs_left;
2339                int i;
2340
2341                mdev->rs_last_events = curr_events;
2342
2343                /* sync speed average over the last 2*DRBD_SYNC_MARK_STEP,
2344                 * approx. */
2345                i = (mdev->rs_last_mark + DRBD_SYNC_MARKS-1) % DRBD_SYNC_MARKS;
2346
2347                if (mdev->state.conn == C_VERIFY_S || mdev->state.conn == C_VERIFY_T)
2348                        rs_left = mdev->ov_left;
2349                else
2350                        rs_left = drbd_bm_total_weight(mdev) - mdev->rs_failed;
2351
2352                dt = ((long)jiffies - (long)mdev->rs_mark_time[i]) / HZ;
2353                if (!dt)
2354                        dt++;
2355                db = mdev->rs_mark_left[i] - rs_left;
2356                dbdt = Bit2KB(db/dt);
2357
2358                if (dbdt > c_min_rate)
2359                        throttle = 1;
2360        }
2361        return throttle;
2362}
2363
2364
2365static int receive_DataRequest(struct drbd_tconn *tconn, struct packet_info *pi)
2366{
2367        struct drbd_conf *mdev;
2368        sector_t sector;
2369        sector_t capacity;
2370        struct drbd_peer_request *peer_req;
2371        struct digest_info *di = NULL;
2372        int size, verb;
2373        unsigned int fault_type;
2374        struct p_block_req *p = pi->data;
2375
2376        mdev = vnr_to_mdev(tconn, pi->vnr);
2377        if (!mdev)
2378                return -EIO;
2379        capacity = drbd_get_capacity(mdev->this_bdev);
2380
2381        sector = be64_to_cpu(p->sector);
2382        size   = be32_to_cpu(p->blksize);
2383
2384        if (size <= 0 || !IS_ALIGNED(size, 512) || size > DRBD_MAX_BIO_SIZE) {
2385                dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2386                                (unsigned long long)sector, size);
2387                return -EINVAL;
2388        }
2389        if (sector + (size>>9) > capacity) {
2390                dev_err(DEV, "%s:%d: sector: %llus, size: %u\n", __FILE__, __LINE__,
2391                                (unsigned long long)sector, size);
2392                return -EINVAL;
2393        }
2394
2395        if (!get_ldev_if_state(mdev, D_UP_TO_DATE)) {
2396                verb = 1;
2397                switch (pi->cmd) {
2398                case P_DATA_REQUEST:
2399                        drbd_send_ack_rp(mdev, P_NEG_DREPLY, p);
2400                        break;
2401                case P_RS_DATA_REQUEST:
2402                case P_CSUM_RS_REQUEST:
2403                case P_OV_REQUEST:
2404                        drbd_send_ack_rp(mdev, P_NEG_RS_DREPLY , p);
2405                        break;
2406                case P_OV_REPLY:
2407                        verb = 0;
2408                        dec_rs_pending(mdev);
2409                        drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size, ID_IN_SYNC);
2410                        break;
2411                default:
2412                        BUG();
2413                }
2414                if (verb && __ratelimit(&drbd_ratelimit_state))
2415                        dev_err(DEV, "Can not satisfy peer's read request, "
2416                            "no local data.\n");
2417
2418                /* drain possibly payload */
2419                return drbd_drain_block(mdev, pi->size);
2420        }
2421
2422        /* GFP_NOIO, because we must not cause arbitrary write-out: in a DRBD
2423         * "criss-cross" setup, that might cause write-out on some other DRBD,
2424         * which in turn might block on the other node at this very place.  */
2425        peer_req = drbd_alloc_peer_req(mdev, p->block_id, sector, size, GFP_NOIO);
2426        if (!peer_req) {
2427                put_ldev(mdev);
2428                return -ENOMEM;
2429        }
2430
2431        switch (pi->cmd) {
2432        case P_DATA_REQUEST:
2433                peer_req->w.cb = w_e_end_data_req;
2434                fault_type = DRBD_FAULT_DT_RD;
2435                /* application IO, don't drbd_rs_begin_io */
2436                goto submit;
2437
2438        case P_RS_DATA_REQUEST:
2439                peer_req->w.cb = w_e_end_rsdata_req;
2440                fault_type = DRBD_FAULT_RS_RD;
2441                /* used in the sector offset progress display */
2442                mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2443                break;
2444
2445        case P_OV_REPLY:
2446        case P_CSUM_RS_REQUEST:
2447                fault_type = DRBD_FAULT_RS_RD;
2448                di = kmalloc(sizeof(*di) + pi->size, GFP_NOIO);
2449                if (!di)
2450                        goto out_free_e;
2451
2452                di->digest_size = pi->size;
2453                di->digest = (((char *)di)+sizeof(struct digest_info));
2454
2455                peer_req->digest = di;
2456                peer_req->flags |= EE_HAS_DIGEST;
2457
2458                if (drbd_recv_all(mdev->tconn, di->digest, pi->size))
2459                        goto out_free_e;
2460
2461                if (pi->cmd == P_CSUM_RS_REQUEST) {
2462                        D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
2463                        peer_req->w.cb = w_e_end_csum_rs_req;
2464                        /* used in the sector offset progress display */
2465                        mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
2466                } else if (pi->cmd == P_OV_REPLY) {
2467                        /* track progress, we may need to throttle */
2468                        atomic_add(size >> 9, &mdev->rs_sect_in);
2469                        peer_req->w.cb = w_e_end_ov_reply;
2470                        dec_rs_pending(mdev);
2471                        /* drbd_rs_begin_io done when we sent this request,
2472                         * but accounting still needs to be done. */
2473                        goto submit_for_resync;
2474                }
2475                break;
2476
2477        case P_OV_REQUEST:
2478                if (mdev->ov_start_sector == ~(sector_t)0 &&
2479                    mdev->tconn->agreed_pro_version >= 90) {
2480                        unsigned long now = jiffies;
2481                        int i;
2482                        mdev->ov_start_sector = sector;
2483                        mdev->ov_position = sector;
2484                        mdev->ov_left = drbd_bm_bits(mdev) - BM_SECT_TO_BIT(sector);
2485                        mdev->rs_total = mdev->ov_left;
2486                        for (i = 0; i < DRBD_SYNC_MARKS; i++) {
2487                                mdev->rs_mark_left[i] = mdev->ov_left;
2488                                mdev->rs_mark_time[i] = now;
2489                        }
2490                        dev_info(DEV, "Online Verify start sector: %llu\n",
2491                                        (unsigned long long)sector);
2492                }
2493                peer_req->w.cb = w_e_end_ov_req;
2494                fault_type = DRBD_FAULT_RS_RD;
2495                break;
2496
2497        default:
2498                BUG();
2499        }
2500
2501        /* Throttle, drbd_rs_begin_io and submit should become asynchronous
2502         * wrt the receiver, but it is not as straightforward as it may seem.
2503         * Various places in the resync start and stop logic assume resync
2504         * requests are processed in order, requeuing this on the worker thread
2505         * introduces a bunch of new code for synchronization between threads.
2506         *
2507         * Unlimited throttling before drbd_rs_begin_io may stall the resync
2508         * "forever", throttling after drbd_rs_begin_io will lock that extent
2509         * for application writes for the same time.  For now, just throttle
2510         * here, where the rest of the code expects the receiver to sleep for
2511         * a while, anyways.
2512         */
2513
2514        /* Throttle before drbd_rs_begin_io, as that locks out application IO;
2515         * this defers syncer requests for some time, before letting at least
2516         * on request through.  The resync controller on the receiving side
2517         * will adapt to the incoming rate accordingly.
2518         *
2519         * We cannot throttle here if remote is Primary/SyncTarget:
2520         * we would also throttle its application reads.
2521         * In that case, throttling is done on the SyncTarget only.
2522         */
2523        if (mdev->state.peer != R_PRIMARY && drbd_rs_should_slow_down(mdev, sector))
2524                schedule_timeout_uninterruptible(HZ/10);
2525        if (drbd_rs_begin_io(mdev, sector))
2526                goto out_free_e;
2527
2528submit_for_resync:
2529        atomic_add(size >> 9, &mdev->rs_sect_ev);
2530
2531submit:
2532        inc_unacked(mdev);
2533        spin_lock_irq(&mdev->tconn->req_lock);
2534        list_add_tail(&peer_req->w.list, &mdev->read_ee);
2535        spin_unlock_irq(&mdev->tconn->req_lock);
2536
2537        if (drbd_submit_peer_request(mdev, peer_req, READ, fault_type) == 0)
2538                return 0;
2539
2540        /* don't care for the reason here */
2541        dev_err(DEV, "submit failed, triggering re-connect\n");
2542        spin_lock_irq(&mdev->tconn->req_lock);
2543        list_del(&peer_req->w.list);
2544        spin_unlock_irq(&mdev->tconn->req_lock);
2545        /* no drbd_rs_complete_io(), we are dropping the connection anyways */
2546
2547out_free_e:
2548        put_ldev(mdev);
2549        drbd_free_peer_req(mdev, peer_req);
2550        return -EIO;
2551}
2552
2553static int drbd_asb_recover_0p(struct drbd_conf *mdev) __must_hold(local)
2554{
2555        int self, peer, rv = -100;
2556        unsigned long ch_self, ch_peer;
2557        enum drbd_after_sb_p after_sb_0p;
2558
2559        self = mdev->ldev->md.uuid[UI_BITMAP] & 1;
2560        peer = mdev->p_uuid[UI_BITMAP] & 1;
2561
2562        ch_peer = mdev->p_uuid[UI_SIZE];
2563        ch_self = mdev->comm_bm_set;
2564
2565        rcu_read_lock();
2566        after_sb_0p = rcu_dereference(mdev->tconn->net_conf)->after_sb_0p;
2567        rcu_read_unlock();
2568        switch (after_sb_0p) {
2569        case ASB_CONSENSUS:
2570        case ASB_DISCARD_SECONDARY:
2571        case ASB_CALL_HELPER:
2572        case ASB_VIOLENTLY:
2573                dev_err(DEV, "Configuration error.\n");
2574                break;
2575        case ASB_DISCONNECT:
2576                break;
2577        case ASB_DISCARD_YOUNGER_PRI:
2578                if (self == 0 && peer == 1) {
2579                        rv = -1;
2580                        break;
2581                }
2582                if (self == 1 && peer == 0) {
2583                        rv =  1;
2584                        break;
2585                }
2586                /* Else fall through to one of the other strategies... */
2587        case ASB_DISCARD_OLDER_PRI:
2588                if (self == 0 && peer == 1) {
2589                        rv = 1;
2590                        break;
2591                }
2592                if (self == 1 && peer == 0) {
2593                        rv = -1;
2594                        break;
2595                }
2596                /* Else fall through to one of the other strategies... */
2597                dev_warn(DEV, "Discard younger/older primary did not find a decision\n"
2598                     "Using discard-least-changes instead\n");
2599        case ASB_DISCARD_ZERO_CHG:
2600                if (ch_peer == 0 && ch_self == 0) {
2601                        rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2602                                ? -1 : 1;
2603                        break;
2604                } else {
2605                        if (ch_peer == 0) { rv =  1; break; }
2606                        if (ch_self == 0) { rv = -1; break; }
2607                }
2608                if (after_sb_0p == ASB_DISCARD_ZERO_CHG)
2609                        break;
2610        case ASB_DISCARD_LEAST_CHG:
2611                if      (ch_self < ch_peer)
2612                        rv = -1;
2613                else if (ch_self > ch_peer)
2614                        rv =  1;
2615                else /* ( ch_self == ch_peer ) */
2616                     /* Well, then use something else. */
2617                        rv = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags)
2618                                ? -1 : 1;
2619                break;
2620        case ASB_DISCARD_LOCAL:
2621                rv = -1;
2622                break;
2623        case ASB_DISCARD_REMOTE:
2624                rv =  1;
2625        }
2626
2627        return rv;
2628}
2629
2630static int drbd_asb_recover_1p(struct drbd_conf *mdev) __must_hold(local)
2631{
2632        int hg, rv = -100;
2633        enum drbd_after_sb_p after_sb_1p;
2634
2635        rcu_read_lock();
2636        after_sb_1p = rcu_dereference(mdev->tconn->net_conf)->after_sb_1p;
2637        rcu_read_unlock();
2638        switch (after_sb_1p) {
2639        case ASB_DISCARD_YOUNGER_PRI:
2640        case ASB_DISCARD_OLDER_PRI:
2641        case ASB_DISCARD_LEAST_CHG:
2642        case ASB_DISCARD_LOCAL:
2643        case ASB_DISCARD_REMOTE:
2644        case ASB_DISCARD_ZERO_CHG:
2645                dev_err(DEV, "Configuration error.\n");
2646                break;
2647        case ASB_DISCONNECT:
2648                break;
2649        case ASB_CONSENSUS:
2650                hg = drbd_asb_recover_0p(mdev);
2651                if (hg == -1 && mdev->state.role == R_SECONDARY)
2652                        rv = hg;
2653                if (hg == 1  && mdev->state.role == R_PRIMARY)
2654                        rv = hg;
2655                break;
2656        case ASB_VIOLENTLY:
2657                rv = drbd_asb_recover_0p(mdev);
2658                break;
2659        case ASB_DISCARD_SECONDARY:
2660                return mdev->state.role == R_PRIMARY ? 1 : -1;
2661        case ASB_CALL_HELPER:
2662                hg = drbd_asb_recover_0p(mdev);
2663                if (hg == -1 && mdev->state.role == R_PRIMARY) {
2664                        enum drbd_state_rv rv2;
2665
2666                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2667                          * we might be here in C_WF_REPORT_PARAMS which is transient.
2668                          * we do not need to wait for the after state change work either. */
2669                        rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2670                        if (rv2 != SS_SUCCESS) {
2671                                drbd_khelper(mdev, "pri-lost-after-sb");
2672                        } else {
2673                                dev_warn(DEV, "Successfully gave up primary role.\n");
2674                                rv = hg;
2675                        }
2676                } else
2677                        rv = hg;
2678        }
2679
2680        return rv;
2681}
2682
2683static int drbd_asb_recover_2p(struct drbd_conf *mdev) __must_hold(local)
2684{
2685        int hg, rv = -100;
2686        enum drbd_after_sb_p after_sb_2p;
2687
2688        rcu_read_lock();
2689        after_sb_2p = rcu_dereference(mdev->tconn->net_conf)->after_sb_2p;
2690        rcu_read_unlock();
2691        switch (after_sb_2p) {
2692        case ASB_DISCARD_YOUNGER_PRI:
2693        case ASB_DISCARD_OLDER_PRI:
2694        case ASB_DISCARD_LEAST_CHG:
2695        case ASB_DISCARD_LOCAL:
2696        case ASB_DISCARD_REMOTE:
2697        case ASB_CONSENSUS:
2698        case ASB_DISCARD_SECONDARY:
2699        case ASB_DISCARD_ZERO_CHG:
2700                dev_err(DEV, "Configuration error.\n");
2701                break;
2702        case ASB_VIOLENTLY:
2703                rv = drbd_asb_recover_0p(mdev);
2704                break;
2705        case ASB_DISCONNECT:
2706                break;
2707        case ASB_CALL_HELPER:
2708                hg = drbd_asb_recover_0p(mdev);
2709                if (hg == -1) {
2710                        enum drbd_state_rv rv2;
2711
2712                         /* drbd_change_state() does not sleep while in SS_IN_TRANSIENT_STATE,
2713                          * we might be here in C_WF_REPORT_PARAMS which is transient.
2714                          * we do not need to wait for the after state change work either. */
2715                        rv2 = drbd_change_state(mdev, CS_VERBOSE, NS(role, R_SECONDARY));
2716                        if (rv2 != SS_SUCCESS) {
2717                                drbd_khelper(mdev, "pri-lost-after-sb");
2718                        } else {
2719                                dev_warn(DEV, "Successfully gave up primary role.\n");
2720                                rv = hg;
2721                        }
2722                } else
2723                        rv = hg;
2724        }
2725
2726        return rv;
2727}
2728
2729static void drbd_uuid_dump(struct drbd_conf *mdev, char *text, u64 *uuid,
2730                           u64 bits, u64 flags)
2731{
2732        if (!uuid) {
2733                dev_info(DEV, "%s uuid info vanished while I was looking!\n", text);
2734                return;
2735        }
2736        dev_info(DEV, "%s %016llX:%016llX:%016llX:%016llX bits:%llu flags:%llX\n",
2737             text,
2738             (unsigned long long)uuid[UI_CURRENT],
2739             (unsigned long long)uuid[UI_BITMAP],
2740             (unsigned long long)uuid[UI_HISTORY_START],
2741             (unsigned long long)uuid[UI_HISTORY_END],
2742             (unsigned long long)bits,
2743             (unsigned long long)flags);
2744}
2745
2746/*
2747  100   after split brain try auto recover
2748    2   C_SYNC_SOURCE set BitMap
2749    1   C_SYNC_SOURCE use BitMap
2750    0   no Sync
2751   -1   C_SYNC_TARGET use BitMap
2752   -2   C_SYNC_TARGET set BitMap
2753 -100   after split brain, disconnect
2754-1000   unrelated data
2755-1091   requires proto 91
2756-1096   requires proto 96
2757 */
/*
 * drbd_uuid_compare() - compare the local UUID set with the peer's
 * @mdev:    device whose local UUIDs (mdev->ldev->md.uuid) and peer UUIDs
 *           (mdev->p_uuid) are compared
 * @rule_nr: out parameter; holds the number of the most recently entered
 *           rule when the function returns
 *
 * Returns one of the codes listed in the table preceding this function.
 * Most comparisons ignore bit 0 of the UUIDs (it is masked off).
 *
 * This is not a pure comparison: rules 34 and 71 rewrite local UUIDs, and
 * rules 35 and 51 rewrite the cached peer UUIDs, to undo or complete a
 * resync start/finish that one side missed.
 */
2758static int drbd_uuid_compare(struct drbd_conf *mdev, int *rule_nr) __must_hold(local)
2759{
2760        u64 self, peer;
2761        int i, j;
2762
2763        self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2764        peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2765
            /* Rule 10: both current UUIDs are "just created" -> no sync. */
2766        *rule_nr = 10;
2767        if (self == UUID_JUST_CREATED && peer == UUID_JUST_CREATED)
2768                return 0;
2769
            /* Rule 20: ours is just-created or zero, the peer's is not
             * just-created -> become sync target, set bitmap. */
2770        *rule_nr = 20;
2771        if ((self == UUID_JUST_CREATED || self == (u64)0) &&
2772             peer != UUID_JUST_CREATED)
2773                return -2;
2774
            /* Rule 30: mirror image of rule 20 -> become sync source, set bitmap. */
2775        *rule_nr = 30;
2776        if (self != UUID_JUST_CREATED &&
2777            (peer == UUID_JUST_CREATED || peer == (u64)0))
2778                return 2;
2779
            /* Identical current UUIDs (ignoring bit 0): rules 34-37 and 40. */
2780        if (self == peer) {
2781                int rct, dc; /* roles at crash time */
2782
                    /* Peer has no bitmap UUID but we do: we act as sync source. */
2783                if (mdev->p_uuid[UI_BITMAP] == (u64)0 && mdev->ldev->md.uuid[UI_BITMAP] != (u64)0) {
2784
2785                        if (mdev->tconn->agreed_pro_version < 91)
2786                                return -1091;
2787
2788                        if ((mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) &&
2789                            (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1))) {
2790                                dev_info(DEV, "was SyncSource, missed the resync finished event, corrected myself:\n");
                                    /* Rotate our bitmap UUID into the history and clear it. */
2791                                drbd_uuid_move_history(mdev);
2792                                mdev->ldev->md.uuid[UI_HISTORY_START] = mdev->ldev->md.uuid[UI_BITMAP];
2793                                mdev->ldev->md.uuid[UI_BITMAP] = 0;
2794
2795                                drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2796                                               mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2797                                *rule_nr = 34;
2798                        } else {
2799                                dev_info(DEV, "was SyncSource (peer failed to write sync_uuid)\n");
2800                                *rule_nr = 36;
2801                        }
2802
2803                        return 1;
2804                }
2805
                    /* We have no bitmap UUID but the peer does: we act as sync target. */
2806                if (mdev->ldev->md.uuid[UI_BITMAP] == (u64)0 && mdev->p_uuid[UI_BITMAP] != (u64)0) {
2807
2808                        if (mdev->tconn->agreed_pro_version < 91)
2809                                return -1091;
2810
2811                        if ((mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) == (mdev->p_uuid[UI_BITMAP] & ~((u64)1)) &&
2812                            (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) == (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1))) {
2813                                dev_info(DEV, "was SyncTarget, peer missed the resync finished event, corrected peer:\n");
2814
                                    /* Apply the same rotation to our cached copy of the peer's UUIDs. */
2815                                mdev->p_uuid[UI_HISTORY_START + 1] = mdev->p_uuid[UI_HISTORY_START];
2816                                mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_BITMAP];
2817                                mdev->p_uuid[UI_BITMAP] = 0UL;
2818
2819                                drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2820                                *rule_nr = 35;
2821                        } else {
2822                                dev_info(DEV, "was SyncTarget (failed to write sync_uuid)\n");
2823                                *rule_nr = 37;
2824                        }
2825
2826                        return -1;
2827                }
2828
2829                /* Common power [off|failure] */
2830                rct = (test_bit(CRASHED_PRIMARY, &mdev->flags) ? 1 : 0) +
2831                        (mdev->p_uuid[UI_FLAGS] & 2);
2832                /* lowest bit is set when we were primary,
2833                 * next bit (weight 2) is set when peer was primary */
2834                *rule_nr = 40;
2835
2836                switch (rct) {
2837                case 0: /* !self_pri && !peer_pri */ return 0;
2838                case 1: /*  self_pri && !peer_pri */ return 1;
2839                case 2: /* !self_pri &&  peer_pri */ return -1;
2840                case 3: /*  self_pri &&  peer_pri */
2841                        dc = test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags);
2842                        return dc ? -1 : 1;
2843                }
2844        }
2845
            /* Rule 50: our current UUID equals the peer's bitmap UUID. */
2846        *rule_nr = 50;
2847        peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2848        if (self == peer)
2849                return -1;
2850
            /* Rule 51: our current UUID equals the peer's newest history UUID. */
2851        *rule_nr = 51;
2852        peer = mdev->p_uuid[UI_HISTORY_START] & ~((u64)1);
2853        if (self == peer) {
2854                if (mdev->tconn->agreed_pro_version < 96 ?
2855                    (mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1)) ==
2856                    (mdev->p_uuid[UI_HISTORY_START + 1] & ~((u64)1)) :
2857                    peer + UUID_NEW_BM_OFFSET == (mdev->p_uuid[UI_BITMAP] & ~((u64)1))) {
2858                        /* The last P_SYNC_UUID did not get through. Undo the last start of
2859                           resync as sync source modifications of the peer's UUIDs. */
2860
2861                        if (mdev->tconn->agreed_pro_version < 91)
2862                                return -1091;
2863
2864                        mdev->p_uuid[UI_BITMAP] = mdev->p_uuid[UI_HISTORY_START];
2865                        mdev->p_uuid[UI_HISTORY_START] = mdev->p_uuid[UI_HISTORY_START + 1];
2866
2867                        dev_info(DEV, "Lost last syncUUID packet, corrected:\n");
2868                        drbd_uuid_dump(mdev, "peer", mdev->p_uuid, mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2869
2870                        return -1;
2871                }
2872        }
2873
            /* Rule 60: our current UUID appears in the peer's history. */
2874        *rule_nr = 60;
2875        self = mdev->ldev->md.uuid[UI_CURRENT] & ~((u64)1);
2876        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2877                peer = mdev->p_uuid[i] & ~((u64)1);
2878                if (self == peer)
2879                        return -2;
2880        }
2881
            /* Rule 70: our bitmap UUID equals the peer's current UUID. */
2882        *rule_nr = 70;
2883        self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2884        peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2885        if (self == peer)
2886                return 1;
2887
            /* Rule 71: our newest history UUID equals the peer's current UUID
             * (mirror image of rule 51, acting on our own UUIDs). */
2888        *rule_nr = 71;
2889        self = mdev->ldev->md.uuid[UI_HISTORY_START] & ~((u64)1);
2890        if (self == peer) {
2891                if (mdev->tconn->agreed_pro_version < 96 ?
2892                    (mdev->ldev->md.uuid[UI_HISTORY_START + 1] & ~((u64)1)) ==
2893                    (mdev->p_uuid[UI_HISTORY_START] & ~((u64)1)) :
2894                    self + UUID_NEW_BM_OFFSET == (mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1))) {
2895                        /* The last P_SYNC_UUID did not get through. Undo the last start of
2896                           resync as sync source modifications of our UUIDs. */
2897
2898                        if (mdev->tconn->agreed_pro_version < 91)
2899                                return -1091;
2900
2901                        __drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_HISTORY_START]);
2902                        __drbd_uuid_set(mdev, UI_HISTORY_START, mdev->ldev->md.uuid[UI_HISTORY_START + 1]);
2903
2904                        dev_info(DEV, "Last syncUUID did not get through, corrected:\n");
2905                        drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid,
2906                                       mdev->state.disk >= D_NEGOTIATING ? drbd_bm_total_weight(mdev) : 0, 0);
2907
2908                        return 1;
2909                }
2910        }
2911
2912
            /* Rule 80: the peer's current UUID appears in our history. */
2913        *rule_nr = 80;
2914        peer = mdev->p_uuid[UI_CURRENT] & ~((u64)1);
2915        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2916                self = mdev->ldev->md.uuid[i] & ~((u64)1);
2917                if (self == peer)
2918                        return 2;
2919        }
2920
            /* Rule 90: both bitmap UUIDs are equal and non-zero -> split brain,
             * auto recovery may be attempted (100). */
2921        *rule_nr = 90;
2922        self = mdev->ldev->md.uuid[UI_BITMAP] & ~((u64)1);
2923        peer = mdev->p_uuid[UI_BITMAP] & ~((u64)1);
2924        if (self == peer && self != ((u64)0))
2925                return 100;
2926
            /* Rule 100: any pair of history UUIDs matches -> split brain,
             * disconnect (-100). */
2927        *rule_nr = 100;
2928        for (i = UI_HISTORY_START; i <= UI_HISTORY_END; i++) {
2929                self = mdev->ldev->md.uuid[i] & ~((u64)1);
2930                for (j = UI_HISTORY_START; j <= UI_HISTORY_END; j++) {
2931                        peer = mdev->p_uuid[j] & ~((u64)1);
2932                        if (self == peer)
2933                                return -100;
2934                }
2935        }
2936
            /* Nothing in common at all: unrelated data. */
2937        return -1000;
2938}
2939
2940/* drbd_sync_handshake() returns the new conn state on success, or
2941   CONN_MASK (-1) on failure.
2942 */
2943static enum drbd_conns drbd_sync_handshake(struct drbd_conf *mdev, enum drbd_role peer_role,
2944                                           enum drbd_disk_state peer_disk) __must_hold(local)
2945{
2946        enum drbd_conns rv = C_MASK;
2947        enum drbd_disk_state mydisk;
2948        struct net_conf *nc;
2949        int hg, rule_nr, rr_conflict, tentative;
2950
2951        mydisk = mdev->state.disk;
2952        if (mydisk == D_NEGOTIATING)
2953                mydisk = mdev->new_state_tmp.disk;
2954
2955        dev_info(DEV, "drbd_sync_handshake:\n");
2956
2957        spin_lock_irq(&mdev->ldev->md.uuid_lock);
2958        drbd_uuid_dump(mdev, "self", mdev->ldev->md.uuid, mdev->comm_bm_set, 0);
2959        drbd_uuid_dump(mdev, "peer", mdev->p_uuid,
2960                       mdev->p_uuid[UI_SIZE], mdev->p_uuid[UI_FLAGS]);
2961
2962        hg = drbd_uuid_compare(mdev, &rule_nr);
2963        spin_unlock_irq(&mdev->ldev->md.uuid_lock);
2964
2965        dev_info(DEV, "uuid_compare()=%d by rule %d\n", hg, rule_nr);
2966
2967        if (hg == -1000) {
2968                dev_alert(DEV, "Unrelated data, aborting!\n");
2969                return C_MASK;
2970        }
2971        if (hg < -1000) {
2972                dev_alert(DEV, "To resolve this both sides have to support at least protocol %d\n", -hg - 1000);
2973                return C_MASK;
2974        }
2975
2976        if    ((mydisk == D_INCONSISTENT && peer_disk > D_INCONSISTENT) ||
2977            (peer_disk == D_INCONSISTENT && mydisk    > D_INCONSISTENT)) {
2978                int f = (hg == -100) || abs(hg) == 2;
2979                hg = mydisk > D_INCONSISTENT ? 1 : -1;
2980                if (f)
2981                        hg = hg*2;
2982                dev_info(DEV, "Becoming sync %s due to disk states.\n",
2983                     hg > 0 ? "source" : "target");
2984        }
2985
2986        if (abs(hg) == 100)
2987                drbd_khelper(mdev, "initial-split-brain");
2988
2989        rcu_read_lock();
2990        nc = rcu_dereference(mdev->tconn->net_conf);
2991
2992        if (hg == 100 || (hg == -100 && nc->always_asbp)) {
2993                int pcount = (mdev->state.role == R_PRIMARY)
2994                           + (peer_role == R_PRIMARY);
2995                int forced = (hg == -100);
2996
2997                switch (pcount) {
2998                case 0:
2999                        hg = drbd_asb_recover_0p(mdev);
3000                        break;
3001                case 1:
3002                        hg = drbd_asb_recover_1p(mdev);
3003                        break;
3004                case 2:
3005                        hg = drbd_asb_recover_2p(mdev);
3006                        break;
3007                }
3008                if (abs(hg) < 100) {
3009                        dev_warn(DEV, "Split-Brain detected, %d primaries, "
3010                             "automatically solved. Sync from %s node\n",
3011                             pcount, (hg < 0) ? "peer" : "this");
3012                        if (forced) {
3013                                dev_warn(DEV, "Doing a full sync, since"
3014                                     " UUIDs where ambiguous.\n");
3015                                hg = hg*2;
3016                        }
3017                }
3018        }
3019
3020        if (hg == -100) {
3021                if (test_bit(DISCARD_MY_DATA, &mdev->flags) && !(mdev->p_uuid[UI_FLAGS]&1))
3022                        hg = -1;
3023                if (!test_bit(DISCARD_MY_DATA, &mdev->flags) && (mdev->p_uuid[UI_FLAGS]&1))
3024                        hg = 1;
3025
3026                if (abs(hg) < 100)
3027                        dev_warn(DEV, "Split-Brain detected, manually solved. "
3028                             "Sync from %s node\n",
3029                             (hg < 0) ? "peer" : "this");
3030        }
3031        rr_conflict = nc->rr_conflict;
3032        tentative = nc->tentative;
3033        rcu_read_unlock();
3034
3035        if (hg == -100) {
3036                /* FIXME this log message is not correct if we end up here
3037                 * after an attempted attach on a diskless node.
3038                 * We just refuse to attach -- well, we drop the "connection"
3039                 * to that disk, in a way... */
3040                dev_alert(DEV, "Split-Brain detected but unresolved, dropping connection!\n");
3041                drbd_khelper(mdev, "split-brain");
3042                return C_MASK;
3043        }
3044
3045        if (hg > 0 && mydisk <= D_INCONSISTENT) {
3046                dev_err(DEV, "I shall become SyncSource, but I am inconsistent!\n");
3047                return C_MASK;
3048        }
3049
3050        if (hg < 0 && /* by intention we do not use mydisk here. */
3051            mdev->state.role == R_PRIMARY && mdev->state.disk >= D_CONSISTENT) {
3052                switch (rr_conflict) {
3053                case ASB_CALL_HELPER:
3054                        drbd_khelper(mdev, "pri-lost");
3055                        /* fall through */
3056                case ASB_DISCONNECT:
3057                        dev_err(DEV, "I shall become SyncTarget, but I am primary!\n");
3058                        return C_MASK;
3059                case ASB_VIOLENTLY:
3060                        dev_warn(DEV, "Becoming SyncTarget, violating the stable-data"
3061                             "assumption\n");
3062                }
3063        }
3064
3065        if (tentative || test_bit(CONN_DRY_RUN, &mdev->tconn->flags)) {
3066                if (hg == 0)
3067                        dev_info(DEV, "dry-run connect: No resync, would become Connected immediately.\n");
3068                else
3069                        dev_info(DEV, "dry-run connect: Would become %s, doing a %s resync.",
3070                                 drbd_conn_str(hg > 0 ? C_SYNC_SOURCE : C_SYNC_TARGET),
3071                                 abs(hg) >= 2 ? "full" : "bit-map based");
3072                return C_MASK;
3073        }
3074
3075        if (abs(hg) >= 2) {
3076                dev_info(DEV, "Writing the whole bitmap, full sync required after drbd_sync_handshake.\n");
3077                if (drbd_bitmap_io(mdev, &drbd_bmio_set_n_write, "set_n_write from sync_handshake",
3078                                        BM_LOCKED_SET_ALLOWED))
3079                        return C_MASK;
3080        }
3081
3082        if (hg > 0) { /* become sync source. */
3083                rv = C_WF_BITMAP_S;
3084        } else if (hg < 0) { /* become sync target */
3085                rv = C_WF_BITMAP_T;
3086        } else {
3087                rv = C_CONNECTED;
3088                if (drbd_bm_total_weight(mdev)) {
3089                        dev_info(DEV, "No resync, but %lu bits in bitmap!\n",
3090                             drbd_bm_total_weight(mdev));
3091                }
3092        }
3093
3094        return rv;
3095}
3096
3097static enum drbd_after_sb_p convert_after_sb(enum drbd_after_sb_p peer)
3098{
3099        /* ASB_DISCARD_REMOTE - ASB_DISCARD_LOCAL is valid */
3100        if (peer == ASB_DISCARD_REMOTE)
3101                return ASB_DISCARD_LOCAL;
3102
3103        /* any other things with ASB_DISCARD_REMOTE or ASB_DISCARD_LOCAL are invalid */
3104        if (peer == ASB_DISCARD_LOCAL)
3105                return ASB_DISCARD_REMOTE;
3106
3107        /* everything else is valid if they are equal on both sides. */
3108        return peer;
3109}
3110
3111static int receive_protocol(struct drbd_tconn *tconn, struct packet_info *pi)
3112{
3113        struct p_protocol *p = pi->data;
3114        enum drbd_after_sb_p p_after_sb_0p, p_after_sb_1p, p_after_sb_2p;
3115        int p_proto, p_discard_my_data, p_two_primaries, cf;
3116        struct net_conf *nc, *old_net_conf, *new_net_conf = NULL;
3117        char integrity_alg[SHARED_SECRET_MAX] = "";
3118        struct crypto_hash *peer_integrity_tfm = NULL;
3119        void *int_dig_in = NULL, *int_dig_vv = NULL;
3120
3121        p_proto         = be32_to_cpu(p->protocol);
3122        p_after_sb_0p   = be32_to_cpu(p->after_sb_0p);
3123        p_after_sb_1p   = be32_to_cpu(p->after_sb_1p);
3124        p_after_sb_2p   = be32_to_cpu(p->after_sb_2p);
3125        p_two_primaries = be32_to_cpu(p->two_primaries);
3126        cf              = be32_to_cpu(p->conn_flags);
3127        p_discard_my_data = cf & CF_DISCARD_MY_DATA;
3128
3129        if (tconn->agreed_pro_version >= 87) {
3130                int err;
3131
3132                if (pi->size > sizeof(integrity_alg))
3133                        return -EIO;
3134                err = drbd_recv_all(tconn, integrity_alg, pi->size);
3135                if (err)
3136                        return err;
3137                integrity_alg[SHARED_SECRET_MAX - 1] = 0;
3138        }
3139
3140        if (pi->cmd != P_PROTOCOL_UPDATE) {
3141                clear_bit(CONN_DRY_RUN, &tconn->flags);
3142
3143                if (cf & CF_DRY_RUN)
3144                        set_bit(CONN_DRY_RUN, &tconn->flags);
3145
3146                rcu_read_lock();
3147                nc = rcu_dereference(tconn->net_conf);
3148
3149                if (p_proto != nc->wire_protocol) {
3150                        conn_err(tconn, "incompatible %s settings\n", "protocol");
3151                        goto disconnect_rcu_unlock;
3152                }
3153
3154                if (convert_after_sb(p_after_sb_0p) != nc->after_sb_0p) {
3155                        conn_err(tconn, "incompatible %s settings\n", "after-sb-0pri");
3156                        goto disconnect_rcu_unlock;
3157                }
3158
3159                if (convert_after_sb(p_after_sb_1p) != nc->after_sb_1p) {
3160                        conn_err(tconn, "incompatible %s settings\n", "after-sb-1pri");
3161                        goto disconnect_rcu_unlock;
3162                }
3163
3164                if (convert_after_sb(p_after_sb_2p) != nc->after_sb_2p) {
3165                        conn_err(tconn, "incompatible %s settings\n", "after-sb-2pri");
3166                        goto disconnect_rcu_unlock;
3167                }
3168
3169                if (p_discard_my_data && nc->discard_my_data) {
3170                        conn_err(tconn, "incompatible %s settings\n", "discard-my-data");
3171                        goto disconnect_rcu_unlock;
3172                }
3173
3174                if (p_two_primaries != nc->two_primaries) {
3175                        conn_err(tconn, "incompatible %s settings\n", "allow-two-primaries");
3176                        goto disconnect_rcu_unlock;
3177                }
3178
3179                if (strcmp(integrity_alg, nc->integrity_alg)) {
3180                        conn_err(tconn, "incompatible %s settings\n", "data-integrity-alg");
3181                        goto disconnect_rcu_unlock;
3182                }
3183
3184                rcu_read_unlock();
3185        }
3186
3187        if (integrity_alg[0]) {
3188                int hash_size;
3189
3190                /*
3191                 * We can only change the peer data integrity algorithm
3192                 * here.  Changing our own data integrity algorithm
3193                 * requires that we send a P_PROTOCOL_UPDATE packet at
3194                 * the same time; otherwise, the peer has no way to
3195                 * tell between which packets the algorithm should
3196                 * change.
3197                 */
3198
3199                peer_integrity_tfm = crypto_alloc_hash(integrity_alg, 0, CRYPTO_ALG_ASYNC);
3200                if (!peer_integrity_tfm) {
3201                        conn_err(tconn, "peer data-integrity-alg %s not supported\n",
3202                                 integrity_alg);
3203                        goto disconnect;
3204                }
3205
3206                hash_size = crypto_hash_digestsize(peer_integrity_tfm);
3207                int_dig_in = kmalloc(hash_size, GFP_KERNEL);
3208                int_dig_vv = kmalloc(hash_size, GFP_KERNEL);
3209                if (!(int_dig_in && int_dig_vv)) {
3210                        conn_err(tconn, "Allocation of buffers for data integrity checking failed\n");
3211                        goto disconnect;
3212                }
3213        }
3214
3215        new_net_conf = kmalloc(sizeof(struct net_conf), GFP_KERNEL);
3216        if (!new_net_conf) {
3217                conn_err(tconn, "Allocation of new net_conf failed\n");
3218                goto disconnect;
3219        }
3220
3221        mutex_lock(&tconn->data.mutex);
3222        mutex_lock(&tconn->conf_update);
3223        old_net_conf = tconn->net_conf;
3224        *new_net_conf = *old_net_conf;
3225
3226        new_net_conf->wire_protocol = p_proto;
3227        new_net_conf->after_sb_0p = convert_after_sb(p_after_sb_0p);
3228        new_net_conf->after_sb_1p = convert_after_sb(p_after_sb_1p);
3229        new_net_conf->after_sb_2p = convert_after_sb(p_after_sb_2p);
3230        new_net_conf->two_primaries = p_two_primaries;
3231
3232        rcu_assign_pointer(tconn->net_conf, new_net_conf);
3233        mutex_unlock(&tconn->conf_update);
3234        mutex_unlock(&tconn->data.mutex);
3235
3236        crypto_free_hash(tconn->peer_integrity_tfm);
3237        kfree(tconn->int_dig_in);
3238        kfree(tconn->int_dig_vv);
3239        tconn->peer_integrity_tfm = peer_integrity_tfm;
3240        tconn->int_dig_in = int_dig_in;
3241        tconn->int_dig_vv = int_dig_vv;
3242
3243        if (strcmp(old_net_conf->integrity_alg, integrity_alg))
3244                conn_info(tconn, "peer data-integrity-alg: %s\n",
3245                          integrity_alg[0] ? integrity_alg : "(none)");
3246
3247        synchronize_rcu();
3248        kfree(old_net_conf);
3249        return 0;
3250
3251disconnect_rcu_unlock:
3252        rcu_read_unlock();
3253disconnect:
3254        crypto_free_hash(peer_integrity_tfm);
3255        kfree(int_dig_in);
3256        kfree(int_dig_vv);
3257        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3258        return -EIO;
3259}
3260
3261/* helper function
3262 * input: alg name, feature name
3263 * return: NULL (alg name was "")
3264 *         ERR_PTR(error) if something goes wrong
3265 *         or the crypto hash ptr, if it worked out ok. */
3266struct crypto_hash *drbd_crypto_alloc_digest_safe(const struct drbd_conf *mdev,
3267                const char *alg, const char *name)
3268{
3269        struct crypto_hash *tfm;
3270
3271        if (!alg[0])
3272                return NULL;
3273
3274        tfm = crypto_alloc_hash(alg, 0, CRYPTO_ALG_ASYNC);
3275        if (IS_ERR(tfm)) {
3276                dev_err(DEV, "Can not allocate \"%s\" as %s (reason: %ld)\n",
3277                        alg, name, PTR_ERR(tfm));
3278                return tfm;
3279        }
3280        return tfm;
3281}
3282
3283static int ignore_remaining_packet(struct drbd_tconn *tconn, struct packet_info *pi)
3284{
3285        void *buffer = tconn->data.rbuf;
3286        int size = pi->size;
3287
3288        while (size) {
3289                int s = min_t(int, size, DRBD_SOCKET_BUFFER_SIZE);
3290                s = drbd_recv(tconn, buffer, s);
3291                if (s <= 0) {
3292                        if (s < 0)
3293                                return s;
3294                        break;
3295                }
3296                size -= s;
3297        }
3298        if (size)
3299                return -EIO;
3300        return 0;
3301}
3302
3303/*
3304 * config_unknown_volume  -  device configuration command for unknown volume
3305 *
3306 * When a device is added to an existing connection, the node on which the
3307 * device is added first will send configuration commands to its peer but the
3308 * peer will not know about the device yet.  It will warn and ignore these
3309 * commands.  Once the device is added on the second node, the second node will
3310 * send the same device configuration commands, but in the other direction.
3311 *
3312 * (We can also end up here if drbd is misconfigured.)
3313 */
3314static int config_unknown_volume(struct drbd_tconn *tconn, struct packet_info *pi)
3315{
3316        conn_warn(tconn, "%s packet received for volume %u, which is not configured locally\n",
3317                  cmdname(pi->cmd), pi->vnr);
3318        return ignore_remaining_packet(tconn, pi);
3319}
3320
3321static int receive_SyncParam(struct drbd_tconn *tconn, struct packet_info *pi)
3322{
3323        struct drbd_conf *mdev;
3324        struct p_rs_param_95 *p;
3325        unsigned int header_size, data_size, exp_max_sz;
3326        struct crypto_hash *verify_tfm = NULL;
3327        struct crypto_hash *csums_tfm = NULL;
3328        struct net_conf *old_net_conf, *new_net_conf = NULL;
3329        struct disk_conf *old_disk_conf = NULL, *new_disk_conf = NULL;
3330        const int apv = tconn->agreed_pro_version;
3331        struct fifo_buffer *old_plan = NULL, *new_plan = NULL;
3332        int fifo_size = 0;
3333        int err;
3334
3335        mdev = vnr_to_mdev(tconn, pi->vnr);
3336        if (!mdev)
3337                return config_unknown_volume(tconn, pi);
3338
3339        exp_max_sz  = apv <= 87 ? sizeof(struct p_rs_param)
3340                    : apv == 88 ? sizeof(struct p_rs_param)
3341                                        + SHARED_SECRET_MAX
3342                    : apv <= 94 ? sizeof(struct p_rs_param_89)
3343                    : /* apv >= 95 */ sizeof(struct p_rs_param_95);
3344
3345        if (pi->size > exp_max_sz) {
3346                dev_err(DEV, "SyncParam packet too long: received %u, expected <= %u bytes\n",
3347                    pi->size, exp_max_sz);
3348                return -EIO;
3349        }
3350
3351        if (apv <= 88) {
3352                header_size = sizeof(struct p_rs_param);
3353                data_size = pi->size - header_size;
3354        } else if (apv <= 94) {
3355                header_size = sizeof(struct p_rs_param_89);
3356                data_size = pi->size - header_size;
3357                D_ASSERT(data_size == 0);
3358        } else {
3359                header_size = sizeof(struct p_rs_param_95);
3360                data_size = pi->size - header_size;
3361                D_ASSERT(data_size == 0);
3362        }
3363
3364        /* initialize verify_alg and csums_alg */
3365        p = pi->data;
3366        memset(p->verify_alg, 0, 2 * SHARED_SECRET_MAX);
3367
3368        err = drbd_recv_all(mdev->tconn, p, header_size);
3369        if (err)
3370                return err;
3371
3372        mutex_lock(&mdev->tconn->conf_update);
3373        old_net_conf = mdev->tconn->net_conf;
3374        if (get_ldev(mdev)) {
3375                new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3376                if (!new_disk_conf) {
3377                        put_ldev(mdev);
3378                        mutex_unlock(&mdev->tconn->conf_update);
3379                        dev_err(DEV, "Allocation of new disk_conf failed\n");
3380                        return -ENOMEM;
3381                }
3382
3383                old_disk_conf = mdev->ldev->disk_conf;
3384                *new_disk_conf = *old_disk_conf;
3385
3386                new_disk_conf->resync_rate = be32_to_cpu(p->resync_rate);
3387        }
3388
3389        if (apv >= 88) {
3390                if (apv == 88) {
3391                        if (data_size > SHARED_SECRET_MAX || data_size == 0) {
3392                                dev_err(DEV, "verify-alg of wrong size, "
3393                                        "peer wants %u, accepting only up to %u byte\n",
3394                                        data_size, SHARED_SECRET_MAX);
3395                                err = -EIO;
3396                                goto reconnect;
3397                        }
3398
3399                        err = drbd_recv_all(mdev->tconn, p->verify_alg, data_size);
3400                        if (err)
3401                                goto reconnect;
3402                        /* we expect NUL terminated string */
3403                        /* but just in case someone tries to be evil */
3404                        D_ASSERT(p->verify_alg[data_size-1] == 0);
3405                        p->verify_alg[data_size-1] = 0;
3406
3407                } else /* apv >= 89 */ {
3408                        /* we still expect NUL terminated strings */
3409                        /* but just in case someone tries to be evil */
3410                        D_ASSERT(p->verify_alg[SHARED_SECRET_MAX-1] == 0);
3411                        D_ASSERT(p->csums_alg[SHARED_SECRET_MAX-1] == 0);
3412                        p->verify_alg[SHARED_SECRET_MAX-1] = 0;
3413                        p->csums_alg[SHARED_SECRET_MAX-1] = 0;
3414                }
3415
3416                if (strcmp(old_net_conf->verify_alg, p->verify_alg)) {
3417                        if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3418                                dev_err(DEV, "Different verify-alg settings. me=\"%s\" peer=\"%s\"\n",
3419                                    old_net_conf->verify_alg, p->verify_alg);
3420                                goto disconnect;
3421                        }
3422                        verify_tfm = drbd_crypto_alloc_digest_safe(mdev,
3423                                        p->verify_alg, "verify-alg");
3424                        if (IS_ERR(verify_tfm)) {
3425                                verify_tfm = NULL;
3426                                goto disconnect;
3427                        }
3428                }
3429
3430                if (apv >= 89 && strcmp(old_net_conf->csums_alg, p->csums_alg)) {
3431                        if (mdev->state.conn == C_WF_REPORT_PARAMS) {
3432                                dev_err(DEV, "Different csums-alg settings. me=\"%s\" peer=\"%s\"\n",
3433                                    old_net_conf->csums_alg, p->csums_alg);
3434                                goto disconnect;
3435                        }
3436                        csums_tfm = drbd_crypto_alloc_digest_safe(mdev,
3437                                        p->csums_alg, "csums-alg");
3438                        if (IS_ERR(csums_tfm)) {
3439                                csums_tfm = NULL;
3440                                goto disconnect;
3441                        }
3442                }
3443
3444                if (apv > 94 && new_disk_conf) {
3445                        new_disk_conf->c_plan_ahead = be32_to_cpu(p->c_plan_ahead);
3446                        new_disk_conf->c_delay_target = be32_to_cpu(p->c_delay_target);
3447                        new_disk_conf->c_fill_target = be32_to_cpu(p->c_fill_target);
3448                        new_disk_conf->c_max_rate = be32_to_cpu(p->c_max_rate);
3449
3450                        fifo_size = (new_disk_conf->c_plan_ahead * 10 * SLEEP_TIME) / HZ;
3451                        if (fifo_size != mdev->rs_plan_s->size) {
3452                                new_plan = fifo_alloc(fifo_size);
3453                                if (!new_plan) {
3454                                        dev_err(DEV, "kmalloc of fifo_buffer failed");
3455                                        put_ldev(mdev);
3456                                        goto disconnect;
3457                                }
3458                        }
3459                }
3460
3461                if (verify_tfm || csums_tfm) {
3462                        new_net_conf = kzalloc(sizeof(struct net_conf), GFP_KERNEL);
3463                        if (!new_net_conf) {
3464                                dev_err(DEV, "Allocation of new net_conf failed\n");
3465                                goto disconnect;
3466                        }
3467
3468                        *new_net_conf = *old_net_conf;
3469
3470                        if (verify_tfm) {
3471                                strcpy(new_net_conf->verify_alg, p->verify_alg);
3472                                new_net_conf->verify_alg_len = strlen(p->verify_alg) + 1;
3473                                crypto_free_hash(mdev->tconn->verify_tfm);
3474                                mdev->tconn->verify_tfm = verify_tfm;
3475                                dev_info(DEV, "using verify-alg: \"%s\"\n", p->verify_alg);
3476                        }
3477                        if (csums_tfm) {
3478                                strcpy(new_net_conf->csums_alg, p->csums_alg);
3479                                new_net_conf->csums_alg_len = strlen(p->csums_alg) + 1;
3480                                crypto_free_hash(mdev->tconn->csums_tfm);
3481                                mdev->tconn->csums_tfm = csums_tfm;
3482                                dev_info(DEV, "using csums-alg: \"%s\"\n", p->csums_alg);
3483                        }
3484                        rcu_assign_pointer(tconn->net_conf, new_net_conf);
3485                }
3486        }
3487
3488        if (new_disk_conf) {
3489                rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3490                put_ldev(mdev);
3491        }
3492
3493        if (new_plan) {
3494                old_plan = mdev->rs_plan_s;
3495                rcu_assign_pointer(mdev->rs_plan_s, new_plan);
3496        }
3497
3498        mutex_unlock(&mdev->tconn->conf_update);
3499        synchronize_rcu();
3500        if (new_net_conf)
3501                kfree(old_net_conf);
3502        kfree(old_disk_conf);
3503        kfree(old_plan);
3504
3505        return 0;
3506
3507reconnect:
3508        if (new_disk_conf) {
3509                put_ldev(mdev);
3510                kfree(new_disk_conf);
3511        }
3512        mutex_unlock(&mdev->tconn->conf_update);
3513        return -EIO;
3514
3515disconnect:
3516        kfree(new_plan);
3517        if (new_disk_conf) {
3518                put_ldev(mdev);
3519                kfree(new_disk_conf);
3520        }
3521        mutex_unlock(&mdev->tconn->conf_update);
3522        /* just for completeness: actually not needed,
3523         * as this is not reached if csums_tfm was ok. */
3524        crypto_free_hash(csums_tfm);
3525        /* but free the verify_tfm again, if csums_tfm did not work out */
3526        crypto_free_hash(verify_tfm);
3527        conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3528        return -EIO;
3529}
3530
3531/* warn if the arguments differ by more than 12.5% */
3532static void warn_if_differ_considerably(struct drbd_conf *mdev,
3533        const char *s, sector_t a, sector_t b)
3534{
3535        sector_t d;
3536        if (a == 0 || b == 0)
3537                return;
3538        d = (a > b) ? (a - b) : (b - a);
3539        if (d > (a>>3) || d > (b>>3))
3540                dev_warn(DEV, "Considerable difference in %s: %llus vs. %llus\n", s,
3541                     (unsigned long long)a, (unsigned long long)b);
3542}
3543
3544static int receive_sizes(struct drbd_tconn *tconn, struct packet_info *pi)
3545{
3546        struct drbd_conf *mdev;
3547        struct p_sizes *p = pi->data;
3548        enum determine_dev_size dd = DS_UNCHANGED;
3549        sector_t p_size, p_usize, my_usize;
3550        int ldsc = 0; /* local disk size changed */
3551        enum dds_flags ddsf;
3552
3553        mdev = vnr_to_mdev(tconn, pi->vnr);
3554        if (!mdev)
3555                return config_unknown_volume(tconn, pi);
3556
3557        p_size = be64_to_cpu(p->d_size);
3558        p_usize = be64_to_cpu(p->u_size);
3559
3560        /* just store the peer's disk size for now.
3561         * we still need to figure out whether we accept that. */
3562        mdev->p_size = p_size;
3563
3564        if (get_ldev(mdev)) {
3565                rcu_read_lock();
3566                my_usize = rcu_dereference(mdev->ldev->disk_conf)->disk_size;
3567                rcu_read_unlock();
3568
3569                warn_if_differ_considerably(mdev, "lower level device sizes",
3570                           p_size, drbd_get_max_capacity(mdev->ldev));
3571                warn_if_differ_considerably(mdev, "user requested size",
3572                                            p_usize, my_usize);
3573
3574                /* if this is the first connect, or an otherwise expected
3575                 * param exchange, choose the minimum */
3576                if (mdev->state.conn == C_WF_REPORT_PARAMS)
3577                        p_usize = min_not_zero(my_usize, p_usize);
3578
3579                /* Never shrink a device with usable data during connect.
3580                   But allow online shrinking if we are connected. */
3581                if (drbd_new_dev_size(mdev, mdev->ldev, p_usize, 0) <
3582                    drbd_get_capacity(mdev->this_bdev) &&
3583                    mdev->state.disk >= D_OUTDATED &&
3584                    mdev->state.conn < C_CONNECTED) {
3585                        dev_err(DEV, "The peer's disk size is too small!\n");
3586                        conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3587                        put_ldev(mdev);
3588                        return -EIO;
3589                }
3590
3591                if (my_usize != p_usize) {
3592                        struct disk_conf *old_disk_conf, *new_disk_conf = NULL;
3593
3594                        new_disk_conf = kzalloc(sizeof(struct disk_conf), GFP_KERNEL);
3595                        if (!new_disk_conf) {
3596                                dev_err(DEV, "Allocation of new disk_conf failed\n");
3597                                put_ldev(mdev);
3598                                return -ENOMEM;
3599                        }
3600
3601                        mutex_lock(&mdev->tconn->conf_update);
3602                        old_disk_conf = mdev->ldev->disk_conf;
3603                        *new_disk_conf = *old_disk_conf;
3604                        new_disk_conf->disk_size = p_usize;
3605
3606                        rcu_assign_pointer(mdev->ldev->disk_conf, new_disk_conf);
3607                        mutex_unlock(&mdev->tconn->conf_update);
3608                        synchronize_rcu();
3609                        kfree(old_disk_conf);
3610
3611                        dev_info(DEV, "Peer sets u_size to %lu sectors\n",
3612                                 (unsigned long)my_usize);
3613                }
3614
3615                put_ldev(mdev);
3616        }
3617
3618        ddsf = be16_to_cpu(p->dds_flags);
3619        if (get_ldev(mdev)) {
3620                dd = drbd_determine_dev_size(mdev, ddsf, NULL);
3621                put_ldev(mdev);
3622                if (dd == DS_ERROR)
3623                        return -EIO;
3624                drbd_md_sync(mdev);
3625        } else {
3626                /* I am diskless, need to accept the peer's size. */
3627                drbd_set_my_capacity(mdev, p_size);
3628        }
3629
3630        mdev->peer_max_bio_size = be32_to_cpu(p->max_bio_size);
3631        drbd_reconsider_max_bio_size(mdev);
3632
3633        if (get_ldev(mdev)) {
3634                if (mdev->ldev->known_size != drbd_get_capacity(mdev->ldev->backing_bdev)) {
3635                        mdev->ldev->known_size = drbd_get_capacity(mdev->ldev->backing_bdev);
3636                        ldsc = 1;
3637                }
3638
3639                put_ldev(mdev);
3640        }
3641
3642        if (mdev->state.conn > C_WF_REPORT_PARAMS) {
3643                if (be64_to_cpu(p->c_size) !=
3644                    drbd_get_capacity(mdev->this_bdev) || ldsc) {
3645                        /* we have different sizes, probably peer
3646                         * needs to know my new size... */
3647                        drbd_send_sizes(mdev, 0, ddsf);
3648                }
3649                if (test_and_clear_bit(RESIZE_PENDING, &mdev->flags) ||
3650                    (dd == DS_GREW && mdev->state.conn == C_CONNECTED)) {
3651                        if (mdev->state.pdsk >= D_INCONSISTENT &&
3652                            mdev->state.disk >= D_INCONSISTENT) {
3653                                if (ddsf & DDSF_NO_RESYNC)
3654                                        dev_info(DEV, "Resync of new storage suppressed with --assume-clean\n");
3655                                else
3656                                        resync_after_online_grow(mdev);
3657                        } else
3658                                set_bit(RESYNC_AFTER_NEG, &mdev->flags);
3659                }
3660        }
3661
3662        return 0;
3663}
3664
3665static int receive_uuids(struct drbd_tconn *tconn, struct packet_info *pi)
3666{
3667        struct drbd_conf *mdev;
3668        struct p_uuids *p = pi->data;
3669        u64 *p_uuid;
3670        int i, updated_uuids = 0;
3671
3672        mdev = vnr_to_mdev(tconn, pi->vnr);
3673        if (!mdev)
3674                return config_unknown_volume(tconn, pi);
3675
3676        p_uuid = kmalloc(sizeof(u64)*UI_EXTENDED_SIZE, GFP_NOIO);
3677        if (!p_uuid) {
3678                dev_err(DEV, "kmalloc of p_uuid failed\n");
3679                return false;
3680        }
3681
3682        for (i = UI_CURRENT; i < UI_EXTENDED_SIZE; i++)
3683                p_uuid[i] = be64_to_cpu(p->uuid[i]);
3684
3685        kfree(mdev->p_uuid);
3686        mdev->p_uuid = p_uuid;
3687
3688        if (mdev->state.conn < C_CONNECTED &&
3689            mdev->state.disk < D_INCONSISTENT &&
3690            mdev->state.role == R_PRIMARY &&
3691            (mdev->ed_uuid & ~((u64)1)) != (p_uuid[UI_CURRENT] & ~((u64)1))) {
3692                dev_err(DEV, "Can only connect to data with current UUID=%016llX\n",
3693                    (unsigned long long)mdev->ed_uuid);
3694                conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3695                return -EIO;
3696        }
3697
3698        if (get_ldev(mdev)) {
3699                int skip_initial_sync =
3700                        mdev->state.conn == C_CONNECTED &&
3701                        mdev->tconn->agreed_pro_version >= 90 &&
3702                        mdev->ldev->md.uuid[UI_CURRENT] == UUID_JUST_CREATED &&
3703                        (p_uuid[UI_FLAGS] & 8);
3704                if (skip_initial_sync) {
3705                        dev_info(DEV, "Accepted new current UUID, preparing to skip initial sync\n");
3706                        drbd_bitmap_io(mdev, &drbd_bmio_clear_n_write,
3707                                        "clear_n_write from receive_uuids",
3708                                        BM_LOCKED_TEST_ALLOWED);
3709                        _drbd_uuid_set(mdev, UI_CURRENT, p_uuid[UI_CURRENT]);
3710                        _drbd_uuid_set(mdev, UI_BITMAP, 0);
3711                        _drbd_set_state(_NS2(mdev, disk, D_UP_TO_DATE, pdsk, D_UP_TO_DATE),
3712                                        CS_VERBOSE, NULL);
3713                        drbd_md_sync(mdev);
3714                        updated_uuids = 1;
3715                }
3716                put_ldev(mdev);
3717        } else if (mdev->state.disk < D_INCONSISTENT &&
3718                   mdev->state.role == R_PRIMARY) {
3719                /* I am a diskless primary, the peer just created a new current UUID
3720                   for me. */
3721                updated_uuids = drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3722        }
3723
3724        /* Before we test for the disk state, we should wait until an eventually
3725           ongoing cluster wide state change is finished. That is important if
3726           we are primary and are detaching from our disk. We need to see the
3727           new disk state... */
3728        mutex_lock(mdev->state_mutex);
3729        mutex_unlock(mdev->state_mutex);
3730        if (mdev->state.conn >= C_CONNECTED && mdev->state.disk < D_INCONSISTENT)
3731                updated_uuids |= drbd_set_ed_uuid(mdev, p_uuid[UI_CURRENT]);
3732
3733        if (updated_uuids)
3734                drbd_print_uuids(mdev, "receiver updated UUIDs to");
3735
3736        return 0;
3737}
3738
3739/**
3740 * convert_state() - Converts the peer's view of the cluster state to our point of view
3741 * @ps:         The state as seen by the peer.
3742 */
3743static union drbd_state convert_state(union drbd_state ps)
3744{
3745        union drbd_state ms;
3746
3747        static enum drbd_conns c_tab[] = {
3748                [C_WF_REPORT_PARAMS] = C_WF_REPORT_PARAMS,
3749                [C_CONNECTED] = C_CONNECTED,
3750
3751                [C_STARTING_SYNC_S] = C_STARTING_SYNC_T,
3752                [C_STARTING_SYNC_T] = C_STARTING_SYNC_S,
3753                [C_DISCONNECTING] = C_TEAR_DOWN, /* C_NETWORK_FAILURE, */
3754                [C_VERIFY_S]       = C_VERIFY_T,
3755                [C_MASK]   = C_MASK,
3756        };
3757
3758        ms.i = ps.i;
3759
3760        ms.conn = c_tab[ps.conn];
3761        ms.peer = ps.role;
3762        ms.role = ps.peer;
3763        ms.pdsk = ps.disk;
3764        ms.disk = ps.pdsk;
3765        ms.peer_isp = (ps.aftr_isp | ps.user_isp);
3766
3767        return ms;
3768}
3769
3770static int receive_req_state(struct drbd_tconn *tconn, struct packet_info *pi)
3771{
3772        struct drbd_conf *mdev;
3773        struct p_req_state *p = pi->data;
3774        union drbd_state mask, val;
3775        enum drbd_state_rv rv;
3776
3777        mdev = vnr_to_mdev(tconn, pi->vnr);
3778        if (!mdev)
3779                return -EIO;
3780
3781        mask.i = be32_to_cpu(p->mask);
3782        val.i = be32_to_cpu(p->val);
3783
3784        if (test_bit(RESOLVE_CONFLICTS, &mdev->tconn->flags) &&
3785            mutex_is_locked(mdev->state_mutex)) {
3786                drbd_send_sr_reply(mdev, SS_CONCURRENT_ST_CHG);
3787                return 0;
3788        }
3789
3790        mask = convert_state(mask);
3791        val = convert_state(val);
3792
3793        rv = drbd_change_state(mdev, CS_VERBOSE, mask, val);
3794        drbd_send_sr_reply(mdev, rv);
3795
3796        drbd_md_sync(mdev);
3797
3798        return 0;
3799}
3800
3801static int receive_req_conn_state(struct drbd_tconn *tconn, struct packet_info *pi)
3802{
3803        struct p_req_state *p = pi->data;
3804        union drbd_state mask, val;
3805        enum drbd_state_rv rv;
3806
3807        mask.i = be32_to_cpu(p->mask);
3808        val.i = be32_to_cpu(p->val);
3809
3810        if (test_bit(RESOLVE_CONFLICTS, &tconn->flags) &&
3811            mutex_is_locked(&tconn->cstate_mutex)) {
3812                conn_send_sr_reply(tconn, SS_CONCURRENT_ST_CHG);
3813                return 0;
3814        }
3815
3816        mask = convert_state(mask);
3817        val = convert_state(val);
3818
3819        rv = conn_request_state(tconn, mask, val, CS_VERBOSE | CS_LOCAL_ONLY | CS_IGN_OUTD_FAIL);
3820        conn_send_sr_reply(tconn, rv);
3821
3822        return 0;
3823}
3824
3825static int receive_state(struct drbd_tconn *tconn, struct packet_info *pi)
3826{
3827        struct drbd_conf *mdev;
3828        struct p_state *p = pi->data;
3829        union drbd_state os, ns, peer_state;
3830        enum drbd_disk_state real_peer_disk;
3831        enum chg_state_flags cs_flags;
3832        int rv;
3833
3834        mdev = vnr_to_mdev(tconn, pi->vnr);
3835        if (!mdev)
3836                return config_unknown_volume(tconn, pi);
3837
3838        peer_state.i = be32_to_cpu(p->state);
3839
3840        real_peer_disk = peer_state.disk;
3841        if (peer_state.disk == D_NEGOTIATING) {
3842                real_peer_disk = mdev->p_uuid[UI_FLAGS] & 4 ? D_INCONSISTENT : D_CONSISTENT;
3843                dev_info(DEV, "real peer disk state = %s\n", drbd_disk_str(real_peer_disk));
3844        }
3845
3846        spin_lock_irq(&mdev->tconn->req_lock);
3847 retry:
3848        os = ns = drbd_read_state(mdev);
3849        spin_unlock_irq(&mdev->tconn->req_lock);
3850
3851        /* If some other part of the code (asender thread, timeout)
3852         * already decided to close the connection again,
3853         * we must not "re-establish" it here. */
3854        if (os.conn <= C_TEAR_DOWN)
3855                return -ECONNRESET;
3856
3857        /* If this is the "end of sync" confirmation, usually the peer disk
3858         * transitions from D_INCONSISTENT to D_UP_TO_DATE. For empty (0 bits
3859         * set) resync started in PausedSyncT, or if the timing of pause-/
3860         * unpause-sync events has been "just right", the peer disk may
3861         * transition from D_CONSISTENT to D_UP_TO_DATE as well.
3862         */
3863        if ((os.pdsk == D_INCONSISTENT || os.pdsk == D_CONSISTENT) &&
3864            real_peer_disk == D_UP_TO_DATE &&
3865            os.conn > C_CONNECTED && os.disk == D_UP_TO_DATE) {
3866                /* If we are (becoming) SyncSource, but peer is still in sync
3867                 * preparation, ignore its uptodate-ness to avoid flapping, it
3868                 * will change to inconsistent once the peer reaches active
3869                 * syncing states.
3870                 * It may have changed syncer-paused flags, however, so we
3871                 * cannot ignore this completely. */
3872                if (peer_state.conn > C_CONNECTED &&
3873                    peer_state.conn < C_SYNC_SOURCE)
3874                        real_peer_disk = D_INCONSISTENT;
3875
3876                /* if peer_state changes to connected at the same time,
3877                 * it explicitly notifies us that it finished resync.
3878                 * Maybe we should finish it up, too? */
3879                else if (os.conn >= C_SYNC_SOURCE &&
3880                         peer_state.conn == C_CONNECTED) {
3881                        if (drbd_bm_total_weight(mdev) <= mdev->rs_failed)
3882                                drbd_resync_finished(mdev);
3883                        return 0;
3884                }
3885        }
3886
3887        /* explicit verify finished notification, stop sector reached. */
3888        if (os.conn == C_VERIFY_T && os.disk == D_UP_TO_DATE &&
3889            peer_state.conn == C_CONNECTED && real_peer_disk == D_UP_TO_DATE) {
3890                ov_out_of_sync_print(mdev);
3891                drbd_resync_finished(mdev);
3892                return 0;
3893        }
3894
3895        /* peer says his disk is inconsistent, while we think it is uptodate,
3896         * and this happens while the peer still thinks we have a sync going on,
3897         * but we think we are already done with the sync.
3898         * We ignore this to avoid flapping pdsk.
3899         * This should not happen, if the peer is a recent version of drbd. */
3900        if (os.pdsk == D_UP_TO_DATE && real_peer_disk == D_INCONSISTENT &&
3901            os.conn == C_CONNECTED && peer_state.conn > C_SYNC_SOURCE)
3902                real_peer_disk = D_UP_TO_DATE;
3903
3904        if (ns.conn == C_WF_REPORT_PARAMS)
3905                ns.conn = C_CONNECTED;
3906
3907        if (peer_state.conn == C_AHEAD)
3908                ns.conn = C_BEHIND;
3909
3910        if (mdev->p_uuid && peer_state.disk >= D_NEGOTIATING &&
3911            get_ldev_if_state(mdev, D_NEGOTIATING)) {
3912                int cr; /* consider resync */
3913
3914                /* if we established a new connection */
3915                cr  = (os.conn < C_CONNECTED);
3916                /* if we had an established connection
3917                 * and one of the nodes newly attaches a disk */
3918                cr |= (os.conn == C_CONNECTED &&
3919                       (peer_state.disk == D_NEGOTIATING ||
3920                        os.disk == D_NEGOTIATING));
3921                /* if we have both been inconsistent, and the peer has been
3922                 * forced to be UpToDate with --overwrite-data */
3923                cr |= test_bit(CONSIDER_RESYNC, &mdev->flags);
3924                /* if we had been plain connected, and the admin requested to
3925                 * start a sync by "invalidate" or "invalidate-remote" */
3926                cr |= (os.conn == C_CONNECTED &&
3927                                (peer_state.conn >= C_STARTING_SYNC_S &&
3928                                 peer_state.conn <= C_WF_BITMAP_T));
3929
3930                if (cr)
3931                        ns.conn = drbd_sync_handshake(mdev, peer_state.role, real_peer_disk);
3932
3933                put_ldev(mdev);
3934                if (ns.conn == C_MASK) {
3935                        ns.conn = C_CONNECTED;
3936                        if (mdev->state.disk == D_NEGOTIATING) {
3937                                drbd_force_state(mdev, NS(disk, D_FAILED));
3938                        } else if (peer_state.disk == D_NEGOTIATING) {
3939                                dev_err(DEV, "Disk attach process on the peer node was aborted.\n");
3940                                peer_state.disk = D_DISKLESS;
3941                                real_peer_disk = D_DISKLESS;
3942                        } else {
3943                                if (test_and_clear_bit(CONN_DRY_RUN, &mdev->tconn->flags))
3944                                        return -EIO;
3945                                D_ASSERT(os.conn == C_WF_REPORT_PARAMS);
3946                                conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3947                                return -EIO;
3948                        }
3949                }
3950        }
3951
3952        spin_lock_irq(&mdev->tconn->req_lock);
3953        if (os.i != drbd_read_state(mdev).i)
3954                goto retry;
3955        clear_bit(CONSIDER_RESYNC, &mdev->flags);
3956        ns.peer = peer_state.role;
3957        ns.pdsk = real_peer_disk;
3958        ns.peer_isp = (peer_state.aftr_isp | peer_state.user_isp);
3959        if ((ns.conn == C_CONNECTED || ns.conn == C_WF_BITMAP_S) && ns.disk == D_NEGOTIATING)
3960                ns.disk = mdev->new_state_tmp.disk;
3961        cs_flags = CS_VERBOSE + (os.conn < C_CONNECTED && ns.conn >= C_CONNECTED ? 0 : CS_HARD);
3962        if (ns.pdsk == D_CONSISTENT && drbd_suspended(mdev) && ns.conn == C_CONNECTED && os.conn < C_CONNECTED &&
3963            test_bit(NEW_CUR_UUID, &mdev->flags)) {
3964                /* Do not allow tl_restart(RESEND) for a rebooted peer. We can only allow this
3965                   for temporal network outages! */
3966                spin_unlock_irq(&mdev->tconn->req_lock);
3967                dev_err(DEV, "Aborting Connect, can not thaw IO with an only Consistent peer\n");
3968                tl_clear(mdev->tconn);
3969                drbd_uuid_new_current(mdev);
3970                clear_bit(NEW_CUR_UUID, &mdev->flags);
3971                conn_request_state(mdev->tconn, NS2(conn, C_PROTOCOL_ERROR, susp, 0), CS_HARD);
3972                return -EIO;
3973        }
3974        rv = _drbd_set_state(mdev, ns, cs_flags, NULL);
3975        ns = drbd_read_state(mdev);
3976        spin_unlock_irq(&mdev->tconn->req_lock);
3977
3978        if (rv < SS_SUCCESS) {
3979                conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD);
3980                return -EIO;
3981        }
3982
3983        if (os.conn > C_WF_REPORT_PARAMS) {
3984                if (ns.conn > C_CONNECTED && peer_state.conn <= C_CONNECTED &&
3985                    peer_state.disk != D_NEGOTIATING ) {
3986                        /* we want resync, peer has not yet decided to sync... */
3987                        /* Nowadays only used when forcing a node into primary role and
3988                           setting its disk to UpToDate with that */
3989                        drbd_send_uuids(mdev);
3990                        drbd_send_current_state(mdev);
3991                }
3992        }
3993
3994        clear_bit(DISCARD_MY_DATA, &mdev->flags);
3995
3996        drbd_md_sync(mdev); /* update connected indicator, la_size_sect, ... */
3997
3998        return 0;
3999}
4000
4001static int receive_sync_uuid(struct drbd_tconn *tconn, struct packet_info *pi)
4002{
4003        struct drbd_conf *mdev;
4004        struct p_rs_uuid *p = pi->data;
4005
4006        mdev = vnr_to_mdev(tconn, pi->vnr);
4007        if (!mdev)
4008                return -EIO;
4009
4010        wait_event(mdev->misc_wait,
4011                   mdev->state.conn == C_WF_SYNC_UUID ||
4012                   mdev->state.conn == C_BEHIND ||
4013                   mdev->state.conn < C_CONNECTED ||
4014                   mdev->state.disk < D_NEGOTIATING);
4015
4016        /* D_ASSERT( mdev->state.conn == C_WF_SYNC_UUID ); */
4017
4018        /* Here the _drbd_uuid_ functions are right, current should
4019           _not_ be rotated into the history */
4020        if (get_ldev_if_state(mdev, D_NEGOTIATING)) {
4021                _drbd_uuid_set(mdev, UI_CURRENT, be64_to_cpu(p->uuid));
4022                _drbd_uuid_set(mdev, UI_BITMAP, 0UL);
4023
4024                drbd_print_uuids(mdev, "updated sync uuid");
4025                drbd_start_resync(mdev, C_SYNC_TARGET);
4026
4027                put_ldev(mdev);
4028        } else
4029                dev_err(DEV, "Ignoring SyncUUID packet!\n");
4030
4031        return 0;
4032}
4033
4034/**
4035 * receive_bitmap_plain
4036 *
4037 * Return 0 when done, 1 when another iteration is needed, and a negative error
4038 * code upon failure.
4039 */
4040static int
4041receive_bitmap_plain(struct drbd_conf *mdev, unsigned int size,
4042                     unsigned long *p, struct bm_xfer_ctx *c)
4043{
4044        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE -
4045                                 drbd_header_size(mdev->tconn);
4046        unsigned int num_words = min_t(size_t, data_size / sizeof(*p),
4047                                       c->bm_words - c->word_offset);
4048        unsigned int want = num_words * sizeof(*p);
4049        int err;
4050
4051        if (want != size) {
4052                dev_err(DEV, "%s:want (%u) != size (%u)\n", __func__, want, size);
4053                return -EIO;
4054        }
4055        if (want == 0)
4056                return 0;
4057        err = drbd_recv_all(mdev->tconn, p, want);
4058        if (err)
4059                return err;
4060
4061        drbd_bm_merge_lel(mdev, c->word_offset, num_words, p);
4062
4063        c->word_offset += num_words;
4064        c->bit_offset = c->word_offset * BITS_PER_LONG;
4065        if (c->bit_offset > c->bm_bits)
4066                c->bit_offset = c->bm_bits;
4067
4068        return 1;
4069}
4070
4071static enum drbd_bitmap_code dcbp_get_code(struct p_compressed_bm *p)
4072{
4073        return (enum drbd_bitmap_code)(p->encoding & 0x0f);
4074}
4075
4076static int dcbp_get_start(struct p_compressed_bm *p)
4077{
4078        return (p->encoding & 0x80) != 0;
4079}
4080
4081static int dcbp_get_pad_bits(struct p_compressed_bm *p)
4082{
4083        return (p->encoding >> 4) & 0x7;
4084}
4085
4086/**
4087 * recv_bm_rle_bits
4088 *
4089 * Return 0 when done, 1 when another iteration is needed, and a negative error
4090 * code upon failure.
4091 */
4092static int
4093recv_bm_rle_bits(struct drbd_conf *mdev,
4094                struct p_compressed_bm *p,
4095                 struct bm_xfer_ctx *c,
4096                 unsigned int len)
4097{
4098        struct bitstream bs;
4099        u64 look_ahead;
4100        u64 rl;
4101        u64 tmp;
4102        unsigned long s = c->bit_offset;
4103        unsigned long e;
4104        int toggle = dcbp_get_start(p);
4105        int have;
4106        int bits;
4107
4108        bitstream_init(&bs, p->code, len, dcbp_get_pad_bits(p));
4109
4110        bits = bitstream_get_bits(&bs, &look_ahead, 64);
4111        if (bits < 0)
4112                return -EIO;
4113
4114        for (have = bits; have > 0; s += rl, toggle = !toggle) {
4115                bits = vli_decode_bits(&rl, look_ahead);
4116                if (bits <= 0)
4117                        return -EIO;
4118
4119                if (toggle) {
4120                        e = s + rl -1;
4121                        if (e >= c->bm_bits) {
4122                                dev_err(DEV, "bitmap overflow (e:%lu) while decoding bm RLE packet\n", e);
4123                                return -EIO;
4124                        }
4125                        _drbd_bm_set_bits(mdev, s, e);
4126                }
4127
4128                if (have < bits) {
4129                        dev_err(DEV, "bitmap decoding error: h:%d b:%d la:0x%08llx l:%u/%u\n",
4130                                have, bits, look_ahead,
4131                                (unsigned int)(bs.cur.b - p->code),
4132                                (unsigned int)bs.buf_len);
4133                        return -EIO;
4134                }
4135                look_ahead >>= bits;
4136                have -= bits;
4137
4138                bits = bitstream_get_bits(&bs, &tmp, 64 - have);
4139                if (bits < 0)
4140                        return -EIO;
4141                look_ahead |= tmp << have;
4142                have += bits;
4143        }
4144
4145        c->bit_offset = s;
4146        bm_xfer_ctx_bit_to_word_offset(c);
4147
4148        return (s != c->bm_bits);
4149}
4150
4151/**
4152 * decode_bitmap_c
4153 *
4154 * Return 0 when done, 1 when another iteration is needed, and a negative error
4155 * code upon failure.
4156 */
4157static int
4158decode_bitmap_c(struct drbd_conf *mdev,
4159                struct p_compressed_bm *p,
4160                struct bm_xfer_ctx *c,
4161                unsigned int len)
4162{
4163        if (dcbp_get_code(p) == RLE_VLI_Bits)
4164                return recv_bm_rle_bits(mdev, p, c, len - sizeof(*p));
4165
4166        /* other variants had been implemented for evaluation,
4167         * but have been dropped as this one turned out to be "best"
4168         * during all our tests. */
4169
4170        dev_err(DEV, "receive_bitmap_c: unknown encoding %u\n", p->encoding);
4171        conn_request_state(mdev->tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4172        return -EIO;
4173}
4174
4175void INFO_bm_xfer_stats(struct drbd_conf *mdev,
4176                const char *direction, struct bm_xfer_ctx *c)
4177{
4178        /* what would it take to transfer it "plaintext" */
4179        unsigned int header_size = drbd_header_size(mdev->tconn);
4180        unsigned int data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
4181        unsigned int plain =
4182                header_size * (DIV_ROUND_UP(c->bm_words, data_size) + 1) +
4183                c->bm_words * sizeof(unsigned long);
4184        unsigned int total = c->bytes[0] + c->bytes[1];
4185        unsigned int r;
4186
4187        /* total can not be zero. but just in case: */
4188        if (total == 0)
4189                return;
4190
4191        /* don't report if not compressed */
4192        if (total >= plain)
4193                return;
4194
4195        /* total < plain. check for overflow, still */
4196        r = (total > UINT_MAX/1000) ? (total / (plain/1000))
4197                                    : (1000 * total / plain);
4198
4199        if (r > 1000)
4200                r = 1000;
4201
4202        r = 1000 - r;
4203        dev_info(DEV, "%s bitmap stats [Bytes(packets)]: plain %u(%u), RLE %u(%u), "
4204             "total %u; compression: %u.%u%%\n",
4205                        direction,
4206                        c->bytes[1], c->packets[1],
4207                        c->bytes[0], c->packets[0],
4208                        total, r/10, r % 10);
4209}
4210
4211/* Since we are processing the bitfield from lower addresses to higher,
4212   it does not matter if the process it in 32 bit chunks or 64 bit
4213   chunks as long as it is little endian. (Understand it as byte stream,
4214   beginning with the lowest byte...) If we would use big endian
4215   we would need to process it from the highest address to the lowest,
4216   in order to be agnostic to the 32 vs 64 bits issue.
4217
4218   returns 0 on failure, 1 if we successfully received it. */
4219static int receive_bitmap(struct drbd_tconn *tconn, struct packet_info *pi)
4220{
4221        struct drbd_conf *mdev;
4222        struct bm_xfer_ctx c;
4223        int err;
4224
4225        mdev = vnr_to_mdev(tconn, pi->vnr);
4226        if (!mdev)
4227                return -EIO;
4228
4229        drbd_bm_lock(mdev, "receive bitmap", BM_LOCKED_SET_ALLOWED);
4230        /* you are supposed to send additional out-of-sync information
4231         * if you actually set bits during this phase */
4232
4233        c = (struct bm_xfer_ctx) {
4234                .bm_bits = drbd_bm_bits(mdev),
4235                .bm_words = drbd_bm_words(mdev),
4236        };
4237
4238        for(;;) {
4239                if (pi->cmd == P_BITMAP)
4240                        err = receive_bitmap_plain(mdev, pi->size, pi->data, &c);
4241                else if (pi->cmd == P_COMPRESSED_BITMAP) {
4242                        /* MAYBE: sanity check that we speak proto >= 90,
4243                         * and the feature is enabled! */
4244                        struct p_compressed_bm *p = pi->data;
4245
4246                        if (pi->size > DRBD_SOCKET_BUFFER_SIZE - drbd_header_size(tconn)) {
4247                                dev_err(DEV, "ReportCBitmap packet too large\n");
4248                                err = -EIO;
4249                                goto out;
4250                        }
4251                        if (pi->size <= sizeof(*p)) {
4252                                dev_err(DEV, "ReportCBitmap packet too small (l:%u)\n", pi->size);
4253                                err = -EIO;
4254                                goto out;
4255                        }
4256                        err = drbd_recv_all(mdev->tconn, p, pi->size);
4257                        if (err)
4258                               goto out;
4259                        err = decode_bitmap_c(mdev, p, &c, pi->size);
4260                } else {
4261                        dev_warn(DEV, "receive_bitmap: cmd neither ReportBitMap nor ReportCBitMap (is 0x%x)", pi->cmd);
4262                        err = -EIO;
4263                        goto out;
4264                }
4265
4266                c.packets[pi->cmd == P_BITMAP]++;
4267                c.bytes[pi->cmd == P_BITMAP] += drbd_header_size(tconn) + pi->size;
4268
4269                if (err <= 0) {
4270                        if (err < 0)
4271                                goto out;
4272                        break;
4273                }
4274                err = drbd_recv_header(mdev->tconn, pi);
4275                if (err)
4276                        goto out;
4277        }
4278
4279        INFO_bm_xfer_stats(mdev, "receive", &c);
4280
4281        if (mdev->state.conn == C_WF_BITMAP_T) {
4282                enum drbd_state_rv rv;
4283
4284                err = drbd_send_bitmap(mdev);
4285                if (err)
4286                        goto out;
4287                /* Omit CS_ORDERED with this state transition to avoid deadlocks. */
4288                rv = _drbd_request_state(mdev, NS(conn, C_WF_SYNC_UUID), CS_VERBOSE);
4289                D_ASSERT(rv == SS_SUCCESS);
4290        } else if (mdev->state.conn != C_WF_BITMAP_S) {
4291                /* admin may have requested C_DISCONNECTING,
4292                 * other threads may have noticed network errors */
4293                dev_info(DEV, "unexpected cstate (%s) in receive_bitmap\n",
4294                    drbd_conn_str(mdev->state.conn));
4295        }
4296        err = 0;
4297
4298 out:
4299        drbd_bm_unlock(mdev);
4300        if (!err && mdev->state.conn == C_WF_BITMAP_S)
4301                drbd_start_resync(mdev, C_SYNC_SOURCE);
4302        return err;
4303}
4304
4305static int receive_skip(struct drbd_tconn *tconn, struct packet_info *pi)
4306{
4307        conn_warn(tconn, "skipping unknown optional packet type %d, l: %d!\n",
4308                 pi->cmd, pi->size);
4309
4310        return ignore_remaining_packet(tconn, pi);
4311}
4312
4313static int receive_UnplugRemote(struct drbd_tconn *tconn, struct packet_info *pi)
4314{
4315        /* Make sure we've acked all the TCP data associated
4316         * with the data requests being unplugged */
4317        drbd_tcp_quickack(tconn->data.socket);
4318
4319        return 0;
4320}
4321
4322static int receive_out_of_sync(struct drbd_tconn *tconn, struct packet_info *pi)
4323{
4324        struct drbd_conf *mdev;
4325        struct p_block_desc *p = pi->data;
4326
4327        mdev = vnr_to_mdev(tconn, pi->vnr);
4328        if (!mdev)
4329                return -EIO;
4330
4331        switch (mdev->state.conn) {
4332        case C_WF_SYNC_UUID:
4333        case C_WF_BITMAP_T:
4334        case C_BEHIND:
4335                        break;
4336        default:
4337                dev_err(DEV, "ASSERT FAILED cstate = %s, expected: WFSyncUUID|WFBitMapT|Behind\n",
4338                                drbd_conn_str(mdev->state.conn));
4339        }
4340
4341        drbd_set_out_of_sync(mdev, be64_to_cpu(p->sector), be32_to_cpu(p->blksize));
4342
4343        return 0;
4344}
4345
/* One entry of the receiver's packet dispatch table. */
struct data_cmd {
	/* non-zero if the packet may be larger than pkt_size */
	int expect_payload;
	/* size of the fixed sub-header received before fn is called;
	 * 0 if the handler receives everything itself */
	size_t pkt_size;
	/* packet handler; a non-zero return is treated as an error */
	int (*fn)(struct drbd_tconn *tconn, struct packet_info *pi);
};
4351
4352static struct data_cmd drbd_cmd_handler[] = {
4353        [P_DATA]            = { 1, sizeof(struct p_data), receive_Data },
4354        [P_DATA_REPLY]      = { 1, sizeof(struct p_data), receive_DataReply },
4355        [P_RS_DATA_REPLY]   = { 1, sizeof(struct p_data), receive_RSDataReply } ,
4356        [P_BARRIER]         = { 0, sizeof(struct p_barrier), receive_Barrier } ,
4357        [P_BITMAP]          = { 1, 0, receive_bitmap } ,
4358        [P_COMPRESSED_BITMAP] = { 1, 0, receive_bitmap } ,
4359        [P_UNPLUG_REMOTE]   = { 0, 0, receive_UnplugRemote },
4360        [P_DATA_REQUEST]    = { 0, sizeof(struct p_block_req), receive_DataRequest },
4361        [P_RS_DATA_REQUEST] = { 0, sizeof(struct p_block_req), receive_DataRequest },
4362        [P_SYNC_PARAM]      = { 1, 0, receive_SyncParam },
4363        [P_SYNC_PARAM89]    = { 1, 0, receive_SyncParam },
4364        [P_PROTOCOL]        = { 1, sizeof(struct p_protocol), receive_protocol },
4365        [P_UUIDS]           = { 0, sizeof(struct p_uuids), receive_uuids },
4366        [P_SIZES]           = { 0, sizeof(struct p_sizes), receive_sizes },
4367        [P_STATE]           = { 0, sizeof(struct p_state), receive_state },
4368        [P_STATE_CHG_REQ]   = { 0, sizeof(struct p_req_state), receive_req_state },
4369        [P_SYNC_UUID]       = { 0, sizeof(struct p_rs_uuid), receive_sync_uuid },
4370        [P_OV_REQUEST]      = { 0, sizeof(struct p_block_req), receive_DataRequest },
4371        [P_OV_REPLY]        = { 1, sizeof(struct p_block_req), receive_DataRequest },
4372        [P_CSUM_RS_REQUEST] = { 1, sizeof(struct p_block_req), receive_DataRequest },
4373        [P_DELAY_PROBE]     = { 0, sizeof(struct p_delay_probe93), receive_skip },
4374        [P_OUT_OF_SYNC]     = { 0, sizeof(struct p_block_desc), receive_out_of_sync },
4375        [P_CONN_ST_CHG_REQ] = { 0, sizeof(struct p_req_state), receive_req_conn_state },
4376        [P_PROTOCOL_UPDATE] = { 1, sizeof(struct p_protocol), receive_protocol },
4377};
4378
4379static void drbdd(struct drbd_tconn *tconn)
4380{
4381        struct packet_info pi;
4382        size_t shs; /* sub header size */
4383        int err;
4384
4385        while (get_t_state(&tconn->receiver) == RUNNING) {
4386                struct data_cmd *cmd;
4387
4388                drbd_thread_current_set_cpu(&tconn->receiver);
4389                if (drbd_recv_header(tconn, &pi))
4390                        goto err_out;
4391
4392                cmd = &drbd_cmd_handler[pi.cmd];
4393                if (unlikely(pi.cmd >= ARRAY_SIZE(drbd_cmd_handler) || !cmd->fn)) {
4394                        conn_err(tconn, "Unexpected data packet %s (0x%04x)",
4395                                 cmdname(pi.cmd), pi.cmd);
4396                        goto err_out;
4397                }
4398
4399                shs = cmd->pkt_size;
4400                if (pi.size > shs && !cmd->expect_payload) {
4401                        conn_err(tconn, "No payload expected %s l:%d\n",
4402                                 cmdname(pi.cmd), pi.size);
4403                        goto err_out;
4404                }
4405
4406                if (shs) {
4407                        err = drbd_recv_all_warn(tconn, pi.data, shs);
4408                        if (err)
4409                                goto err_out;
4410                        pi.size -= shs;
4411                }
4412
4413                err = cmd->fn(tconn, &pi);
4414                if (err) {
4415                        conn_err(tconn, "error receiving %s, e: %d l: %d!\n",
4416                                 cmdname(pi.cmd), err, pi.size);
4417                        goto err_out;
4418                }
4419        }
4420        return;
4421
4422    err_out:
4423        conn_request_state(tconn, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
4424}
4425
4426void conn_flush_workqueue(struct drbd_tconn *tconn)
4427{
4428        struct drbd_wq_barrier barr;
4429
4430        barr.w.cb = w_prev_work_done;
4431        barr.w.tconn = tconn;
4432        init_completion(&barr.done);
4433        drbd_queue_work(&tconn->sender_work, &barr.w);
4434        wait_for_completion(&barr.done);
4435}
4436
4437static void conn_disconnect(struct drbd_tconn *tconn)
4438{
4439        struct drbd_conf *mdev;
4440        enum drbd_conns oc;
4441        int vnr;
4442
4443        if (tconn->cstate == C_STANDALONE)
4444                return;
4445
4446        /* We are about to start the cleanup after connection loss.
4447         * Make sure drbd_make_request knows about that.
4448         * Usually we should be in some network failure state already,
4449         * but just in case we are not, we fix it up here.
4450         */
4451        conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
4452
4453        /* asender does not clean up anything. it must not interfere, either */
4454        drbd_thread_stop(&tconn->asender);
4455        drbd_free_sock(tconn);
4456
4457        rcu_read_lock();
4458        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
4459                kref_get(&mdev->kref);
4460                rcu_read_unlock();
4461                drbd_disconnected(mdev);
4462                kref_put(&mdev->kref, &drbd_minor_destroy);
4463                rcu_read_lock();
4464        }
4465        rcu_read_unlock();
4466
4467        if (!list_empty(&tconn->current_epoch->list))
4468                conn_err(tconn, "ASSERTION FAILED: tconn->current_epoch->list not empty\n");
4469        /* ok, no more ee's on the fly, it is safe to reset the epoch_size */
4470        atomic_set(&tconn->current_epoch->epoch_size, 0);
4471        tconn->send.seen_any_write_yet = false;
4472
4473        conn_info(tconn, "Connection closed\n");
4474
4475        if (conn_highest_role(tconn) == R_PRIMARY && conn_highest_pdsk(tconn) >= D_UNKNOWN)
4476                conn_try_outdate_peer_async(tconn);
4477
4478        spin_lock_irq(&tconn->req_lock);
4479        oc = tconn->cstate;
4480        if (oc >= C_UNCONNECTED)
4481                _conn_request_state(tconn, NS(conn, C_UNCONNECTED), CS_VERBOSE);
4482
4483        spin_unlock_irq(&tconn->req_lock);
4484
4485        if (oc == C_DISCONNECTING)
4486                conn_request_state(tconn, NS(conn, C_STANDALONE), CS_VERBOSE | CS_HARD);
4487}
4488
4489static int drbd_disconnected(struct drbd_conf *mdev)
4490{
4491        unsigned int i;
4492
4493        /* wait for current activity to cease. */
4494        spin_lock_irq(&mdev->tconn->req_lock);
4495        _drbd_wait_ee_list_empty(mdev, &mdev->active_ee);
4496        _drbd_wait_ee_list_empty(mdev, &mdev->sync_ee);
4497        _drbd_wait_ee_list_empty(mdev, &mdev->read_ee);
4498        spin_unlock_irq(&mdev->tconn->req_lock);
4499
4500        /* We do not have data structures that would allow us to
4501         * get the rs_pending_cnt down to 0 again.
4502         *  * On C_SYNC_TARGET we do not have any data structures describing
4503         *    the pending RSDataRequest's we have sent.
4504         *  * On C_SYNC_SOURCE there is no data structure that tracks
4505         *    the P_RS_DATA_REPLY blocks that we sent to the SyncTarget.
4506         *  And no, it is not the sum of the reference counts in the
4507         *  resync_LRU. The resync_LRU tracks the whole operation including
4508         *  the disk-IO, while the rs_pending_cnt only tracks the blocks
4509         *  on the fly. */
4510        drbd_rs_cancel_all(mdev);
4511        mdev->rs_total = 0;
4512        mdev->rs_failed = 0;
4513        atomic_set(&mdev->rs_pending_cnt, 0);
4514        wake_up(&mdev->misc_wait);
4515
4516        del_timer_sync(&mdev->resync_timer);
4517        resync_timer_fn((unsigned long)mdev);
4518
4519        /* wait for all w_e_end_data_req, w_e_end_rsdata_req, w_send_barrier,
4520         * w_make_resync_request etc. which may still be on the worker queue
4521         * to be "canceled" */
4522        drbd_flush_workqueue(mdev);
4523
4524        drbd_finish_peer_reqs(mdev);
4525
4526        /* This second workqueue flush is necessary, since drbd_finish_peer_reqs()
4527           might have issued a work again. The one before drbd_finish_peer_reqs() is
4528           necessary to reclain net_ee in drbd_finish_peer_reqs(). */
4529        drbd_flush_workqueue(mdev);
4530
4531        /* need to do it again, drbd_finish_peer_reqs() may have populated it
4532         * again via drbd_try_clear_on_disk_bm(). */
4533        drbd_rs_cancel_all(mdev);
4534
4535        kfree(mdev->p_uuid);
4536        mdev->p_uuid = NULL;
4537
4538        if (!drbd_suspended(mdev))
4539                tl_clear(mdev->tconn);
4540
4541        drbd_md_sync(mdev);
4542
4543        /* serialize with bitmap writeout triggered by the state change,
4544         * if any. */
4545        wait_event(mdev->misc_wait, !test_bit(BITMAP_IO, &mdev->flags));
4546
4547        /* tcp_close and release of sendpage pages can be deferred.  I don't
4548         * want to use SO_LINGER, because apparently it can be deferred for
4549         * more than 20 seconds (longest time I checked).
4550         *
4551         * Actually we don't care for exactly when the network stack does its
4552         * put_page(), but release our reference on these pages right here.
4553         */
4554        i = drbd_free_peer_reqs(mdev, &mdev->net_ee);
4555        if (i)
4556                dev_info(DEV, "net_ee not empty, killed %u entries\n", i);
4557        i = atomic_read(&mdev->pp_in_use_by_net);
4558        if (i)
4559                dev_info(DEV, "pp_in_use_by_net = %d, expected 0\n", i);
4560        i = atomic_read(&mdev->pp_in_use);
4561        if (i)
4562                dev_info(DEV, "pp_in_use = %d, expected 0\n", i);
4563
4564        D_ASSERT(list_empty(&mdev->read_ee));
4565        D_ASSERT(list_empty(&mdev->active_ee));
4566        D_ASSERT(list_empty(&mdev->sync_ee));
4567        D_ASSERT(list_empty(&mdev->done_ee));
4568
4569        return 0;
4570}
4571
4572/*
4573 * We support PRO_VERSION_MIN to PRO_VERSION_MAX. The protocol version
4574 * we can agree on is stored in agreed_pro_version.
4575 *
4576 * feature flags and the reserved array should be enough room for future
4577 * enhancements of the handshake protocol, and possible plugins...
4578 *
4579 * for now, they are expected to be zero, but ignored.
4580 */
4581static int drbd_send_features(struct drbd_tconn *tconn)
4582{
4583        struct drbd_socket *sock;
4584        struct p_connection_features *p;
4585
4586        sock = &tconn->data;
4587        p = conn_prepare_command(tconn, sock);
4588        if (!p)
4589                return -EIO;
4590        memset(p, 0, sizeof(*p));
4591        p->protocol_min = cpu_to_be32(PRO_VERSION_MIN);
4592        p->protocol_max = cpu_to_be32(PRO_VERSION_MAX);
4593        return conn_send_command(tconn, sock, P_CONNECTION_FEATURES, sizeof(*p), NULL, 0);
4594}
4595
4596/*
4597 * return values:
4598 *   1 yes, we have a valid connection
4599 *   0 oops, did not work out, please try again
4600 *  -1 peer talks different language,
4601 *     no point in trying again, please go standalone.
4602 */
4603static int drbd_do_features(struct drbd_tconn *tconn)
4604{
4605        /* ASSERT current == tconn->receiver ... */
4606        struct p_connection_features *p;
4607        const int expect = sizeof(struct p_connection_features);
4608        struct packet_info pi;
4609        int err;
4610
4611        err = drbd_send_features(tconn);
4612        if (err)
4613                return 0;
4614
4615        err = drbd_recv_header(tconn, &pi);
4616        if (err)
4617                return 0;
4618
4619        if (pi.cmd != P_CONNECTION_FEATURES) {
4620                conn_err(tconn, "expected ConnectionFeatures packet, received: %s (0x%04x)\n",
4621                         cmdname(pi.cmd), pi.cmd);
4622                return -1;
4623        }
4624
4625        if (pi.size != expect) {
4626                conn_err(tconn, "expected ConnectionFeatures length: %u, received: %u\n",
4627                     expect, pi.size);
4628                return -1;
4629        }
4630
4631        p = pi.data;
4632        err = drbd_recv_all_warn(tconn, p, expect);
4633        if (err)
4634                return 0;
4635
4636        p->protocol_min = be32_to_cpu(p->protocol_min);
4637        p->protocol_max = be32_to_cpu(p->protocol_max);
4638        if (p->protocol_max == 0)
4639                p->protocol_max = p->protocol_min;
4640
4641        if (PRO_VERSION_MAX < p->protocol_min ||
4642            PRO_VERSION_MIN > p->protocol_max)
4643                goto incompat;
4644
4645        tconn->agreed_pro_version = min_t(int, PRO_VERSION_MAX, p->protocol_max);
4646
4647        conn_info(tconn, "Handshake successful: "
4648             "Agreed network protocol version %d\n", tconn->agreed_pro_version);
4649
4650        return 1;
4651
4652 incompat:
4653        conn_err(tconn, "incompatible DRBD dialects: "
4654            "I support %d-%d, peer supports %d-%d\n",
4655            PRO_VERSION_MIN, PRO_VERSION_MAX,
4656            p->protocol_min, p->protocol_max);
4657        return -1;
4658}
4659
4660#if !defined(CONFIG_CRYPTO_HMAC) && !defined(CONFIG_CRYPTO_HMAC_MODULE)
/*
 * drbd_do_auth() - stub used when the kernel lacks CONFIG_CRYPTO_HMAC.
 *
 * Authentication cannot be performed without HMAC support, so this logs
 * how to fix the configuration and always returns -1 (auth failed, don't
 * try again).
 */
static int drbd_do_auth(struct drbd_tconn *tconn)
{
	conn_err(tconn, "This kernel was built without CONFIG_CRYPTO_HMAC.\n");
	conn_err(tconn, "You need to disable 'cram-hmac-alg' in drbd.conf.\n");
	return -1;
}
4667#else
4668#define CHALLENGE_LEN 64
4669
4670/* Return value:
4671        1 - auth succeeded,
4672        0 - failed, try again (network error),
4673        -1 - auth failed, don't try again.
4674*/
4675
4676static int drbd_do_auth(struct drbd_tconn *tconn)
4677{
4678        struct drbd_socket *sock;
4679        char my_challenge[CHALLENGE_LEN];  /* 64 Bytes... */
4680        struct scatterlist sg;
4681        char *response = NULL;
4682        char *right_response = NULL;
4683        char *peers_ch = NULL;
4684        unsigned int key_len;
4685        char secret[SHARED_SECRET_MAX]; /* 64 byte */
4686        unsigned int resp_size;
4687        struct hash_desc desc;
4688        struct packet_info pi;
4689        struct net_conf *nc;
4690        int err, rv;
4691
4692        /* FIXME: Put the challenge/response into the preallocated socket buffer.  */
4693
4694        rcu_read_lock();
4695        nc = rcu_dereference(tconn->net_conf);
4696        key_len = strlen(nc->shared_secret);
4697        memcpy(secret, nc->shared_secret, key_len);
4698        rcu_read_unlock();
4699
4700        desc.tfm = tconn->cram_hmac_tfm;
4701        desc.flags = 0;
4702
4703        rv = crypto_hash_setkey(tconn->cram_hmac_tfm, (u8 *)secret, key_len);
4704        if (rv) {
4705                conn_err(tconn, "crypto_hash_setkey() failed with %d\n", rv);
4706                rv = -1;
4707                goto fail;
4708        }
4709
4710        get_random_bytes(my_challenge, CHALLENGE_LEN);
4711
4712        sock = &tconn->data;
4713        if (!conn_prepare_command(tconn, sock)) {
4714                rv = 0;
4715                goto fail;
4716        }
4717        rv = !conn_send_command(tconn, sock, P_AUTH_CHALLENGE, 0,
4718                                my_challenge, CHALLENGE_LEN);
4719        if (!rv)
4720                goto fail;
4721
4722        err = drbd_recv_header(tconn, &pi);
4723        if (err) {
4724                rv = 0;
4725                goto fail;
4726        }
4727
4728        if (pi.cmd != P_AUTH_CHALLENGE) {
4729                conn_err(tconn, "expected AuthChallenge packet, received: %s (0x%04x)\n",
4730                         cmdname(pi.cmd), pi.cmd);
4731                rv = 0;
4732                goto fail;
4733        }
4734
4735        if (pi.size > CHALLENGE_LEN * 2) {
4736                conn_err(tconn, "expected AuthChallenge payload too big.\n");
4737                rv = -1;
4738                goto fail;
4739        }
4740
4741        peers_ch = kmalloc(pi.size, GFP_NOIO);
4742        if (peers_ch == NULL) {
4743                conn_err(tconn, "kmalloc of peers_ch failed\n");
4744                rv = -1;
4745                goto fail;
4746        }
4747
4748        err = drbd_recv_all_warn(tconn, peers_ch, pi.size);
4749        if (err) {
4750                rv = 0;
4751                goto fail;
4752        }
4753
4754        resp_size = crypto_hash_digestsize(tconn->cram_hmac_tfm);
4755        response = kmalloc(resp_size, GFP_NOIO);
4756        if (response == NULL) {
4757                conn_err(tconn, "kmalloc of response failed\n");
4758                rv = -1;
4759                goto fail;
4760        }
4761
4762        sg_init_table(&sg, 1);
4763        sg_set_buf(&sg, peers_ch, pi.size);
4764
4765        rv = crypto_hash_digest(&desc, &sg, sg.length, response);
4766        if (rv) {
4767                conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4768                rv = -1;
4769                goto fail;
4770        }
4771
4772        if (!conn_prepare_command(tconn, sock)) {
4773                rv = 0;
4774                goto fail;
4775        }
4776        rv = !conn_send_command(tconn, sock, P_AUTH_RESPONSE, 0,
4777                                response, resp_size);
4778        if (!rv)
4779                goto fail;
4780
4781        err = drbd_recv_header(tconn, &pi);
4782        if (err) {
4783                rv = 0;
4784                goto fail;
4785        }
4786
4787        if (pi.cmd != P_AUTH_RESPONSE) {
4788                conn_err(tconn, "expected AuthResponse packet, received: %s (0x%04x)\n",
4789                         cmdname(pi.cmd), pi.cmd);
4790                rv = 0;
4791                goto fail;
4792        }
4793
4794        if (pi.size != resp_size) {
4795                conn_err(tconn, "expected AuthResponse payload of wrong size\n");
4796                rv = 0;
4797                goto fail;
4798        }
4799
4800        err = drbd_recv_all_warn(tconn, response , resp_size);
4801        if (err) {
4802                rv = 0;
4803                goto fail;
4804        }
4805
4806        right_response = kmalloc(resp_size, GFP_NOIO);
4807        if (right_response == NULL) {
4808                conn_err(tconn, "kmalloc of right_response failed\n");
4809                rv = -1;
4810                goto fail;
4811        }
4812
4813        sg_set_buf(&sg, my_challenge, CHALLENGE_LEN);
4814
4815        rv = crypto_hash_digest(&desc, &sg, sg.length, right_response);
4816        if (rv) {
4817                conn_err(tconn, "crypto_hash_digest() failed with %d\n", rv);
4818                rv = -1;
4819                goto fail;
4820        }
4821
4822        rv = !memcmp(response, right_response, resp_size);
4823
4824        if (rv)
4825                conn_info(tconn, "Peer authenticated using %d bytes HMAC\n",
4826                     resp_size);
4827        else
4828                rv = -1;
4829
4830 fail:
4831        kfree(peers_ch);
4832        kfree(response);
4833        kfree(right_response);
4834
4835        return rv;
4836}
4837#endif
4838
4839int drbdd_init(struct drbd_thread *thi)
4840{
4841        struct drbd_tconn *tconn = thi->tconn;
4842        int h;
4843
4844        conn_info(tconn, "receiver (re)started\n");
4845
4846        do {
4847                h = conn_connect(tconn);
4848                if (h == 0) {
4849                        conn_disconnect(tconn);
4850                        schedule_timeout_interruptible(HZ);
4851                }
4852                if (h == -1) {
4853                        conn_warn(tconn, "Discarding network configuration.\n");
4854                        conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
4855                }
4856        } while (h == 0);
4857
4858        if (h > 0)
4859                drbdd(tconn);
4860
4861        conn_disconnect(tconn);
4862
4863        conn_info(tconn, "receiver terminated\n");
4864        return 0;
4865}
4866
4867/* ********* acknowledge sender ******** */
4868
4869static int got_conn_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4870{
4871        struct p_req_state_reply *p = pi->data;
4872        int retcode = be32_to_cpu(p->retcode);
4873
4874        if (retcode >= SS_SUCCESS) {
4875                set_bit(CONN_WD_ST_CHG_OKAY, &tconn->flags);
4876        } else {
4877                set_bit(CONN_WD_ST_CHG_FAIL, &tconn->flags);
4878                conn_err(tconn, "Requested state change failed by peer: %s (%d)\n",
4879                         drbd_set_st_err_str(retcode), retcode);
4880        }
4881        wake_up(&tconn->ping_wait);
4882
4883        return 0;
4884}
4885
4886static int got_RqSReply(struct drbd_tconn *tconn, struct packet_info *pi)
4887{
4888        struct drbd_conf *mdev;
4889        struct p_req_state_reply *p = pi->data;
4890        int retcode = be32_to_cpu(p->retcode);
4891
4892        mdev = vnr_to_mdev(tconn, pi->vnr);
4893        if (!mdev)
4894                return -EIO;
4895
4896        if (test_bit(CONN_WD_ST_CHG_REQ, &tconn->flags)) {
4897                D_ASSERT(tconn->agreed_pro_version < 100);
4898                return got_conn_RqSReply(tconn, pi);
4899        }
4900
4901        if (retcode >= SS_SUCCESS) {
4902                set_bit(CL_ST_CHG_SUCCESS, &mdev->flags);
4903        } else {
4904                set_bit(CL_ST_CHG_FAIL, &mdev->flags);
4905                dev_err(DEV, "Requested state change failed by peer: %s (%d)\n",
4906                        drbd_set_st_err_str(retcode), retcode);
4907        }
4908        wake_up(&mdev->state_wait);
4909
4910        return 0;
4911}
4912
/* got_Ping() - answer a P_PING; returns the result of drbd_send_ping_ack(). */
static int got_Ping(struct drbd_tconn *tconn, struct packet_info *pi)
{
	int err = drbd_send_ping_ack(tconn);

	return err;
}
4918
4919static int got_PingAck(struct drbd_tconn *tconn, struct packet_info *pi)
4920{
4921        /* restore idle timeout */
4922        tconn->meta.socket->sk->sk_rcvtimeo = tconn->net_conf->ping_int*HZ;
4923        if (!test_and_set_bit(GOT_PING_ACK, &tconn->flags))
4924                wake_up(&tconn->ping_wait);
4925
4926        return 0;
4927}
4928
4929static int got_IsInSync(struct drbd_tconn *tconn, struct packet_info *pi)
4930{
4931        struct drbd_conf *mdev;
4932        struct p_block_ack *p = pi->data;
4933        sector_t sector = be64_to_cpu(p->sector);
4934        int blksize = be32_to_cpu(p->blksize);
4935
4936        mdev = vnr_to_mdev(tconn, pi->vnr);
4937        if (!mdev)
4938                return -EIO;
4939
4940        D_ASSERT(mdev->tconn->agreed_pro_version >= 89);
4941
4942        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4943
4944        if (get_ldev(mdev)) {
4945                drbd_rs_complete_io(mdev, sector);
4946                drbd_set_in_sync(mdev, sector, blksize);
4947                /* rs_same_csums is supposed to count in units of BM_BLOCK_SIZE */
4948                mdev->rs_same_csum += (blksize >> BM_BLOCK_SHIFT);
4949                put_ldev(mdev);
4950        }
4951        dec_rs_pending(mdev);
4952        atomic_add(blksize >> 9, &mdev->rs_sect_in);
4953
4954        return 0;
4955}
4956
4957static int
4958validate_req_change_req_state(struct drbd_conf *mdev, u64 id, sector_t sector,
4959                              struct rb_root *root, const char *func,
4960                              enum drbd_req_event what, bool missing_ok)
4961{
4962        struct drbd_request *req;
4963        struct bio_and_error m;
4964
4965        spin_lock_irq(&mdev->tconn->req_lock);
4966        req = find_request(mdev, root, id, sector, missing_ok, func);
4967        if (unlikely(!req)) {
4968                spin_unlock_irq(&mdev->tconn->req_lock);
4969                return -EIO;
4970        }
4971        __req_mod(req, what, &m);
4972        spin_unlock_irq(&mdev->tconn->req_lock);
4973
4974        if (m.bio)
4975                complete_master_bio(mdev, &m);
4976        return 0;
4977}
4978
4979static int got_BlockAck(struct drbd_tconn *tconn, struct packet_info *pi)
4980{
4981        struct drbd_conf *mdev;
4982        struct p_block_ack *p = pi->data;
4983        sector_t sector = be64_to_cpu(p->sector);
4984        int blksize = be32_to_cpu(p->blksize);
4985        enum drbd_req_event what;
4986
4987        mdev = vnr_to_mdev(tconn, pi->vnr);
4988        if (!mdev)
4989                return -EIO;
4990
4991        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
4992
4993        if (p->block_id == ID_SYNCER) {
4994                drbd_set_in_sync(mdev, sector, blksize);
4995                dec_rs_pending(mdev);
4996                return 0;
4997        }
4998        switch (pi->cmd) {
4999        case P_RS_WRITE_ACK:
5000                what = WRITE_ACKED_BY_PEER_AND_SIS;
5001                break;
5002        case P_WRITE_ACK:
5003                what = WRITE_ACKED_BY_PEER;
5004                break;
5005        case P_RECV_ACK:
5006                what = RECV_ACKED_BY_PEER;
5007                break;
5008        case P_SUPERSEDED:
5009                what = CONFLICT_RESOLVED;
5010                break;
5011        case P_RETRY_WRITE:
5012                what = POSTPONE_WRITE;
5013                break;
5014        default:
5015                BUG();
5016        }
5017
5018        return validate_req_change_req_state(mdev, p->block_id, sector,
5019                                             &mdev->write_requests, __func__,
5020                                             what, false);
5021}
5022
5023static int got_NegAck(struct drbd_tconn *tconn, struct packet_info *pi)
5024{
5025        struct drbd_conf *mdev;
5026        struct p_block_ack *p = pi->data;
5027        sector_t sector = be64_to_cpu(p->sector);
5028        int size = be32_to_cpu(p->blksize);
5029        int err;
5030
5031        mdev = vnr_to_mdev(tconn, pi->vnr);
5032        if (!mdev)
5033                return -EIO;
5034
5035        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5036
5037        if (p->block_id == ID_SYNCER) {
5038                dec_rs_pending(mdev);
5039                drbd_rs_failed_io(mdev, sector, size);
5040                return 0;
5041        }
5042
5043        err = validate_req_change_req_state(mdev, p->block_id, sector,
5044                                            &mdev->write_requests, __func__,
5045                                            NEG_ACKED, true);
5046        if (err) {
5047                /* Protocol A has no P_WRITE_ACKs, but has P_NEG_ACKs.
5048                   The master bio might already be completed, therefore the
5049                   request is no longer in the collision hash. */
5050                /* In Protocol B we might already have got a P_RECV_ACK
5051                   but then get a P_NEG_ACK afterwards. */
5052                drbd_set_out_of_sync(mdev, sector, size);
5053        }
5054        return 0;
5055}
5056
5057static int got_NegDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5058{
5059        struct drbd_conf *mdev;
5060        struct p_block_ack *p = pi->data;
5061        sector_t sector = be64_to_cpu(p->sector);
5062
5063        mdev = vnr_to_mdev(tconn, pi->vnr);
5064        if (!mdev)
5065                return -EIO;
5066
5067        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5068
5069        dev_err(DEV, "Got NegDReply; Sector %llus, len %u.\n",
5070            (unsigned long long)sector, be32_to_cpu(p->blksize));
5071
5072        return validate_req_change_req_state(mdev, p->block_id, sector,
5073                                             &mdev->read_requests, __func__,
5074                                             NEG_ACKED, false);
5075}
5076
5077static int got_NegRSDReply(struct drbd_tconn *tconn, struct packet_info *pi)
5078{
5079        struct drbd_conf *mdev;
5080        sector_t sector;
5081        int size;
5082        struct p_block_ack *p = pi->data;
5083
5084        mdev = vnr_to_mdev(tconn, pi->vnr);
5085        if (!mdev)
5086                return -EIO;
5087
5088        sector = be64_to_cpu(p->sector);
5089        size = be32_to_cpu(p->blksize);
5090
5091        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5092
5093        dec_rs_pending(mdev);
5094
5095        if (get_ldev_if_state(mdev, D_FAILED)) {
5096                drbd_rs_complete_io(mdev, sector);
5097                switch (pi->cmd) {
5098                case P_NEG_RS_DREPLY:
5099                        drbd_rs_failed_io(mdev, sector, size);
5100                case P_RS_CANCEL:
5101                        break;
5102                default:
5103                        BUG();
5104                }
5105                put_ldev(mdev);
5106        }
5107
5108        return 0;
5109}
5110
5111static int got_BarrierAck(struct drbd_tconn *tconn, struct packet_info *pi)
5112{
5113        struct p_barrier_ack *p = pi->data;
5114        struct drbd_conf *mdev;
5115        int vnr;
5116
5117        tl_release(tconn, p->barrier, be32_to_cpu(p->set_size));
5118
5119        rcu_read_lock();
5120        idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5121                if (mdev->state.conn == C_AHEAD &&
5122                    atomic_read(&mdev->ap_in_flight) == 0 &&
5123                    !test_and_set_bit(AHEAD_TO_SYNC_SOURCE, &mdev->flags)) {
5124                        mdev->start_resync_timer.expires = jiffies + HZ;
5125                        add_timer(&mdev->start_resync_timer);
5126                }
5127        }
5128        rcu_read_unlock();
5129
5130        return 0;
5131}
5132
5133static int got_OVResult(struct drbd_tconn *tconn, struct packet_info *pi)
5134{
5135        struct drbd_conf *mdev;
5136        struct p_block_ack *p = pi->data;
5137        struct drbd_work *w;
5138        sector_t sector;
5139        int size;
5140
5141        mdev = vnr_to_mdev(tconn, pi->vnr);
5142        if (!mdev)
5143                return -EIO;
5144
5145        sector = be64_to_cpu(p->sector);
5146        size = be32_to_cpu(p->blksize);
5147
5148        update_peer_seq(mdev, be32_to_cpu(p->seq_num));
5149
5150        if (be64_to_cpu(p->block_id) == ID_OUT_OF_SYNC)
5151                drbd_ov_out_of_sync_found(mdev, sector, size);
5152        else
5153                ov_out_of_sync_print(mdev);
5154
5155        if (!get_ldev(mdev))
5156                return 0;
5157
5158        drbd_rs_complete_io(mdev, sector);
5159        dec_rs_pending(mdev);
5160
5161        --mdev->ov_left;
5162
5163        /* let's advance progress step marks only for every other megabyte */
5164        if ((mdev->ov_left & 0x200) == 0x200)
5165                drbd_advance_rs_marks(mdev, mdev->ov_left);
5166
5167        if (mdev->ov_left == 0) {
5168                w = kmalloc(sizeof(*w), GFP_NOIO);
5169                if (w) {
5170                        w->cb = w_ov_finished;
5171                        w->mdev = mdev;
5172                        drbd_queue_work(&mdev->tconn->sender_work, w);
5173                } else {
5174                        dev_err(DEV, "kmalloc(w) failed.");
5175                        ov_out_of_sync_print(mdev);
5176                        drbd_resync_finished(mdev);
5177                }
5178        }
5179        put_ldev(mdev);
5180        return 0;
5181}
5182
/* got_skip() - handler for meta packets that are accepted but ignored. */
static int got_skip(struct drbd_tconn *tconn, struct packet_info *pi)
{
	/* nothing to do; report success */
	return 0;
}
5187
5188static int tconn_finish_peer_reqs(struct drbd_tconn *tconn)
5189{
5190        struct drbd_conf *mdev;
5191        int vnr, not_empty = 0;
5192
5193        do {
5194                clear_bit(SIGNAL_ASENDER, &tconn->flags);
5195                flush_signals(current);
5196
5197                rcu_read_lock();
5198                idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5199                        kref_get(&mdev->kref);
5200                        rcu_read_unlock();
5201                        if (drbd_finish_peer_reqs(mdev)) {
5202                                kref_put(&mdev->kref, &drbd_minor_destroy);
5203                                return 1;
5204                        }
5205                        kref_put(&mdev->kref, &drbd_minor_destroy);
5206                        rcu_read_lock();
5207                }
5208                set_bit(SIGNAL_ASENDER, &tconn->flags);
5209
5210                spin_lock_irq(&tconn->req_lock);
5211                idr_for_each_entry(&tconn->volumes, mdev, vnr) {
5212                        not_empty = !list_empty(&mdev->done_ee);
5213                        if (not_empty)
5214                                break;
5215                }
5216                spin_unlock_irq(&tconn->req_lock);
5217                rcu_read_unlock();
5218        } while (not_empty);
5219
5220        return 0;
5221}
5222
/* One dispatch entry for a packet type received on the meta socket. */
struct asender_cmd {
	/* payload size expected after the header, in bytes */
	size_t pkt_size;
	/* handler; drbd_asender() treats a non-zero return as failure */
	int (*fn)(struct drbd_tconn *tconn, struct packet_info *pi);
};
5227
5228static struct asender_cmd asender_tbl[] = {
5229        [P_PING]            = { 0, got_Ping },
5230        [P_PING_ACK]        = { 0, got_PingAck },
5231        [P_RECV_ACK]        = { sizeof(struct p_block_ack), got_BlockAck },
5232        [P_WRITE_ACK]       = { sizeof(struct p_block_ack), got_BlockAck },
5233        [P_RS_WRITE_ACK]    = { sizeof(struct p_block_ack), got_BlockAck },
5234        [P_SUPERSEDED]   = { sizeof(struct p_block_ack), got_BlockAck },
5235        [P_NEG_ACK]         = { sizeof(struct p_block_ack), got_NegAck },
5236        [P_NEG_DREPLY]      = { sizeof(struct p_block_ack), got_NegDReply },
5237        [P_NEG_RS_DREPLY]   = { sizeof(struct p_block_ack), got_NegRSDReply },
5238        [P_OV_RESULT]       = { sizeof(struct p_block_ack), got_OVResult },
5239        [P_BARRIER_ACK]     = { sizeof(struct p_barrier_ack), got_BarrierAck },
5240        [P_STATE_CHG_REPLY] = { sizeof(struct p_req_state_reply), got_RqSReply },
5241        [P_RS_IS_IN_SYNC]   = { sizeof(struct p_block_ack), got_IsInSync },
5242        [P_DELAY_PROBE]     = { sizeof(struct p_delay_probe93), got_skip },
5243        [P_RS_CANCEL]       = { sizeof(struct p_block_ack), got_NegRSDReply },
5244        [P_CONN_ST_CHG_REPLY]={ sizeof(struct p_req_state_reply), got_conn_RqSReply },
5245        [P_RETRY_WRITE]     = { sizeof(struct p_block_ack), got_BlockAck },
5246};
5247
5248int drbd_asender(struct drbd_thread *thi)
5249{
5250        struct drbd_tconn *tconn = thi->tconn;
5251        struct asender_cmd *cmd = NULL;
5252        struct packet_info pi;
5253        int rv;
5254        void *buf    = tconn->meta.rbuf;
5255        int received = 0;
5256        unsigned int header_size = drbd_header_size(tconn);
5257        int expect   = header_size;
5258        bool ping_timeout_active = false;
5259        struct net_conf *nc;
5260        int ping_timeo, tcp_cork, ping_int;
5261        struct sched_param param = { .sched_priority = 2 };
5262
5263        rv = sched_setscheduler(current, SCHED_RR, &param);
5264        if (rv < 0)
5265                conn_err(tconn, "drbd_asender: ERROR set priority, ret=%d\n", rv);
5266
5267        while (get_t_state(thi) == RUNNING) {
5268                drbd_thread_current_set_cpu(thi);
5269
5270                rcu_read_lock();
5271                nc = rcu_dereference(tconn->net_conf);
5272                ping_timeo = nc->ping_timeo;
5273                tcp_cork = nc->tcp_cork;
5274                ping_int = nc->ping_int;
5275                rcu_read_unlock();
5276
5277                if (test_and_clear_bit(SEND_PING, &tconn->flags)) {
5278                        if (drbd_send_ping(tconn)) {
5279                                conn_err(tconn, "drbd_send_ping has failed\n");
5280                                goto reconnect;
5281                        }
5282                        tconn->meta.socket->sk->sk_rcvtimeo = ping_timeo * HZ / 10;
5283                        ping_timeout_active = true;
5284                }
5285
5286                /* TODO: conditionally cork; it may hurt latency if we cork without
5287                   much to send */
5288                if (tcp_cork)
5289                        drbd_tcp_cork(tconn->meta.socket);
5290                if (tconn_finish_peer_reqs(tconn)) {
5291                        conn_err(tconn, "tconn_finish_peer_reqs() failed\n");
5292                        goto reconnect;
5293                }
5294                /* but unconditionally uncork unless disabled */
5295                if (tcp_cork)
5296                        drbd_tcp_uncork(tconn->meta.socket);
5297
5298                /* short circuit, recv_msg would return EINTR anyways. */
5299                if (signal_pending(current))
5300                        continue;
5301
5302                rv = drbd_recv_short(tconn->meta.socket, buf, expect-received, 0);
5303                clear_bit(SIGNAL_ASENDER, &tconn->flags);
5304
5305                flush_signals(current);
5306
5307                /* Note:
5308                 * -EINTR        (on meta) we got a signal
5309                 * -EAGAIN       (on meta) rcvtimeo expired
5310                 * -ECONNRESET   other side closed the connection
5311                 * -ERESTARTSYS  (on data) we got a signal
5312                 * rv <  0       other than above: unexpected error!
5313                 * rv == expected: full header or command
5314                 * rv <  expected: "woken" by signal during receive
5315                 * rv == 0       : "connection shut down by peer"
5316                 */
5317                if (likely(rv > 0)) {
5318                        received += rv;
5319                        buf      += rv;
5320                } else if (rv == 0) {
5321                        if (test_bit(DISCONNECT_SENT, &tconn->flags)) {
5322                                long t;
5323                                rcu_read_lock();
5324                                t = rcu_dereference(tconn->net_conf)->ping_timeo * HZ/10;
5325                                rcu_read_unlock();
5326
5327                                t = wait_event_timeout(tconn->ping_wait,
5328                                                       tconn->cstate < C_WF_REPORT_PARAMS,
5329                                                       t);
5330                                if (t)
5331                                        break;
5332                        }
5333                        conn_err(tconn, "meta connection shut down by peer.\n");
5334                        goto reconnect;
5335                } else if (rv == -EAGAIN) {
5336                        /* If the data socket received something meanwhile,
5337                         * that is good enough: peer is still alive. */
5338                        if (time_after(tconn->last_received,
5339                                jiffies - tconn->meta.socket->sk->sk_rcvtimeo))
5340                                continue;
5341                        if (ping_timeout_active) {
5342                                conn_err(tconn, "PingAck did not arrive in time.\n");
5343                                goto reconnect;
5344                        }
5345                        set_bit(SEND_PING, &tconn->flags);
5346                        continue;
5347                } else if (rv == -EINTR) {
5348                        continue;
5349                } else {
5350                        conn_err(tconn, "sock_recvmsg returned %d\n", rv);
5351                        goto reconnect;
5352                }
5353
5354                if (received == expect && cmd == NULL) {
5355                        if (decode_header(tconn, tconn->meta.rbuf, &pi))
5356                                goto reconnect;
5357                        cmd = &asender_tbl[pi.cmd];
5358                        if (pi.cmd >= ARRAY_SIZE(asender_tbl) || !cmd->fn) {
5359                                conn_err(tconn, "Unexpected meta packet %s (0x%04x)\n",
5360                                         cmdname(pi.cmd), pi.cmd);
5361                                goto disconnect;
5362                        }
5363                        expect = header_size + cmd->pkt_size;
5364                        if (pi.size != expect - header_size) {
5365                                conn_err(tconn, "Wrong packet size on meta (c: %d, l: %d)\n",
5366                                        pi.cmd, pi.size);
5367                                goto reconnect;
5368                        }
5369                }
5370                if (received == expect) {
5371                        bool err;
5372
5373                        err = cmd->fn(tconn, &pi);
5374                        if (err) {
5375                                conn_err(tconn, "%pf failed\n", cmd->fn);
5376                                goto reconnect;
5377                        }
5378
5379                        tconn->last_received = jiffies;
5380
5381                        if (cmd == &asender_tbl[P_PING_ACK]) {
5382                                /* restore idle timeout */
5383                                tconn->meta.socket->sk->sk_rcvtimeo = ping_int * HZ;
5384                                ping_timeout_active = false;
5385                        }
5386
5387                        buf      = tconn->meta.rbuf;
5388                        received = 0;
5389                        expect   = header_size;
5390                        cmd      = NULL;
5391                }
5392        }
5393
5394        if (0) {
5395reconnect:
5396                conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD);
5397                conn_md_sync(tconn);
5398        }
5399        if (0) {
5400disconnect:
5401                conn_request_state(tconn, NS(conn, C_DISCONNECTING), CS_HARD);
5402        }
5403        clear_bit(SIGNAL_ASENDER, &tconn->flags);
5404
5405        conn_info(tconn, "asender terminated\n");
5406
5407        return 0;
5408}
5409